You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by ju...@apache.org on 2015/03/03 10:58:58 UTC
hadoop git commit: MAPREDUCE-5583. Ability to limit running map and reduce tasks. Contributed by Jason Lowe.
Repository: hadoop
Updated Branches:
refs/heads/trunk 9ae7f9eb7 -> 4228de940
MAPREDUCE-5583. Ability to limit running map and reduce tasks. Contributed by Jason Lowe.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4228de94
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4228de94
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4228de94
Branch: refs/heads/trunk
Commit: 4228de94028f1e10ca59ce23e963e488fe566909
Parents: 9ae7f9e
Author: Junping Du <ju...@apache.org>
Authored: Tue Mar 3 02:01:04 2015 -0800
Committer: Junping Du <ju...@apache.org>
Committed: Tue Mar 3 02:02:28 2015 -0800
----------------------------------------------------------------------
hadoop-mapreduce-project/CHANGES.txt | 3 +
.../v2/app/rm/RMContainerAllocator.java | 65 +++++-
.../v2/app/rm/RMContainerRequestor.java | 74 ++++++-
.../v2/app/rm/TestRMContainerAllocator.java | 214 +++++++++++++++++++
.../apache/hadoop/mapreduce/MRJobConfig.java | 8 +
.../src/main/resources/mapred-default.xml | 16 ++
6 files changed, 363 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4228de94/hadoop-mapreduce-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 5524b14..7a2eff3 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -258,6 +258,9 @@ Release 2.7.0 - UNRELEASED
MAPREDUCE-6228. Add truncate operation to SLive. (Plamen Jeliazkov via shv)
+ MAPREDUCE-5583. Ability to limit running map and reduce tasks.
+ (Jason Lowe via junping_du)
+
IMPROVEMENTS
MAPREDUCE-6149. Document override log4j.properties in MR job.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4228de94/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
index 1acfeec..efea674 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
@@ -99,9 +99,9 @@ public class RMContainerAllocator extends RMContainerRequestor
public static final
float DEFAULT_COMPLETED_MAPS_PERCENT_FOR_REDUCE_SLOWSTART = 0.05f;
- private static final Priority PRIORITY_FAST_FAIL_MAP;
- private static final Priority PRIORITY_REDUCE;
- private static final Priority PRIORITY_MAP;
+ static final Priority PRIORITY_FAST_FAIL_MAP;
+ static final Priority PRIORITY_REDUCE;
+ static final Priority PRIORITY_MAP;
@VisibleForTesting
public static final String RAMPDOWN_DIAGNOSTIC = "Reducer preempted "
@@ -166,6 +166,8 @@ public class RMContainerAllocator extends RMContainerRequestor
*/
private long allocationDelayThresholdMs = 0;
private float reduceSlowStart = 0;
+ private int maxRunningMaps = 0;
+ private int maxRunningReduces = 0;
private long retryInterval;
private long retrystartTime;
private Clock clock;
@@ -201,6 +203,10 @@ public class RMContainerAllocator extends RMContainerRequestor
allocationDelayThresholdMs = conf.getInt(
MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC,
MRJobConfig.DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC) * 1000;//sec -> ms
+ maxRunningMaps = conf.getInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT,
+ MRJobConfig.DEFAULT_JOB_RUNNING_MAP_LIMIT);
+ maxRunningReduces = conf.getInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT,
+ MRJobConfig.DEFAULT_JOB_RUNNING_REDUCE_LIMIT);
RackResolver.init(conf);
retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
@@ -664,6 +670,8 @@ public class RMContainerAllocator extends RMContainerRequestor
@SuppressWarnings("unchecked")
private List<Container> getResources() throws Exception {
+ applyConcurrentTaskLimits();
+
// will be null the first time
Resource headRoom =
getAvailableResources() == null ? Resources.none() :
@@ -778,6 +786,43 @@ public class RMContainerAllocator extends RMContainerRequestor
return newContainers;
}
+ private void applyConcurrentTaskLimits() {
+ int numScheduledMaps = scheduledRequests.maps.size();
+ if (maxRunningMaps > 0 && numScheduledMaps > 0) {
+ int maxRequestedMaps = Math.max(0,
+ maxRunningMaps - assignedRequests.maps.size());
+ int numScheduledFailMaps = scheduledRequests.earlierFailedMaps.size();
+ int failedMapRequestLimit = Math.min(maxRequestedMaps,
+ numScheduledFailMaps);
+ int normalMapRequestLimit = Math.min(
+ maxRequestedMaps - failedMapRequestLimit,
+ numScheduledMaps - numScheduledFailMaps);
+ setRequestLimit(PRIORITY_FAST_FAIL_MAP, mapResourceRequest,
+ failedMapRequestLimit);
+ setRequestLimit(PRIORITY_MAP, mapResourceRequest, normalMapRequestLimit);
+ }
+
+ int numScheduledReduces = scheduledRequests.reduces.size();
+ if (maxRunningReduces > 0 && numScheduledReduces > 0) {
+ int maxRequestedReduces = Math.max(0,
+ maxRunningReduces - assignedRequests.reduces.size());
+ int reduceRequestLimit = Math.min(maxRequestedReduces,
+ numScheduledReduces);
+ setRequestLimit(PRIORITY_REDUCE, reduceResourceRequest,
+ reduceRequestLimit);
+ }
+ }
+
+ private boolean canAssignMaps() {
+ return (maxRunningMaps <= 0
+ || assignedRequests.maps.size() < maxRunningMaps);
+ }
+
+ private boolean canAssignReduces() {
+ return (maxRunningReduces <= 0
+ || assignedRequests.reduces.size() < maxRunningReduces);
+ }
+
private void updateAMRMToken(Token token) throws IOException {
org.apache.hadoop.security.token.Token<AMRMTokenIdentifier> amrmToken =
new org.apache.hadoop.security.token.Token<AMRMTokenIdentifier>(token
@@ -1046,8 +1091,7 @@ public class RMContainerAllocator extends RMContainerRequestor
it = allocatedContainers.iterator();
while (it.hasNext()) {
Container allocated = it.next();
- LOG.info("Releasing unassigned and invalid container "
- + allocated + ". RM may have assignment issues");
+ LOG.info("Releasing unassigned container " + allocated);
containerNotAssigned(allocated);
}
}
@@ -1150,7 +1194,8 @@ public class RMContainerAllocator extends RMContainerRequestor
private ContainerRequest assignToFailedMap(Container allocated) {
//try to assign to earlierFailedMaps if present
ContainerRequest assigned = null;
- while (assigned == null && earlierFailedMaps.size() > 0) {
+ while (assigned == null && earlierFailedMaps.size() > 0
+ && canAssignMaps()) {
TaskAttemptId tId = earlierFailedMaps.removeFirst();
if (maps.containsKey(tId)) {
assigned = maps.remove(tId);
@@ -1168,7 +1213,7 @@ public class RMContainerAllocator extends RMContainerRequestor
private ContainerRequest assignToReduce(Container allocated) {
ContainerRequest assigned = null;
//try to assign to reduces if present
- if (assigned == null && reduces.size() > 0) {
+ if (assigned == null && reduces.size() > 0 && canAssignReduces()) {
TaskAttemptId tId = reduces.keySet().iterator().next();
assigned = reduces.remove(tId);
LOG.info("Assigned to reduce");
@@ -1180,7 +1225,7 @@ public class RMContainerAllocator extends RMContainerRequestor
private void assignMapsWithLocality(List<Container> allocatedContainers) {
// try to assign to all nodes first to match node local
Iterator<Container> it = allocatedContainers.iterator();
- while(it.hasNext() && maps.size() > 0){
+ while(it.hasNext() && maps.size() > 0 && canAssignMaps()){
Container allocated = it.next();
Priority priority = allocated.getPriority();
assert PRIORITY_MAP.equals(priority);
@@ -1212,7 +1257,7 @@ public class RMContainerAllocator extends RMContainerRequestor
// try to match all rack local
it = allocatedContainers.iterator();
- while(it.hasNext() && maps.size() > 0){
+ while(it.hasNext() && maps.size() > 0 && canAssignMaps()){
Container allocated = it.next();
Priority priority = allocated.getPriority();
assert PRIORITY_MAP.equals(priority);
@@ -1242,7 +1287,7 @@ public class RMContainerAllocator extends RMContainerRequestor
// assign remaining
it = allocatedContainers.iterator();
- while(it.hasNext() && maps.size() > 0){
+ while(it.hasNext() && maps.size() > 0 && canAssignMaps()){
Container allocated = it.next();
Priority priority = allocated.getPriority();
assert PRIORITY_MAP.equals(priority);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4228de94/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
index bb9ad02..1666864 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
@@ -44,6 +45,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceBlacklistRequest;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factories.RecordFactory;
@@ -58,6 +60,8 @@ import com.google.common.annotations.VisibleForTesting;
public abstract class RMContainerRequestor extends RMCommunicator {
private static final Log LOG = LogFactory.getLog(RMContainerRequestor.class);
+ private static final ResourceRequestComparator RESOURCE_REQUEST_COMPARATOR =
+ new ResourceRequestComparator();
protected int lastResponseID;
private Resource availableResources;
@@ -77,12 +81,18 @@ public abstract class RMContainerRequestor extends RMCommunicator {
// use custom comparator to make sure ResourceRequest objects differing only in
// numContainers dont end up as duplicates
private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
- new org.apache.hadoop.yarn.api.records.ResourceRequest.ResourceRequestComparator());
+ RESOURCE_REQUEST_COMPARATOR);
private final Set<ContainerId> release = new TreeSet<ContainerId>();
// pendingRelease holds history or release requests.request is removed only if
// RM sends completedContainer.
// How it different from release? --> release is for per allocate() request.
protected Set<ContainerId> pendingRelease = new TreeSet<ContainerId>();
+
+ private final Map<ResourceRequest,ResourceRequest> requestLimits =
+ new TreeMap<ResourceRequest,ResourceRequest>(RESOURCE_REQUEST_COMPARATOR);
+ private final Set<ResourceRequest> requestLimitsToUpdate =
+ new TreeSet<ResourceRequest>(RESOURCE_REQUEST_COMPARATOR);
+
private boolean nodeBlacklistingEnabled;
private int blacklistDisablePercent;
private AtomicBoolean ignoreBlacklisting = new AtomicBoolean(false);
@@ -178,6 +188,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
protected AllocateResponse makeRemoteRequest() throws YarnException,
IOException {
+ applyRequestLimits();
ResourceBlacklistRequest blacklistRequest =
ResourceBlacklistRequest.newInstance(new ArrayList<String>(blacklistAdditions),
new ArrayList<String>(blacklistRemovals));
@@ -190,13 +201,14 @@ public abstract class RMContainerRequestor extends RMCommunicator {
availableResources = allocateResponse.getAvailableResources();
lastClusterNmCount = clusterNmCount;
clusterNmCount = allocateResponse.getNumClusterNodes();
+ int numCompletedContainers =
+ allocateResponse.getCompletedContainersStatuses().size();
if (ask.size() > 0 || release.size() > 0) {
LOG.info("getResources() for " + applicationId + ":" + " ask="
+ ask.size() + " release= " + release.size() + " newContainers="
+ allocateResponse.getAllocatedContainers().size()
- + " finishedContainers="
- + allocateResponse.getCompletedContainersStatuses().size()
+ + " finishedContainers=" + numCompletedContainers
+ " resourcelimit=" + availableResources + " knownNMs="
+ clusterNmCount);
}
@@ -204,6 +216,12 @@ public abstract class RMContainerRequestor extends RMCommunicator {
ask.clear();
release.clear();
+ if (numCompletedContainers > 0) {
+ // re-send limited requests when a container completes to trigger asking
+ // for more containers
+ requestLimitsToUpdate.addAll(requestLimits.keySet());
+ }
+
if (blacklistAdditions.size() > 0 || blacklistRemovals.size() > 0) {
LOG.info("Update the blacklist for " + applicationId +
": blacklistAdditions=" + blacklistAdditions.size() +
@@ -214,6 +232,36 @@ public abstract class RMContainerRequestor extends RMCommunicator {
return allocateResponse;
}
+ private void applyRequestLimits() {
+ Iterator<ResourceRequest> iter = requestLimits.values().iterator();
+ while (iter.hasNext()) {
+ ResourceRequest reqLimit = iter.next();
+ int limit = reqLimit.getNumContainers();
+ Map<String, Map<Resource, ResourceRequest>> remoteRequests =
+ remoteRequestsTable.get(reqLimit.getPriority());
+ Map<Resource, ResourceRequest> reqMap = (remoteRequests != null)
+ ? remoteRequests.get(ResourceRequest.ANY) : null;
+ ResourceRequest req = (reqMap != null)
+ ? reqMap.get(reqLimit.getCapability()) : null;
+ if (req == null) {
+ continue;
+ }
+ // update an existing ask or send a new one if updating
+ if (ask.remove(req) || requestLimitsToUpdate.contains(req)) {
+ ResourceRequest newReq = req.getNumContainers() > limit
+ ? reqLimit : req;
+ ask.add(newReq);
+ LOG.info("Applying ask limit of " + newReq.getNumContainers()
+ + " for priority:" + reqLimit.getPriority()
+ + " and capability:" + reqLimit.getCapability());
+ }
+ if (limit == Integer.MAX_VALUE) {
+ iter.remove();
+ }
+ }
+ requestLimitsToUpdate.clear();
+ }
+
protected void addOutstandingRequestOnResync() {
for (Map<String, Map<Resource, ResourceRequest>> rr : remoteRequestsTable
.values()) {
@@ -229,6 +277,7 @@ public abstract class RMContainerRequestor extends RMCommunicator {
if (!pendingRelease.isEmpty()) {
release.addAll(pendingRelease);
}
+ requestLimitsToUpdate.addAll(requestLimits.keySet());
}
// May be incorrect if there's multiple NodeManagers running on a single host.
@@ -459,10 +508,8 @@ public abstract class RMContainerRequestor extends RMCommunicator {
private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
// because objects inside the resource map can be deleted ask can end up
// containing an object that matches new resource object but with different
- // numContainers. So exisintg values must be replaced explicitly
- if(ask.contains(remoteRequest)) {
- ask.remove(remoteRequest);
- }
+ // numContainers. So existing values must be replaced explicitly
+ ask.remove(remoteRequest);
ask.add(remoteRequest);
}
@@ -490,6 +537,19 @@ public abstract class RMContainerRequestor extends RMCommunicator {
return newReq;
}
+ protected void setRequestLimit(Priority priority, Resource capability,
+ int limit) {
+ if (limit < 0) {
+ limit = Integer.MAX_VALUE;
+ }
+ ResourceRequest newReqLimit = ResourceRequest.newInstance(priority,
+ ResourceRequest.ANY, capability, limit);
+ ResourceRequest oldReqLimit = requestLimits.put(newReqLimit, newReqLimit);
+ if (oldReqLimit == null || oldReqLimit.getNumContainers() < limit) {
+ requestLimitsToUpdate.add(newReqLimit);
+ }
+ }
+
public Set<String> getBlacklistedNodes() {
return blacklistedNodes;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4228de94/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
index 4759693..eca1a4d 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
@@ -31,9 +31,11 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import java.io.IOException;
+import java.nio.ByteBuffer;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.HashSet;
@@ -81,7 +83,13 @@ import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.FinishApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
@@ -89,6 +97,10 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.NMToken;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -2387,6 +2399,208 @@ public class TestRMContainerAllocator {
new Text(rmAddr), ugiToken.getService());
}
+ @Test
+ public void testConcurrentTaskLimits() throws Exception {
+ final int MAP_LIMIT = 3;
+ final int REDUCE_LIMIT = 1;
+ LOG.info("Running testConcurrentTaskLimits");
+ Configuration conf = new Configuration();
+ conf.setInt(MRJobConfig.JOB_RUNNING_MAP_LIMIT, MAP_LIMIT);
+ conf.setInt(MRJobConfig.JOB_RUNNING_REDUCE_LIMIT, REDUCE_LIMIT);
+ conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 1.0f);
+ ApplicationId appId = ApplicationId.newInstance(1, 1);
+ ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(
+ appId, 1);
+ JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+ Job mockJob = mock(Job.class);
+ when(mockJob.getReport()).thenReturn(
+ MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+ 0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+ final MockScheduler mockScheduler = new MockScheduler(appAttemptId);
+ MyContainerAllocator allocator = new MyContainerAllocator(null, conf,
+ appAttemptId, mockJob) {
+ @Override
+ protected void register() {
+ }
+
+ @Override
+ protected ApplicationMasterProtocol createSchedulerProxy() {
+ return mockScheduler;
+ }
+ };
+
+ // create some map requests
+ ContainerRequestEvent[] reqMapEvents = new ContainerRequestEvent[5];
+ for (int i = 0; i < reqMapEvents.length; ++i) {
+ reqMapEvents[i] = createReq(jobId, i, 1024, new String[] { "h" + i });
+ }
+ allocator.sendRequests(Arrays.asList(reqMapEvents));
+
+ // create some reduce requests
+ ContainerRequestEvent[] reqReduceEvents = new ContainerRequestEvent[2];
+ for (int i = 0; i < reqReduceEvents.length; ++i) {
+ reqReduceEvents[i] = createReq(jobId, i, 1024, new String[] {},
+ false, true);
+ }
+ allocator.sendRequests(Arrays.asList(reqReduceEvents));
+ allocator.schedule();
+
+ // verify all of the host-specific asks were sent plus one for the
+ // default rack and one for the ANY request
+ Assert.assertEquals(reqMapEvents.length + 2, mockScheduler.lastAsk.size());
+
+ // verify AM is only asking for the map limit overall
+ Assert.assertEquals(MAP_LIMIT, mockScheduler.lastAnyAskMap);
+
+ // assign a map task and verify we do not ask for any more maps
+ ContainerId cid0 = mockScheduler.assignContainer("h0", false);
+ allocator.schedule();
+ allocator.schedule();
+ Assert.assertEquals(2, mockScheduler.lastAnyAskMap);
+
+ // complete the map task and verify that we ask for one more
+ mockScheduler.completeContainer(cid0);
+ allocator.schedule();
+ allocator.schedule();
+ Assert.assertEquals(3, mockScheduler.lastAnyAskMap);
+
+ // assign three more maps and verify we ask for no more maps
+ ContainerId cid1 = mockScheduler.assignContainer("h1", false);
+ ContainerId cid2 = mockScheduler.assignContainer("h2", false);
+ ContainerId cid3 = mockScheduler.assignContainer("h3", false);
+ allocator.schedule();
+ allocator.schedule();
+ Assert.assertEquals(0, mockScheduler.lastAnyAskMap);
+
+ // complete two containers and verify we only asked for one more
+ // since at that point all maps should be scheduled/completed
+ mockScheduler.completeContainer(cid2);
+ mockScheduler.completeContainer(cid3);
+ allocator.schedule();
+ allocator.schedule();
+ Assert.assertEquals(1, mockScheduler.lastAnyAskMap);
+
+ // allocate the last container and complete the first one
+ // and verify there are no more map asks.
+ mockScheduler.completeContainer(cid1);
+ ContainerId cid4 = mockScheduler.assignContainer("h4", false);
+ allocator.schedule();
+ allocator.schedule();
+ Assert.assertEquals(0, mockScheduler.lastAnyAskMap);
+
+ // complete the last map
+ mockScheduler.completeContainer(cid4);
+ allocator.schedule();
+ allocator.schedule();
+ Assert.assertEquals(0, mockScheduler.lastAnyAskMap);
+
+ // verify only reduce limit being requested
+ Assert.assertEquals(REDUCE_LIMIT, mockScheduler.lastAnyAskReduce);
+
+ // assign a reducer and verify ask goes to zero
+ cid0 = mockScheduler.assignContainer("h0", true);
+ allocator.schedule();
+ allocator.schedule();
+ Assert.assertEquals(0, mockScheduler.lastAnyAskReduce);
+
+ // complete the reducer and verify we ask for another
+ mockScheduler.completeContainer(cid0);
+ allocator.schedule();
+ allocator.schedule();
+ Assert.assertEquals(1, mockScheduler.lastAnyAskReduce);
+
+ // assign a reducer and verify ask goes to zero
+ cid0 = mockScheduler.assignContainer("h0", true);
+ allocator.schedule();
+ allocator.schedule();
+ Assert.assertEquals(0, mockScheduler.lastAnyAskReduce);
+
+ // complete the reducer and verify no more reducers
+ mockScheduler.completeContainer(cid0);
+ allocator.schedule();
+ allocator.schedule();
+ Assert.assertEquals(0, mockScheduler.lastAnyAskReduce);
+ allocator.close();
+ }
+
+ private static class MockScheduler implements ApplicationMasterProtocol {
+ ApplicationAttemptId attemptId;
+ long nextContainerId = 10;
+ List<ResourceRequest> lastAsk = null;
+ int lastAnyAskMap = 0;
+ int lastAnyAskReduce = 0;
+ List<ContainerStatus> containersToComplete =
+ new ArrayList<ContainerStatus>();
+ List<Container> containersToAllocate = new ArrayList<Container>();
+
+ public MockScheduler(ApplicationAttemptId attemptId) {
+ this.attemptId = attemptId;
+ }
+
+ @Override
+ public RegisterApplicationMasterResponse registerApplicationMaster(
+ RegisterApplicationMasterRequest request) throws YarnException,
+ IOException {
+ return RegisterApplicationMasterResponse.newInstance(
+ Resource.newInstance(512, 1),
+ Resource.newInstance(512000, 1024),
+ Collections.<ApplicationAccessType,String>emptyMap(),
+ ByteBuffer.wrap("fake_key".getBytes()),
+ Collections.<Container>emptyList(),
+ "default",
+ Collections.<NMToken>emptyList());
+ }
+
+ @Override
+ public FinishApplicationMasterResponse finishApplicationMaster(
+ FinishApplicationMasterRequest request) throws YarnException,
+ IOException {
+ return FinishApplicationMasterResponse.newInstance(false);
+ }
+
+ @Override
+ public AllocateResponse allocate(AllocateRequest request)
+ throws YarnException, IOException {
+ lastAsk = request.getAskList();
+ for (ResourceRequest req : lastAsk) {
+ if (ResourceRequest.ANY.equals(req.getResourceName())) {
+ Priority priority = req.getPriority();
+ if (priority.equals(RMContainerAllocator.PRIORITY_MAP)) {
+ lastAnyAskMap = req.getNumContainers();
+ } else if (priority.equals(RMContainerAllocator.PRIORITY_REDUCE)){
+ lastAnyAskReduce = req.getNumContainers();
+ }
+ }
+ }
+ AllocateResponse response = AllocateResponse.newInstance(
+ request.getResponseId(),
+ containersToComplete, containersToAllocate,
+ Collections.<NodeReport>emptyList(),
+ Resource.newInstance(512000, 1024), null, 10, null,
+ Collections.<NMToken>emptyList());
+ containersToComplete.clear();
+ containersToAllocate.clear();
+ return response;
+ }
+
+ public ContainerId assignContainer(String nodeName, boolean isReduce) {
+ ContainerId containerId =
+ ContainerId.newContainerId(attemptId, nextContainerId++);
+ Priority priority = isReduce ? RMContainerAllocator.PRIORITY_REDUCE
+ : RMContainerAllocator.PRIORITY_MAP;
+ Container container = Container.newInstance(containerId,
+ NodeId.newInstance(nodeName, 1234), nodeName + ":5678",
+ Resource.newInstance(1024, 1), priority, null);
+ containersToAllocate.add(container);
+ return containerId;
+ }
+
+ public void completeContainer(ContainerId containerId) {
+ containersToComplete.add(ContainerStatus.newInstance(containerId,
+ ContainerState.COMPLETE, "", 0));
+ }
+ }
+
public static void main(String[] args) throws Exception {
TestRMContainerAllocator t = new TestRMContainerAllocator();
t.testSimple();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4228de94/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
index d06b075..5527103 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
@@ -373,6 +373,14 @@ public interface MRJobConfig {
public static final String DEFAULT_JOB_ACL_MODIFY_JOB = " ";
+ public static final String JOB_RUNNING_MAP_LIMIT =
+ "mapreduce.job.running.map.limit";
+ public static final int DEFAULT_JOB_RUNNING_MAP_LIMIT = 0;
+
+ public static final String JOB_RUNNING_REDUCE_LIMIT =
+ "mapreduce.job.running.reduce.limit";
+ public static final int DEFAULT_JOB_RUNNING_REDUCE_LIMIT = 0;
+
/* config for tracking the local file where all the credentials for the job
* credentials.
*/
http://git-wip-us.apache.org/repos/asf/hadoop/blob/4228de94/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
----------------------------------------------------------------------
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 6e80679..d864756 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -83,6 +83,22 @@
</property>
<property>
+ <name>mapreduce.job.running.map.limit</name>
+ <value>0</value>
+ <description>The maximum number of simultaneous map tasks per job.
+ There is no limit if this value is 0 or negative.
+ </description>
+</property>
+
+<property>
+ <name>mapreduce.job.running.reduce.limit</name>
+ <value>0</value>
+ <description>The maximum number of simultaneous reduce tasks per job.
+ There is no limit if this value is 0 or negative.
+ </description>
+</property>
+
+<property>
<name>mapreduce.job.reducer.preempt.delay.sec</name>
<value>0</value>
<description>The threshold in terms of seconds after which an unsatisfied mapper