You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/10/20 00:02:22 UTC
svn commit: r1186529 - in /hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/a...
Author: acmurthy
Date: Wed Oct 19 22:02:21 2011
New Revision: 1186529
URL: http://svn.apache.org/viewvc?rev=1186529&view=rev
Log:
MAPREDUCE-2693. Fix NPE in job-blacklisting. Contributed by Hitesh Shah.
Modified:
hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
Modified: hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt?rev=1186529&r1=1186528&r2=1186529&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/CHANGES.txt Wed Oct 19 22:02:21 2011
@@ -1695,6 +1695,8 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2788. Normalize resource requests in FifoScheduler
appropriately. (Ahmed Radwan via acmurthy)
+ MAPREDUCE-2693. Fix NPE in job-blacklisting. (Hitesh Shah via acmurthy)
+
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1186529&r1=1186528&r2=1186529&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Wed Oct 19 22:02:21 2011
@@ -509,18 +509,6 @@ public class RMContainerAllocator extend
request = new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP);
} else {
for (String host : event.getHosts()) {
- //host comes from data splitLocations which are hostnames. Containers
- // use IP addresses.
- //TODO Temporary fix for locality. Use resolvers from h-common.
- // Cache to make this more efficient ?
- InetAddress addr = null;
- try {
- addr = InetAddress.getByName(host);
- } catch (UnknownHostException e) {
- LOG.warn("Unable to resolve host to IP for host [: " + host + "]");
- }
- if (addr != null) //Fallback to host if resolve fails.
- host = addr.getHostAddress();
LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
if (list == null) {
list = new LinkedList<TaskAttemptId>();
@@ -557,26 +545,101 @@ public class RMContainerAllocator extend
while (it.hasNext()) {
Container allocated = it.next();
LOG.info("Assigning container " + allocated);
- ContainerRequest assigned = assign(allocated);
-
- if (assigned != null) {
- // Update resource requests
- decContainerReq(assigned);
+
+ // check if allocated container meets memory requirements
+ // and whether we have any scheduled tasks that need
+ // a container to be assigned
+ boolean isAssignable = true;
+ Priority priority = allocated.getPriority();
+ if (PRIORITY_FAST_FAIL_MAP.equals(priority)
+ || PRIORITY_MAP.equals(priority)) {
+ if (allocated.getResource().getMemory() < mapResourceReqt
+ || maps.isEmpty()) {
+ LOG.info("Cannot assign container " + allocated
+ + " for a map as either "
+ + " container memory less than required " + mapResourceReqt
+ + " or no pending map tasks - maps.isEmpty="
+ + maps.isEmpty());
+ isAssignable = false;
+ }
+ }
+ else if (PRIORITY_REDUCE.equals(priority)) {
+ if (allocated.getResource().getMemory() < reduceResourceReqt
+ || reduces.isEmpty()) {
+ LOG.info("Cannot assign container " + allocated
+ + " for a reduce as either "
+ + " container memory less than required " + reduceResourceReqt
+ + " or no pending reduce tasks - reduces.isEmpty="
+ + reduces.isEmpty());
+ isAssignable = false;
+ }
+ }
+
+ boolean blackListed = false;
+ ContainerRequest assigned = null;
+
+ if (isAssignable) {
+ // do not assign if allocated container is on a
+ // blacklisted host
+ blackListed = isNodeBlacklisted(allocated.getNodeId().getHost());
+ if (blackListed) {
+ // we need to request for a new container
+ // and release the current one
+ LOG.info("Got allocated container on a blacklisted "
+ + " host. Releasing container " + allocated);
- // send the container-assigned event to task attempt
- eventHandler.handle(new TaskAttemptContainerAssignedEvent(
- assigned.attemptID, allocated));
+ // find the request matching this allocated container
+ // and replace it with a new one
+ ContainerRequest toBeReplacedReq =
+ getContainerReqToReplace(allocated);
+ if (toBeReplacedReq != null) {
+ LOG.info("Placing a new container request for task attempt "
+ + toBeReplacedReq.attemptID);
+ ContainerRequest newReq =
+ getFilteredContainerRequest(toBeReplacedReq);
+ decContainerReq(toBeReplacedReq);
+ if (toBeReplacedReq.attemptID.getTaskId().getTaskType() ==
+ TaskType.MAP) {
+ maps.put(newReq.attemptID, newReq);
+ }
+ else {
+ reduces.put(newReq.attemptID, newReq);
+ }
+ addContainerReq(newReq);
+ }
+ else {
+ LOG.info("Could not map allocated container to a valid request."
+ + " Releasing allocated container " + allocated);
+ }
+ }
+ else {
+ assigned = assign(allocated);
+ if (assigned != null) {
+ // Update resource requests
+ decContainerReq(assigned);
+
+ // send the container-assigned event to task attempt
+ eventHandler.handle(new TaskAttemptContainerAssignedEvent(
+ assigned.attemptID, allocated));
- assignedRequests.add(allocated.getId(), assigned.attemptID);
-
- LOG.info("Assigned container (" + allocated + ") " +
- " to task " + assigned.attemptID +
- " on node " + allocated.getNodeId().toString());
- } else {
- //not assigned to any request, release the container
- LOG.info("Releasing unassigned and invalid container " + allocated
- + ". RM has gone crazy, someone go look!"
- + " Hey RM, if you are so rich, go donate to non-profits!");
+ assignedRequests.add(allocated.getId(), assigned.attemptID);
+
+ LOG.info("Assigned container (" + allocated + ") " +
+ " to task " + assigned.attemptID +
+ " on node " + allocated.getNodeId().toString());
+ }
+ else {
+ //not assigned to any request, release the container
+ LOG.info("Releasing unassigned and invalid container "
+ + allocated + ". RM has gone crazy, someone go look!"
+ + " Hey RM, if you are so rich, go donate to non-profits!");
+ }
+ }
+ }
+
+ // release container if it was blacklisted
+ // or if we could not assign it
+ if (blackListed || assigned == null) {
containersReleased++;
release(allocated.getId());
}
@@ -604,12 +667,37 @@ public class RMContainerAllocator extend
return assigned;
}
+ private ContainerRequest getContainerReqToReplace(Container allocated) {
+ Priority priority = allocated.getPriority();
+ ContainerRequest toBeReplaced = null;
+ if (PRIORITY_FAST_FAIL_MAP.equals(priority)
+ || PRIORITY_MAP.equals(priority)) {
+ // allocated container was for a map
+ String host = allocated.getNodeId().getHost();
+ LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
+ if (list != null && list.size() > 0) {
+ TaskAttemptId tId = list.removeLast();
+ if (maps.containsKey(tId)) {
+ toBeReplaced = maps.remove(tId);
+ }
+ }
+ else {
+ TaskAttemptId tId = maps.keySet().iterator().next();
+ toBeReplaced = maps.remove(tId);
+ }
+ }
+ else if (PRIORITY_REDUCE.equals(priority)) {
+ TaskAttemptId tId = reduces.keySet().iterator().next();
+ toBeReplaced = reduces.remove(tId);
+ }
+ return toBeReplaced;
+ }
+
private ContainerRequest assignToFailedMap(Container allocated) {
//try to assign to earlierFailedMaps if present
ContainerRequest assigned = null;
- while (assigned == null && earlierFailedMaps.size() > 0 &&
- allocated.getResource().getMemory() >= mapResourceReqt) {
+ while (assigned == null && earlierFailedMaps.size() > 0) {
TaskAttemptId tId = earlierFailedMaps.removeFirst();
if (maps.containsKey(tId)) {
assigned = maps.remove(tId);
@@ -627,8 +715,7 @@ public class RMContainerAllocator extend
private ContainerRequest assignToReduce(Container allocated) {
ContainerRequest assigned = null;
//try to assign to reduces if present
- if (assigned == null && reduces.size() > 0
- && allocated.getResource().getMemory() >= reduceResourceReqt) {
+ if (assigned == null && reduces.size() > 0) {
TaskAttemptId tId = reduces.keySet().iterator().next();
assigned = reduces.remove(tId);
LOG.info("Assigned to reduce");
@@ -640,9 +727,8 @@ public class RMContainerAllocator extend
//try to assign to maps if present
//first by host, then by rack, followed by *
ContainerRequest assigned = null;
- while (assigned == null && maps.size() > 0
- && allocated.getResource().getMemory() >= mapResourceReqt) {
- String host = getHost(allocated.getNodeId().toString());
+ while (assigned == null && maps.size() > 0) {
+ String host = allocated.getNodeId().getHost();
LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
while (list != null && list.size() > 0) {
LOG.info("Host matched to the request list " + host);
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1186529&r1=1186528&r2=1186529&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Wed Oct 19 22:02:21 2011
@@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce.v2.app.rm;
+import java.net.InetAddress;
+import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
@@ -63,7 +65,7 @@ public abstract class RMContainerRequest
//Key->ResourceName (e.g., hostname, rackname, *)
//Value->Map
//Key->Resource Capability
- //Value->ResourceReqeust
+ //Value->ResourceRequest
private final Map<Priority, Map<String, Map<Resource, ResourceRequest>>>
remoteRequestsTable =
new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
@@ -87,14 +89,22 @@ public abstract class RMContainerRequest
final String[] racks;
//final boolean earlierAttemptFailed;
final Priority priority;
+
public ContainerRequest(ContainerRequestEvent event, Priority priority) {
- this.attemptID = event.getAttemptID();
- this.capability = event.getCapability();
- this.hosts = event.getHosts();
- this.racks = event.getRacks();
- //this.earlierAttemptFailed = event.getEarlierAttemptFailed();
+ this(event.getAttemptID(), event.getCapability(), event.getHosts(),
+ event.getRacks(), priority);
+ }
+
+ public ContainerRequest(TaskAttemptId attemptID,
+ Resource capability, String[] hosts, String[] racks,
+ Priority priority) {
+ this.attemptID = attemptID;
+ this.capability = capability;
+ this.hosts = hosts;
+ this.racks = racks;
this.priority = priority;
}
+
}
@Override
@@ -149,14 +159,37 @@ public abstract class RMContainerRequest
//remove all the requests corresponding to this hostname
for (Map<String, Map<Resource, ResourceRequest>> remoteRequests
: remoteRequestsTable.values()){
- //remove from host
- Map<Resource, ResourceRequest> reqMap = remoteRequests.remove(hostName);
+ //remove from host if no pending allocations
+ boolean foundAll = true;
+ Map<Resource, ResourceRequest> reqMap = remoteRequests.get(hostName);
if (reqMap != null) {
for (ResourceRequest req : reqMap.values()) {
- ask.remove(req);
+ if (!ask.remove(req)) {
+ foundAll = false;
+ }
+ else {
+ // if ask already sent to RM, we can try and overwrite it if possible.
+ // send a new ask to RM with numContainers
+ // specified for the blacklisted host to be 0.
+ ResourceRequest zeroedRequest = BuilderUtils.newResourceRequest(req);
+ zeroedRequest.setNumContainers(0);
+ // to be sent to RM on next heartbeat
+ ask.add(zeroedRequest);
+ }
}
+ // if all requests were still in ask queue
+ // we can remove this request
+ if (foundAll) {
+ remoteRequests.remove(hostName);
+ }
}
- //TODO: remove from rack
+ // TODO handling of rack blacklisting
+ // Removing from rack should be dependent on no. of failures within the rack
+ // Blacklisting a rack on the basis of a single node's blacklisting
+ // may be overly aggressive.
+ // Node failures could be co-related with other failures on the same rack
+ // but we probably need a better approach at trying to decide how and when
+ // to blacklist a rack
}
} else {
nodeFailures.put(hostName, failures);
@@ -171,7 +204,9 @@ public abstract class RMContainerRequest
// Create resource requests
for (String host : req.hosts) {
// Data-local
- addResourceRequest(req.priority, host, req.capability);
+ if (!isNodeBlacklisted(host)) {
+ addResourceRequest(req.priority, host, req.capability);
+ }
}
// Nothing Rack-local for now
@@ -234,6 +269,14 @@ public abstract class RMContainerRequest
Map<String, Map<Resource, ResourceRequest>> remoteRequests =
this.remoteRequestsTable.get(priority);
Map<Resource, ResourceRequest> reqMap = remoteRequests.get(resourceName);
+ if (reqMap == null) {
+ // as we modify the resource requests by filtering out blacklisted hosts
+ // when they are added, this value may be null when being
+ // decremented
+ LOG.debug("Not decrementing resource as " + resourceName
+ + " is not present in request table");
+ return;
+ }
ResourceRequest remoteRequest = reqMap.get(capability);
LOG.info("BEFORE decResourceRequest:" + " applicationId=" + applicationId.getId()
@@ -267,4 +310,23 @@ public abstract class RMContainerRequest
release.add(containerId);
}
+ protected boolean isNodeBlacklisted(String hostname) {
+ if (!nodeBlacklistingEnabled) {
+ return false;
+ }
+ return blacklistedNodes.contains(hostname);
+ }
+
+ protected ContainerRequest getFilteredContainerRequest(ContainerRequest orig) {
+ ArrayList<String> newHosts = new ArrayList<String>();
+ for (String host : orig.hosts) {
+ if (!isNodeBlacklisted(host)) {
+ newHosts.add(host);
+ }
+ }
+ String[] hosts = newHosts.toArray(new String[newHosts.size()]);
+ ContainerRequest newReq = new ContainerRequest(orig.attemptID, orig.capability,
+ hosts, orig.racks, orig.priority);
+ return newReq;
+ }
}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1186529&r1=1186528&r2=1186529&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Wed Oct 19 22:02:21 2011
@@ -34,6 +34,7 @@ import junit.framework.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.v2.api.records.JobId;
import org.apache.hadoop.mapreduce.v2.api.records.JobReport;
import org.apache.hadoop.mapreduce.v2.api.records.JobState;
@@ -44,6 +45,7 @@ import org.apache.hadoop.mapreduce.v2.ap
import org.apache.hadoop.mapreduce.v2.app.job.Job;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.impl.JobImpl;
+import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent;
import org.apache.hadoop.mapreduce.v2.app.rm.RMContainerAllocator;
import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils;
@@ -478,6 +480,105 @@ public class TestRMContainerAllocator {
Assert.assertEquals(100.0f, app.getProgress(), 0.0);
}
+ @Test
+ public void testBlackListedNodes() throws Exception {
+
+ LOG.info("Running testBlackListedNodes");
+
+ Configuration conf = new Configuration();
+ conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
+ conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
+
+ MyResourceManager rm = new MyResourceManager(conf);
+ rm.start();
+ DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+ .getDispatcher();
+
+ // Submit the application
+ RMApp app = rm.submitApp(1024);
+ dispatcher.await();
+
+ MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+ amNodeManager.nodeHeartbeat(true);
+ dispatcher.await();
+
+ ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+ .getAppAttemptId();
+ rm.sendAMLaunched(appAttemptId);
+ dispatcher.await();
+
+ JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+ Job mockJob = mock(Job.class);
+ when(mockJob.getReport()).thenReturn(
+ MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING,
+ 0, 0, 0, 0, 0, 0, "jobfile"));
+ MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+ appAttemptId, mockJob);
+
+ // add resources to scheduler
+ MockNM nodeManager1 = rm.registerNode("h1:1234", 10240);
+ MockNM nodeManager2 = rm.registerNode("h2:1234", 10240);
+ MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
+ dispatcher.await();
+
+ // create the container request
+ ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
+ new String[] { "h1" });
+ allocator.sendRequest(event1);
+
+ // send 1 more request with different resource req
+ ContainerRequestEvent event2 = createReq(jobId, 2, 1024,
+ new String[] { "h2" });
+ allocator.sendRequest(event2);
+
+ // send another request with different resource and priority
+ ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
+ new String[] { "h3" });
+ allocator.sendRequest(event3);
+
+ // this tells the scheduler about the requests
+ // as nodes are not added, no allocations
+ List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+ dispatcher.await();
+ Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+ // Send events to blacklist nodes h1 and h2
+ ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false);
+ allocator.sendFailure(f1);
+ ContainerFailedEvent f2 = createFailEvent(jobId, 1, "h2", false);
+ allocator.sendFailure(f2);
+
+ // update resources in scheduler
+ nodeManager1.nodeHeartbeat(true); // Node heartbeat
+ nodeManager2.nodeHeartbeat(true); // Node heartbeat
+ dispatcher.await();
+
+ assigned = allocator.schedule();
+ dispatcher.await();
+ Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+ // mark h1/h2 as bad nodes
+ nodeManager1.nodeHeartbeat(false);
+ nodeManager2.nodeHeartbeat(false);
+ dispatcher.await();
+
+ assigned = allocator.schedule();
+ dispatcher.await();
+ Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+ nodeManager3.nodeHeartbeat(true); // Node heartbeat
+ assigned = allocator.schedule();
+ dispatcher.await();
+
+ Assert.assertTrue("No of assignments must be 3", assigned.size() == 3);
+
+ // validate that all containers are assigned to h3
+ for (TaskAttemptContainerAssignedEvent assig : assigned) {
+ Assert.assertTrue("Assigned container host not correct", "h3".equals(assig
+ .getContainer().getNodeId().getHost()));
+ }
+ }
+
private static class MyFifoScheduler extends FifoScheduler {
public MyFifoScheduler(RMContext rmContext) {
@@ -534,6 +635,19 @@ public class TestRMContainerAllocator {
new String[] { NetworkTopology.DEFAULT_RACK });
}
+ private ContainerFailedEvent createFailEvent(JobId jobId, int taskAttemptId,
+ String host, boolean reduce) {
+ TaskId taskId;
+ if (reduce) {
+ taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.REDUCE);
+ } else {
+ taskId = MRBuilderUtils.newTaskId(jobId, 0, TaskType.MAP);
+ }
+ TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId(taskId,
+ taskAttemptId);
+ return new ContainerFailedEvent(attemptId, host);
+ }
+
private void checkAssignments(ContainerRequestEvent[] requests,
List<TaskAttemptContainerAssignedEvent> assignments,
boolean checkHostMatch) {
@@ -653,6 +767,10 @@ public class TestRMContainerAllocator {
}
}
+ public void sendFailure(ContainerFailedEvent f) {
+ super.handle(f);
+ }
+
// API to be used by tests
public List<TaskAttemptContainerAssignedEvent> schedule() {
// run the scheduler
@@ -672,6 +790,7 @@ public class TestRMContainerAllocator {
protected void startAllocatorThread() {
// override to NOT start thread
}
+
}
public static void main(String[] args) throws Exception {
@@ -681,5 +800,7 @@ public class TestRMContainerAllocator {
t.testMapReduceScheduling();
t.testReportedAppProgress();
t.testReportedAppProgressWithOnlyMaps();
+ t.testBlackListedNodes();
}
+
}