You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ss...@apache.org on 2011/12/02 23:22:01 UTC
svn commit: r1209740 - in
/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/...
Author: sseth
Date: Fri Dec 2 22:22:01 2011
New Revision: 1209740
URL: http://svn.apache.org/viewvc?rev=1209740&view=rev
Log:
Merge MAPREDUCE-3460 from trunk
Modified:
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1209740&r1=1209739&r2=1209740&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Fri Dec 2 22:22:01 2011
@@ -175,6 +175,9 @@ Release 0.23.1 - Unreleased
MAPREDUCE-3443. JobClient and Job should function in the context of the
UGI which created them. (Mahadev Konar via sseth)
+ MAPREDUCE-3460. MR AM can hang if containers are allocated on a node
+ blacklisted by the AM. (Hitesh Shah and Robert Joseph Evans via sseth)
+
Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1209740&r1=1209739&r2=1209740&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Fri Dec 2 22:22:01 2011
@@ -87,7 +87,7 @@ public class RMContainerAllocator extend
}
/*
- Vocabulory Used:
+ Vocabulary Used:
pending -> requests which are NOT yet sent to RM
scheduled -> requests which are sent to RM but not yet assigned
assigned -> requests which are assigned to a container
@@ -565,6 +565,7 @@ public class RMContainerAllocator extend
if (event.getEarlierAttemptFailed()) {
earlierFailedMaps.add(event.getAttemptID());
request = new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP);
+ LOG.info("Added "+event.getAttemptID()+" to list of failed maps");
} else {
for (String host : event.getHosts()) {
LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
@@ -603,7 +604,9 @@ public class RMContainerAllocator extend
containersAllocated += allocatedContainers.size();
while (it.hasNext()) {
Container allocated = it.next();
- LOG.info("Assigning container " + allocated);
+ LOG.info("Assigning container " + allocated.getId() +
+ " with priority " + allocated.getPriority() +
+ " to NM " + allocated.getNodeId());
// check if allocated container meets memory requirements
// and whether we have any scheduled tasks that need
@@ -645,7 +648,8 @@ public class RMContainerAllocator extend
// we need to request for a new container
// and release the current one
LOG.info("Got allocated container on a blacklisted "
- + " host. Releasing container " + allocated);
+ + " host "+allocated.getNodeId().getHost()
+ +". Releasing container " + allocated);
// find the request matching this allocated container
// and replace it with a new one
@@ -727,10 +731,20 @@ public class RMContainerAllocator extend
}
private ContainerRequest getContainerReqToReplace(Container allocated) {
+ LOG.info("Finding containerReq for allocated container: " + allocated);
Priority priority = allocated.getPriority();
ContainerRequest toBeReplaced = null;
- if (PRIORITY_FAST_FAIL_MAP.equals(priority)
- || PRIORITY_MAP.equals(priority)) {
+ if (PRIORITY_FAST_FAIL_MAP.equals(priority)) {
+ LOG.info("Replacing FAST_FAIL_MAP container " + allocated.getId());
+ Iterator<TaskAttemptId> iter = earlierFailedMaps.iterator();
+ while (toBeReplaced == null && iter.hasNext()) {
+ toBeReplaced = maps.get(iter.next());
+ }
+ LOG.info("Found replacement: " + toBeReplaced);
+ return toBeReplaced;
+ }
+ else if (PRIORITY_MAP.equals(priority)) {
+ LOG.info("Replacing MAP container " + allocated.getId());
// allocated container was for a map
String host = allocated.getNodeId().getHost();
LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
@@ -749,6 +763,7 @@ public class RMContainerAllocator extend
TaskAttemptId tId = reduces.keySet().iterator().next();
toBeReplaced = reduces.remove(tId);
}
+ LOG.info("Found replacement: " + toBeReplaced);
return toBeReplaced;
}
@@ -758,7 +773,7 @@ public class RMContainerAllocator extend
//try to assign to earlierFailedMaps if present
ContainerRequest assigned = null;
while (assigned == null && earlierFailedMaps.size() > 0) {
- TaskAttemptId tId = earlierFailedMaps.removeFirst();
+ TaskAttemptId tId = earlierFailedMaps.removeFirst();
if (maps.containsKey(tId)) {
assigned = maps.remove(tId);
JobCounterUpdateEvent jce =
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1209740&r1=1209739&r2=1209740&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Fri Dec 2 22:22:01 2011
@@ -105,6 +105,13 @@ public abstract class RMContainerRequest
this.priority = priority;
}
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("AttemptId[").append(attemptID).append("]");
+ sb.append("Capability[").append(capability).append("]");
+ sb.append("Priority[").append(priority).append("]");
+ return sb.toString();
+ }
}
@Override
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1209740&r1=1209739&r2=1209740&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Fri Dec 2 22:22:01 2011
@@ -580,6 +580,135 @@ public class TestRMContainerAllocator {
}
}
+ @Test
+ public void testBlackListedNodesWithSchedulingToThatNode() throws Exception {
+ LOG.info("Running testBlackListedNodesWithSchedulingToThatNode");
+
+ Configuration conf = new Configuration();
+ conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
+ conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
+
+ MyResourceManager rm = new MyResourceManager(conf);
+ rm.start();
+ DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+ .getDispatcher();
+
+ // Submit the application
+ RMApp app = rm.submitApp(1024);
+ dispatcher.await();
+
+ MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+ amNodeManager.nodeHeartbeat(true);
+ dispatcher.await();
+
+ ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+ .getAppAttemptId();
+ rm.sendAMLaunched(appAttemptId);
+ dispatcher.await();
+
+ JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+ Job mockJob = mock(Job.class);
+ when(mockJob.getReport()).thenReturn(
+ MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+ 0, 0, 0, 0, 0, 0, "jobfile", null));
+ MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+ appAttemptId, mockJob);
+
+ // add resources to scheduler
+ MockNM nodeManager1 = rm.registerNode("h1:1234", 10240);
+ MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
+ dispatcher.await();
+
+ LOG.info("Requesting 1 Containers _1 on H1");
+ // create the container request
+ ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
+ new String[] { "h1" });
+ allocator.sendRequest(event1);
+
+ LOG.info("RM Heartbeat (to send the container requests)");
+ // this tells the scheduler about the requests
+ // as no node has sent a heartbeat yet, no allocations
+ List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+ dispatcher.await();
+ Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+ LOG.info("h1 Heartbeat (To actually schedule the containers)");
+ // update resources in scheduler
+ nodeManager1.nodeHeartbeat(true); // Node heartbeat
+ dispatcher.await();
+
+ LOG.info("RM Heartbeat (To process the scheduled containers)");
+ assigned = allocator.schedule();
+ dispatcher.await();
+ Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+
+ LOG.info("Failing container _1 on H1 (should blacklist the node)");
+ // Send a failure event to blacklist node h1
+ ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false);
+ allocator.sendFailure(f1);
+
+ //At this stage, a request should be created for a fast fail map
+ //Create a FAST_FAIL request for a previously failed map.
+ ContainerRequestEvent event1f = createReq(jobId, 1, 1024,
+ new String[] { "h1" }, true, false);
+ allocator.sendRequest(event1f);
+
+ //Update the Scheduler with the new requests.
+ assigned = allocator.schedule();
+ dispatcher.await();
+ Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+ // send another request with different resource and priority
+ ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
+ new String[] { "h1", "h3" });
+ allocator.sendRequest(event3);
+
+ //Allocator is aware of prio:5 container, and prio:20 (h1+h3) container.
+ //RM is only aware of the prio:5 container
+
+ LOG.info("h1 Heartbeat (To actually schedule the containers)");
+ // update resources in scheduler
+ nodeManager1.nodeHeartbeat(true); // Node heartbeat
+ dispatcher.await();
+
+ LOG.info("RM Heartbeat (To process the scheduled containers)");
+ assigned = allocator.schedule();
+ dispatcher.await();
+ Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+ //RMContainerAllocator gets assigned a p:5 on a blacklisted node.
+
+ //Send a release for the p:5 container + another request.
+ LOG.info("RM Heartbeat (To process the re-scheduled containers)");
+ assigned = allocator.schedule();
+ dispatcher.await();
+ Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+ //Heartbeat from H3 to schedule on this host.
+ LOG.info("h3 Heartbeat (To re-schedule the containers)");
+ nodeManager3.nodeHeartbeat(true); // Node heartbeat
+ dispatcher.await();
+
+ LOG.info("RM Heartbeat (To process the re-scheduled containers for H3)");
+ assigned = allocator.schedule();
+ dispatcher.await();
+
+ // For debugging
+ for (TaskAttemptContainerAssignedEvent assig : assigned) {
+ LOG.info(assig.getTaskAttemptID() +
+ " assgined to " + assig.getContainer().getId() +
+ " with priority " + assig.getContainer().getPriority());
+ }
+
+ Assert.assertEquals("No of assignments must be 2", 2, assigned.size());
+
+ // validate that all containers are assigned to h3
+ for (TaskAttemptContainerAssignedEvent assig : assigned) {
+ Assert.assertEquals("Assigned container " + assig.getContainer().getId()
+ + " host not correct", "h3", assig.getContainer().getNodeId().getHost());
+ }
+ }
+
private static class MyFifoScheduler extends FifoScheduler {
public MyFifoScheduler(RMContext rmContext) {
Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java?rev=1209740&r1=1209739&r2=1209740&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java Fri Dec 2 22:22:01 2011
@@ -340,6 +340,21 @@ public class ContainerPBImpl extends Pro
return ((ContainerStatusPBImpl)t).getProto();
}
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Container: [");
+ sb.append("ContainerId: ").append(getId()).append(", ");
+ sb.append("NodeId: ").append(getNodeId()).append(", ");
+ sb.append("NodeHttpAddress: ").append(getNodeHttpAddress()).append(", ");
+ sb.append("Resource: ").append(getResource()).append(", ");
+ sb.append("Priority: ").append(getPriority()).append(", ");
+ sb.append("State: ").append(getState()).append(", ");
+ sb.append("Token: ").append(getContainerToken()).append(", ");
+ sb.append("Status: ").append(getContainerStatus());
+ sb.append("]");
+ return sb.toString();
+ }
+
//TODO Comparator
@Override
public int compareTo(Container other) {