You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ka...@apache.org on 2014/06/19 19:22:57 UTC
svn commit: r1603957 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
dev-support/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/te...
Author: kasha
Date: Thu Jun 19 17:22:56 2014
New Revision: 1603957
URL: http://svn.apache.org/r1603957
Log:
MAPREDUCE-5844. Add a configurable delay to reducer-preemption. (Maysam Yabandeh via kasha)
Added:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java
- copied, changed from r1603945, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
Removed:
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1603957&r1=1603956&r2=1603957&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Thu Jun 19 17:22:56 2014
@@ -216,6 +216,9 @@ Release 2.5.0 - UNRELEASED
MAPREDUCE-5896. InputSplits should indicate which locations have the block
cached in memory. (Sandy Ryza via kasha)
+ MAPREDUCE-5844. Add a configurable delay to reducer-preemption.
+ (Maysam Yabandeh via kasha)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml?rev=1603957&r1=1603956&r2=1603957&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml Thu Jun 19 17:22:56 2014
@@ -475,8 +475,8 @@
<Match>
<Class name="org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator" />
<Or>
- <Field name="mapResourceReqt" />
- <Field name="reduceResourceReqt" />
+ <Field name="mapResourceRequest" />
+ <Field name="reduceResourceRequest" />
<Field name="maxReduceRampupLimit" />
<Field name="reduceSlowStart" />
</Or>
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1603957&r1=1603956&r2=1603957&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Thu Jun 19 17:22:56 2014
@@ -73,6 +73,7 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.client.api.NMTokenCache;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.RackResolver;
import com.google.common.annotations.VisibleForTesting;
@@ -143,15 +144,21 @@ public class RMContainerAllocator extend
private int lastCompletedTasks = 0;
private boolean recalculateReduceSchedule = false;
- private int mapResourceReqt;//memory
- private int reduceResourceReqt;//memory
+ private int mapResourceRequest;//memory
+ private int reduceResourceRequest;//memory
private boolean reduceStarted = false;
private float maxReduceRampupLimit = 0;
private float maxReducePreemptionLimit = 0;
+ /**
+ * Threshold in milliseconds: a container request that is still unallocated
+ * after waiting this long is considered delayed.
+ */
+ private long allocationDelayThresholdMs = 0;
private float reduceSlowStart = 0;
private long retryInterval;
private long retrystartTime;
+ private Clock clock;
private final AMPreemptionPolicy preemptionPolicy;
@@ -166,6 +173,7 @@ public class RMContainerAllocator extend
super(clientService, context);
this.preemptionPolicy = preemptionPolicy;
this.stopped = new AtomicBoolean(false);
+ this.clock = context.getClock();
}
@Override
@@ -180,6 +188,9 @@ public class RMContainerAllocator extend
maxReducePreemptionLimit = conf.getFloat(
MRJobConfig.MR_AM_JOB_REDUCE_PREEMPTION_LIMIT,
MRJobConfig.DEFAULT_MR_AM_JOB_REDUCE_PREEMPTION_LIMIT);
+ allocationDelayThresholdMs = conf.getInt(
+ MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC,
+ MRJobConfig.DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC) * 1000;//sec -> ms
RackResolver.init(conf);
retryInterval = getConfig().getLong(MRJobConfig.MR_AM_TO_RM_WAIT_INTERVAL_MS,
MRJobConfig.DEFAULT_MR_AM_TO_RM_WAIT_INTERVAL_MS);
@@ -246,7 +257,7 @@ public class RMContainerAllocator extend
getJob().getTotalMaps(), completedMaps,
scheduledRequests.maps.size(), scheduledRequests.reduces.size(),
assignedRequests.maps.size(), assignedRequests.reduces.size(),
- mapResourceReqt, reduceResourceReqt,
+ mapResourceRequest, reduceResourceRequest,
pendingReduces.size(),
maxReduceRampupLimit, reduceSlowStart);
recalculateReduceSchedule = false;
@@ -268,6 +279,18 @@ public class RMContainerAllocator extend
scheduleStats.log("Final Stats: ");
}
+ @Private
+ @VisibleForTesting
+ AssignedRequests getAssignedRequests() {
+ return assignedRequests;
+ }
+
+ @Private
+ @VisibleForTesting
+ ScheduledRequests getScheduledRequests() {
+ return scheduledRequests;
+ }
+
public boolean getIsReduceStarted() {
return reduceStarted;
}
@@ -303,16 +326,16 @@ public class RMContainerAllocator extend
int supportedMaxContainerCapability =
getMaxContainerCapability().getMemory();
if (reqEvent.getAttemptID().getTaskId().getTaskType().equals(TaskType.MAP)) {
- if (mapResourceReqt == 0) {
- mapResourceReqt = reqEvent.getCapability().getMemory();
+ if (mapResourceRequest == 0) {
+ mapResourceRequest = reqEvent.getCapability().getMemory();
eventHandler.handle(new JobHistoryEvent(jobId,
new NormalizedResourceEvent(org.apache.hadoop.mapreduce.TaskType.MAP,
- mapResourceReqt)));
- LOG.info("mapResourceReqt:"+mapResourceReqt);
- if (mapResourceReqt > supportedMaxContainerCapability) {
+ mapResourceRequest)));
+ LOG.info("mapResourceRequest:"+ mapResourceRequest);
+ if (mapResourceRequest > supportedMaxContainerCapability) {
String diagMsg = "MAP capability required is more than the supported " +
- "max container capability in the cluster. Killing the Job. mapResourceReqt: " +
- mapResourceReqt + " maxContainerCapability:" + supportedMaxContainerCapability;
+ "max container capability in the cluster. Killing the Job. mapResourceRequest: " +
+ mapResourceRequest + " maxContainerCapability:" + supportedMaxContainerCapability;
LOG.info(diagMsg);
eventHandler.handle(new JobDiagnosticsUpdateEvent(
jobId, diagMsg));
@@ -320,20 +343,20 @@ public class RMContainerAllocator extend
}
}
//set the rounded off memory
- reqEvent.getCapability().setMemory(mapResourceReqt);
+ reqEvent.getCapability().setMemory(mapResourceRequest);
scheduledRequests.addMap(reqEvent);//maps are immediately scheduled
} else {
- if (reduceResourceReqt == 0) {
- reduceResourceReqt = reqEvent.getCapability().getMemory();
+ if (reduceResourceRequest == 0) {
+ reduceResourceRequest = reqEvent.getCapability().getMemory();
eventHandler.handle(new JobHistoryEvent(jobId,
new NormalizedResourceEvent(
org.apache.hadoop.mapreduce.TaskType.REDUCE,
- reduceResourceReqt)));
- LOG.info("reduceResourceReqt:"+reduceResourceReqt);
- if (reduceResourceReqt > supportedMaxContainerCapability) {
+ reduceResourceRequest)));
+ LOG.info("reduceResourceRequest:"+ reduceResourceRequest);
+ if (reduceResourceRequest > supportedMaxContainerCapability) {
String diagMsg = "REDUCE capability required is more than the " +
"supported max container capability in the cluster. Killing the " +
- "Job. reduceResourceReqt: " + reduceResourceReqt +
+ "Job. reduceResourceRequest: " + reduceResourceRequest +
" maxContainerCapability:" + supportedMaxContainerCapability;
LOG.info(diagMsg);
eventHandler.handle(new JobDiagnosticsUpdateEvent(
@@ -342,7 +365,7 @@ public class RMContainerAllocator extend
}
}
//set the rounded off memory
- reqEvent.getCapability().setMemory(reduceResourceReqt);
+ reqEvent.getCapability().setMemory(reduceResourceRequest);
if (reqEvent.getEarlierAttemptFailed()) {
//add to the front of queue for fail fast
pendingReduces.addFirst(new ContainerRequest(reqEvent, PRIORITY_REDUCE));
@@ -394,8 +417,22 @@ public class RMContainerAllocator extend
return host;
}
- private void preemptReducesIfNeeded() {
- if (reduceResourceReqt == 0) {
+ @Private
+ @VisibleForTesting
+ synchronized void setReduceResourceRequest(int mem) {
+ this.reduceResourceRequest = mem;
+ }
+
+ @Private
+ @VisibleForTesting
+ synchronized void setMapResourceRequest(int mem) {
+ this.mapResourceRequest = mem;
+ }
+
+ @Private
+ @VisibleForTesting
+ void preemptReducesIfNeeded() {
+ if (reduceResourceRequest == 0) {
return; //no reduces
}
//check if reduces have taken over the whole cluster and there are
@@ -403,9 +440,9 @@ public class RMContainerAllocator extend
if (scheduledRequests.maps.size() > 0) {
int memLimit = getMemLimit();
int availableMemForMap = memLimit - ((assignedRequests.reduces.size() -
- assignedRequests.preemptionWaitingReduces.size()) * reduceResourceReqt);
+ assignedRequests.preemptionWaitingReduces.size()) * reduceResourceRequest);
//availableMemForMap must be sufficient to run atleast 1 map
- if (availableMemForMap < mapResourceReqt) {
+ if (availableMemForMap < mapResourceRequest) {
//to make sure new containers are given to maps and not reduces
//ramp down all scheduled reduces if any
//(since reduces are scheduled at higher priority than maps)
@@ -414,22 +451,40 @@ public class RMContainerAllocator extend
pendingReduces.add(req);
}
scheduledRequests.reduces.clear();
-
- //preempt for making space for at least one map
- int premeptionLimit = Math.max(mapResourceReqt,
- (int) (maxReducePreemptionLimit * memLimit));
-
- int preemptMem = Math.min(scheduledRequests.maps.size() * mapResourceReqt,
- premeptionLimit);
-
- int toPreempt = (int) Math.ceil((float) preemptMem/reduceResourceReqt);
- toPreempt = Math.min(toPreempt, assignedRequests.reduces.size());
-
- LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps");
- assignedRequests.preemptReduce(toPreempt);
+
+ //count the map requests that have been pending longer than the allocation
+ //delay threshold (every request counts when the threshold is <= 0)
+ int hangingMapRequests = getNumOfHangingRequests(scheduledRequests.maps);
+ if (hangingMapRequests > 0) {
+ //preempt for making space for at least one map
+ int premeptionLimit = Math.max(mapResourceRequest,
+ (int) (maxReducePreemptionLimit * memLimit));
+
+ int preemptMem = Math.min(hangingMapRequests * mapResourceRequest,
+ premeptionLimit);
+
+ int toPreempt = (int) Math.ceil((float) preemptMem / reduceResourceRequest);
+ toPreempt = Math.min(toPreempt, assignedRequests.reduces.size());
+
+ LOG.info("Going to preempt " + toPreempt + " due to lack of space for maps");
+ assignedRequests.preemptReduce(toPreempt);
+ }
}
}
}
+
+ private int getNumOfHangingRequests(Map<TaskAttemptId, ContainerRequest> requestMap) {
+ if (allocationDelayThresholdMs <= 0)
+ return requestMap.size();
+ int hangingRequests = 0;
+ long currTime = clock.getTime();
+ for (ContainerRequest request: requestMap.values()) {
+ long delay = currTime - request.requestTimeMs;
+ if (delay > allocationDelayThresholdMs)
+ hangingRequests++;
+ }
+ return hangingRequests;
+ }
@Private
public void scheduleReduces(
@@ -715,11 +770,13 @@ public class RMContainerAllocator extend
@Private
public int getMemLimit() {
int headRoom = getAvailableResources() != null ? getAvailableResources().getMemory() : 0;
- return headRoom + assignedRequests.maps.size() * mapResourceReqt +
- assignedRequests.reduces.size() * reduceResourceReqt;
+ return headRoom + assignedRequests.maps.size() * mapResourceRequest +
+ assignedRequests.reduces.size() * reduceResourceRequest;
}
-
- private class ScheduledRequests {
+
+ @Private
+ @VisibleForTesting
+ class ScheduledRequests {
private final LinkedList<TaskAttemptId> earlierFailedMaps =
new LinkedList<TaskAttemptId>();
@@ -729,7 +786,8 @@ public class RMContainerAllocator extend
new HashMap<String, LinkedList<TaskAttemptId>>();
private final Map<String, LinkedList<TaskAttemptId>> mapsRackMapping =
new HashMap<String, LinkedList<TaskAttemptId>>();
- private final Map<TaskAttemptId, ContainerRequest> maps =
+ @VisibleForTesting
+ final Map<TaskAttemptId, ContainerRequest> maps =
new LinkedHashMap<TaskAttemptId, ContainerRequest>();
private final LinkedHashMap<TaskAttemptId, ContainerRequest> reduces =
@@ -825,22 +883,22 @@ public class RMContainerAllocator extend
int allocatedMemory = allocated.getResource().getMemory();
if (PRIORITY_FAST_FAIL_MAP.equals(priority)
|| PRIORITY_MAP.equals(priority)) {
- if (allocatedMemory < mapResourceReqt
+ if (allocatedMemory < mapResourceRequest
|| maps.isEmpty()) {
LOG.info("Cannot assign container " + allocated
+ " for a map as either "
- + " container memory less than required " + mapResourceReqt
+ + " container memory less than required " + mapResourceRequest
+ " or no pending map tasks - maps.isEmpty="
+ maps.isEmpty());
isAssignable = false;
}
}
else if (PRIORITY_REDUCE.equals(priority)) {
- if (allocatedMemory < reduceResourceReqt
+ if (allocatedMemory < reduceResourceRequest
|| reduces.isEmpty()) {
LOG.info("Cannot assign container " + allocated
+ " for a reduce as either "
- + " container memory less than required " + reduceResourceReqt
+ + " container memory less than required " + reduceResourceRequest
+ " or no pending reduce tasks - reduces.isEmpty="
+ reduces.isEmpty());
isAssignable = false;
@@ -1119,14 +1177,18 @@ public class RMContainerAllocator extend
}
}
- private class AssignedRequests {
+ @Private
+ @VisibleForTesting
+ class AssignedRequests {
private final Map<ContainerId, TaskAttemptId> containerToAttemptMap =
new HashMap<ContainerId, TaskAttemptId>();
private final LinkedHashMap<TaskAttemptId, Container> maps =
new LinkedHashMap<TaskAttemptId, Container>();
- private final LinkedHashMap<TaskAttemptId, Container> reduces =
+ @VisibleForTesting
+ final LinkedHashMap<TaskAttemptId, Container> reduces =
new LinkedHashMap<TaskAttemptId, Container>();
- private final Set<TaskAttemptId> preemptionWaitingReduces =
+ @VisibleForTesting
+ final Set<TaskAttemptId> preemptionWaitingReduces =
new HashSet<TaskAttemptId>();
void add(Container container, TaskAttemptId tId) {
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1603957&r1=1603956&r2=1603957&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Thu Jun 19 17:22:56 2014
@@ -29,8 +29,10 @@ import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId;
@@ -96,6 +98,8 @@ public abstract class RMContainerRequest
super(clientService, context);
}
+ @Private
+ @VisibleForTesting
static class ContainerRequest {
final TaskAttemptId attemptID;
final Resource capability;
@@ -103,20 +107,39 @@ public abstract class RMContainerRequest
final String[] racks;
//final boolean earlierAttemptFailed;
final Priority priority;
-
+ /**
+ * the time when this request object was formed; can be used to avoid
+ * aggressive preemption for recently placed requests
+ */
+ final long requestTimeMs;
+
public ContainerRequest(ContainerRequestEvent event, Priority priority) {
this(event.getAttemptID(), event.getCapability(), event.getHosts(),
event.getRacks(), priority);
}
-
+
+ public ContainerRequest(ContainerRequestEvent event, Priority priority,
+ long requestTimeMs) {
+ this(event.getAttemptID(), event.getCapability(), event.getHosts(),
+ event.getRacks(), priority, requestTimeMs);
+ }
+
+ public ContainerRequest(TaskAttemptId attemptID,
+ Resource capability, String[] hosts, String[] racks,
+ Priority priority) {
+ this(attemptID, capability, hosts, racks, priority,
+ System.currentTimeMillis());
+ }
+
public ContainerRequest(TaskAttemptId attemptID,
- Resource capability, String[] hosts, String[] racks,
- Priority priority) {
+ Resource capability, String[] hosts, String[] racks,
+ Priority priority, long requestTimeMs) {
this.attemptID = attemptID;
this.capability = capability;
this.hosts = hosts;
this.racks = racks;
this.priority = priority;
+ this.requestTimeMs = requestTimeMs;
}
public String toString() {
Copied: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java (from r1603945, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java)
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java?p2=hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java&p1=hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java&r1=1603945&r2=1603957&rev=1603957&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/rm/TestRMContainerAllocator.java Thu Jun 19 17:22:56 2014
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.hadoop.mapreduce.v2.app;
+package org.apache.hadoop.mapreduce.v2.app.rm;
import static org.mockito.Matchers.anyFloat;
import static org.mockito.Matchers.anyInt;
@@ -40,6 +40,10 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicBoolean;
+import org.apache.hadoop.mapreduce.v2.app.AppContext;
+import org.apache.hadoop.mapreduce.v2.app.ClusterInfo;
+import org.apache.hadoop.mapreduce.v2.app.ControlledClock;
+import org.apache.hadoop.mapreduce.v2.app.MRApp;
import org.junit.Assert;
import org.apache.commons.logging.Log;
@@ -65,10 +69,6 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
import org.apache.hadoop.mapreduce.v2.app.job.impl.TaskAttemptImpl;
-import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator;
-import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
-import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
-import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
import org.apache.hadoop.mapreduce.v2.app.rm.preemption.NoopAMPreemptionPolicy;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -80,6 +80,7 @@ import org.apache.hadoop.test.GenericTes
import org.apache.hadoop.yarn.api.ApplicationMasterProtocol;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerState;
@@ -422,6 +423,115 @@ public class TestRMContainerAllocator {
killEventMessage.contains(RMContainerAllocator.RAMPDOWN_DIAGNOSTIC));
}
+ @Test(timeout = 30000)
+ public void testPreemptReducers() throws Exception {
+ LOG.info("Running testPreemptReducers");
+
+ Configuration conf = new Configuration();
+ MyResourceManager rm = new MyResourceManager(conf);
+ rm.start();
+ DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+ .getDispatcher();
+
+ // Submit the application
+ RMApp app = rm.submitApp(1024);
+ dispatcher.await();
+
+ MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+ amNodeManager.nodeHeartbeat(true);
+ dispatcher.await();
+
+ ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+ .getAppAttemptId();
+ rm.sendAMLaunched(appAttemptId);
+ dispatcher.await();
+
+ JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+ Job mockJob = mock(Job.class);
+ when(mockJob.getReport()).thenReturn(
+ MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+ 0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+ MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+ appAttemptId, mockJob, new SystemClock());
+ allocator.setMapResourceRequest(1024);
+ allocator.setReduceResourceRequest(1024);
+ RMContainerAllocator.AssignedRequests assignedRequests =
+ allocator.getAssignedRequests();
+ RMContainerAllocator.ScheduledRequests scheduledRequests =
+ allocator.getScheduledRequests();
+ ContainerRequestEvent event1 =
+ createReq(jobId, 1, 2048, new String[] { "h1" }, false, false);
+ scheduledRequests.maps.put(mock(TaskAttemptId.class),
+ new RMContainerRequestor.ContainerRequest(event1, null));
+ assignedRequests.reduces.put(mock(TaskAttemptId.class),
+ mock(Container.class));
+
+ allocator.preemptReducesIfNeeded();
+ Assert.assertEquals("The reducer is not preempted",
+ 1, assignedRequests.preemptionWaitingReduces.size());
+ }
+
+ @Test(timeout = 30000)
+ public void testNonAggressivelyPreemptReducers() throws Exception {
+ LOG.info("Running testPreemptReducers");
+
+ final int preemptThreshold = 2; //sec
+ Configuration conf = new Configuration();
+ conf.setInt(
+ MRJobConfig.MR_JOB_REDUCER_PREEMPT_DELAY_SEC,
+ preemptThreshold);
+
+ MyResourceManager rm = new MyResourceManager(conf);
+ rm.start();
+ DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+ .getDispatcher();
+
+ // Submit the application
+ RMApp app = rm.submitApp(1024);
+ dispatcher.await();
+
+ MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+ amNodeManager.nodeHeartbeat(true);
+ dispatcher.await();
+
+ ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+ .getAppAttemptId();
+ rm.sendAMLaunched(appAttemptId);
+ dispatcher.await();
+
+ JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+ Job mockJob = mock(Job.class);
+ when(mockJob.getReport()).thenReturn(
+ MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+ 0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+ ControlledClock clock = new ControlledClock(null);
+ clock.setTime(1);
+ MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+ appAttemptId, mockJob, clock);
+ allocator.setMapResourceRequest(1024);
+ allocator.setReduceResourceRequest(1024);
+ RMContainerAllocator.AssignedRequests assignedRequests =
+ allocator.getAssignedRequests();
+ RMContainerAllocator.ScheduledRequests scheduledRequests =
+ allocator.getScheduledRequests();
+ ContainerRequestEvent event1 =
+ createReq(jobId, 1, 2048, new String[] { "h1" }, false, false);
+ scheduledRequests.maps.put(mock(TaskAttemptId.class),
+ new RMContainerRequestor.ContainerRequest(event1, null, clock.getTime()));
+ assignedRequests.reduces.put(mock(TaskAttemptId.class),
+ mock(Container.class));
+
+ clock.setTime(clock.getTime() + 1);
+ allocator.preemptReducesIfNeeded();
+ Assert.assertEquals("The reducer is aggressively preeempted", 0,
+ assignedRequests.preemptionWaitingReduces.size());
+
+ clock.setTime(clock.getTime() + (preemptThreshold) * 1000);
+ allocator.preemptReducesIfNeeded();
+ Assert.assertEquals("The reducer is not preeempted", 1,
+ assignedRequests.preemptionWaitingReduces.size());
+ }
+
@Test
public void testMapReduceScheduling() throws Exception {
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java?rev=1603957&r1=1603956&r2=1603957&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/MRJobConfig.java Thu Jun 19 17:22:56 2014
@@ -579,7 +579,17 @@ public interface MRJobConfig {
MR_AM_PREFIX + "history.use-batched-flush.queue-size.threshold";
public static final int DEFAULT_MR_AM_HISTORY_USE_BATCHED_FLUSH_QUEUE_SIZE_THRESHOLD =
50;
-
+
+ /**
+ * The threshold, in seconds, after which an unsatisfied mapper request
+ * triggers reducer preemption to free space. The default of 0 means that
+ * reducers are preempted as soon as there is no room for a requested
+ * mapper, without any delay.
+ */
+ public static final String MR_JOB_REDUCER_PREEMPT_DELAY_SEC =
+ "mapreduce.job.reducer.preempt.delay.sec";
+ public static final int DEFAULT_MR_JOB_REDUCER_PREEMPT_DELAY_SEC = 0;
+
public static final String MR_AM_ENV =
MR_AM_PREFIX + "env";
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml?rev=1603957&r1=1603956&r2=1603957&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml Thu Jun 19 17:22:56 2014
@@ -83,6 +83,16 @@
</property>
<property>
+ <name>mapreduce.job.reducer.preempt.delay.sec</name>
+ <value>0</value>
+ <description>The threshold, in seconds, after which an unsatisfied mapper
+ request triggers reducer preemption to free space. The default of 0 means that
+ reducers are preempted as soon as there is no room for a requested mapper,
+ without any delay.
+ </description>
+</property>
+
+<property>
<name>mapreduce.job.max.split.locations</name>
<value>10</value>
<description>The max number of block locations to store for each split for