You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text version.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2013/01/31 01:29:37 UTC
svn commit: r1440752 - in
/hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/
hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/
Author: vinodkv
Date: Thu Jan 31 00:29:36 2013
New Revision: 1440752
URL: http://svn.apache.org/viewvc?rev=1440752&view=rev
Log:
MAPREDUCE-4893. Fixed MR ApplicationMaster to do optimal assignment of containers to get maximum locality. Contributed by Bikas Saha.
svn merge --ignore-ancestry -c 1440749 ../../trunk/
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1440752&r1=1440751&r2=1440752&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Thu Jan 31 00:29:36 2013
@@ -63,6 +63,9 @@ Release 2.0.3-alpha - Unreleased
OPTIMIZATIONS
+ MAPREDUCE-4893. Fixed MR ApplicationMaster to do optimal assignment of
+ containers to get maximum locality. (Bikas Saha via vinodkv)
+
BUG FIXES
MAPREDUCE-4272. SortedRanges.Range#compareTo is not spec compliant.
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1440752&r1=1440751&r2=1440752&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Thu Jan 31 00:29:36 2013
@@ -747,7 +747,7 @@ public class RMContainerAllocator extend
addContainerReq(req);
}
- @SuppressWarnings("unchecked")
+ // this method will change the list of allocatedContainers.
private void assign(List<Container> allocatedContainers) {
Iterator<Container> it = allocatedContainers.iterator();
LOG.info("Got allocated containers " + allocatedContainers.size());
@@ -788,84 +788,97 @@ public class RMContainerAllocator extend
+ reduces.isEmpty());
isAssignable = false;
}
- }
+ } else {
+ LOG.warn("Container allocated at unwanted priority: " + priority +
+ ". Returning to RM...");
+ isAssignable = false;
+ }
- boolean blackListed = false;
- ContainerRequest assigned = null;
+ if(!isAssignable) {
+ // release container if we could not assign it
+ containerNotAssigned(allocated);
+ it.remove();
+ continue;
+ }
- if (isAssignable) {
- // do not assign if allocated container is on a
- // blacklisted host
- String allocatedHost = allocated.getNodeId().getHost();
- blackListed = isNodeBlacklisted(allocatedHost);
- if (blackListed) {
- // we need to request for a new container
- // and release the current one
- LOG.info("Got allocated container on a blacklisted "
- + " host "+allocatedHost
- +". Releasing container " + allocated);
-
- // find the request matching this allocated container
- // and replace it with a new one
- ContainerRequest toBeReplacedReq =
- getContainerReqToReplace(allocated);
- if (toBeReplacedReq != null) {
- LOG.info("Placing a new container request for task attempt "
- + toBeReplacedReq.attemptID);
- ContainerRequest newReq =
- getFilteredContainerRequest(toBeReplacedReq);
- decContainerReq(toBeReplacedReq);
- if (toBeReplacedReq.attemptID.getTaskId().getTaskType() ==
- TaskType.MAP) {
- maps.put(newReq.attemptID, newReq);
- }
- else {
- reduces.put(newReq.attemptID, newReq);
- }
- addContainerReq(newReq);
+ // do not assign if allocated container is on a
+ // blacklisted host
+ String allocatedHost = allocated.getNodeId().getHost();
+ if (isNodeBlacklisted(allocatedHost)) {
+ // we need to request for a new container
+ // and release the current one
+ LOG.info("Got allocated container on a blacklisted "
+ + " host "+allocatedHost
+ +". Releasing container " + allocated);
+
+ // find the request matching this allocated container
+ // and replace it with a new one
+ ContainerRequest toBeReplacedReq =
+ getContainerReqToReplace(allocated);
+ if (toBeReplacedReq != null) {
+ LOG.info("Placing a new container request for task attempt "
+ + toBeReplacedReq.attemptID);
+ ContainerRequest newReq =
+ getFilteredContainerRequest(toBeReplacedReq);
+ decContainerReq(toBeReplacedReq);
+ if (toBeReplacedReq.attemptID.getTaskId().getTaskType() ==
+ TaskType.MAP) {
+ maps.put(newReq.attemptID, newReq);
}
else {
- LOG.info("Could not map allocated container to a valid request."
- + " Releasing allocated container " + allocated);
+ reduces.put(newReq.attemptID, newReq);
}
+ addContainerReq(newReq);
}
else {
- assigned = assign(allocated);
- if (assigned != null) {
- // Update resource requests
- decContainerReq(assigned);
-
- // send the container-assigned event to task attempt
- eventHandler.handle(new TaskAttemptContainerAssignedEvent(
- assigned.attemptID, allocated, applicationACLs));
-
- assignedRequests.add(allocated, assigned.attemptID);
-
- if (LOG.isDebugEnabled()) {
- LOG.info("Assigned container (" + allocated + ") "
- + " to task " + assigned.attemptID + " on node "
- + allocated.getNodeId().toString());
- }
- }
- else {
- //not assigned to any request, release the container
- LOG.info("Releasing unassigned and invalid container "
- + allocated + ". RM has gone crazy, someone go look!"
- + " Hey RM, if you are so rich, go donate to non-profits!");
- }
+ LOG.info("Could not map allocated container to a valid request."
+ + " Releasing allocated container " + allocated);
}
+
+ // release container if we could not assign it
+ containerNotAssigned(allocated);
+ it.remove();
+ continue;
}
-
- // release container if it was blacklisted
- // or if we could not assign it
- if (blackListed || assigned == null) {
- containersReleased++;
- release(allocated.getId());
- }
+ }
+
+ assignContainers(allocatedContainers);
+
+ // release container if we could not assign it
+ it = allocatedContainers.iterator();
+ while (it.hasNext()) {
+ Container allocated = it.next();
+ LOG.info("Releasing unassigned and invalid container "
+ + allocated + ". RM may have assignment issues");
+ containerNotAssigned(allocated);
+ }
+ }
+
+ @SuppressWarnings("unchecked")
+ private void containerAssigned(Container allocated,
+ ContainerRequest assigned) {
+ // Update resource requests
+ decContainerReq(assigned);
+
+ // send the container-assigned event to task attempt
+ eventHandler.handle(new TaskAttemptContainerAssignedEvent(
+ assigned.attemptID, allocated, applicationACLs));
+
+ assignedRequests.add(allocated, assigned.attemptID);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.info("Assigned container (" + allocated + ") "
+ + " to task " + assigned.attemptID + " on node "
+ + allocated.getNodeId().toString());
}
}
- private ContainerRequest assign(Container allocated) {
+ private void containerNotAssigned(Container allocated) {
+ containersReleased++;
+ release(allocated.getId());
+ }
+
+ private ContainerRequest assignWithoutLocality(Container allocated) {
ContainerRequest assigned = null;
Priority priority = allocated.getPriority();
@@ -877,18 +890,24 @@ public class RMContainerAllocator extend
LOG.debug("Assigning container " + allocated + " to reduce");
}
assigned = assignToReduce(allocated);
- } else if (PRIORITY_MAP.equals(priority)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Assigning container " + allocated + " to map");
- }
- assigned = assignToMap(allocated);
- } else {
- LOG.warn("Container allocated at unwanted priority: " + priority +
- ". Returning to RM...");
}
return assigned;
}
+
+ private void assignContainers(List<Container> allocatedContainers) {
+ Iterator<Container> it = allocatedContainers.iterator();
+ while (it.hasNext()) {
+ Container allocated = it.next();
+ ContainerRequest assigned = assignWithoutLocality(allocated);
+ if (assigned != null) {
+ containerAssigned(allocated, assigned);
+ it.remove();
+ }
+ }
+
+ assignMapsWithLocality(allocatedContainers);
+ }
private ContainerRequest getContainerReqToReplace(Container allocated) {
LOG.info("Finding containerReq for allocated container: " + allocated);
@@ -959,11 +978,15 @@ public class RMContainerAllocator extend
}
@SuppressWarnings("unchecked")
- private ContainerRequest assignToMap(Container allocated) {
- //try to assign to maps if present
- //first by host, then by rack, followed by *
- ContainerRequest assigned = null;
- while (assigned == null && maps.size() > 0) {
+ private void assignMapsWithLocality(List<Container> allocatedContainers) {
+ // try to assign to all nodes first to match node local
+ Iterator<Container> it = allocatedContainers.iterator();
+ while(it.hasNext() && maps.size() > 0){
+ Container allocated = it.next();
+ Priority priority = allocated.getPriority();
+ assert PRIORITY_MAP.equals(priority);
+ // "if (maps.containsKey(tId))" below should be almost always true.
+ // hence this while loop would almost always have O(1) complexity
String host = allocated.getNodeId().getHost();
LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
while (list != null && list.size() > 0) {
@@ -972,7 +995,9 @@ public class RMContainerAllocator extend
}
TaskAttemptId tId = list.removeFirst();
if (maps.containsKey(tId)) {
- assigned = maps.remove(tId);
+ ContainerRequest assigned = maps.remove(tId);
+ containerAssigned(allocated, assigned);
+ it.remove();
JobCounterUpdateEvent jce =
new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
jce.addCounterUpdate(JobCounter.DATA_LOCAL_MAPS, 1);
@@ -984,39 +1009,56 @@ public class RMContainerAllocator extend
break;
}
}
- if (assigned == null) {
- String rack = RackResolver.resolve(host).getNetworkLocation();
- list = mapsRackMapping.get(rack);
- while (list != null && list.size() > 0) {
- TaskAttemptId tId = list.removeFirst();
- if (maps.containsKey(tId)) {
- assigned = maps.remove(tId);
- JobCounterUpdateEvent jce =
- new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
- jce.addCounterUpdate(JobCounter.RACK_LOCAL_MAPS, 1);
- eventHandler.handle(jce);
- rackLocalAssigned++;
- if (LOG.isDebugEnabled()) {
- LOG.debug("Assigned based on rack match " + rack);
- }
- break;
- }
- }
- if (assigned == null && maps.size() > 0) {
- TaskAttemptId tId = maps.keySet().iterator().next();
- assigned = maps.remove(tId);
+ }
+
+ // try to match all rack local
+ it = allocatedContainers.iterator();
+ while(it.hasNext() && maps.size() > 0){
+ Container allocated = it.next();
+ Priority priority = allocated.getPriority();
+ assert PRIORITY_MAP.equals(priority);
+ // "if (maps.containsKey(tId))" below should be almost always true.
+ // hence this while loop would almost always have O(1) complexity
+ String host = allocated.getNodeId().getHost();
+ String rack = RackResolver.resolve(host).getNetworkLocation();
+ LinkedList<TaskAttemptId> list = mapsRackMapping.get(rack);
+ while (list != null && list.size() > 0) {
+ TaskAttemptId tId = list.removeFirst();
+ if (maps.containsKey(tId)) {
+ ContainerRequest assigned = maps.remove(tId);
+ containerAssigned(allocated, assigned);
+ it.remove();
JobCounterUpdateEvent jce =
new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
- jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
+ jce.addCounterUpdate(JobCounter.RACK_LOCAL_MAPS, 1);
eventHandler.handle(jce);
+ rackLocalAssigned++;
if (LOG.isDebugEnabled()) {
- LOG.debug("Assigned based on * match");
+ LOG.debug("Assigned based on rack match " + rack);
}
break;
}
}
}
- return assigned;
+
+ // assign remaining
+ it = allocatedContainers.iterator();
+ while(it.hasNext() && maps.size() > 0){
+ Container allocated = it.next();
+ Priority priority = allocated.getPriority();
+ assert PRIORITY_MAP.equals(priority);
+ TaskAttemptId tId = maps.keySet().iterator().next();
+ ContainerRequest assigned = maps.remove(tId);
+ containerAssigned(allocated, assigned);
+ it.remove();
+ JobCounterUpdateEvent jce =
+ new JobCounterUpdateEvent(assigned.attemptID.getTaskId().getJobId());
+ jce.addCounterUpdate(JobCounter.OTHER_LOCAL_MAPS, 1);
+ eventHandler.handle(jce);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Assigned based on * match");
+ }
+ }
}
}
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1440752&r1=1440751&r2=1440752&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Thu Jan 31 00:29:36 2013
@@ -190,6 +190,92 @@ public class TestRMContainerAllocator {
checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 },
assigned, false);
}
+
+ @Test
+ public void testMapNodeLocality() throws Exception {
+ // test checks that ordering of allocated containers list from the RM does
+ // not affect the map->container assignment done by the AM. If there is a
+ // node local container available for a map then it should be assigned to
+ // that container and not a rack-local container that happened to be seen
+ // earlier in the allocated containers list from the RM.
+ // Regression test for MAPREDUCE-4893
+ LOG.info("Running testMapNodeLocality");
+
+ Configuration conf = new Configuration();
+ MyResourceManager rm = new MyResourceManager(conf);
+ rm.start();
+ DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+ .getDispatcher();
+
+ // Submit the application
+ RMApp app = rm.submitApp(1024);
+ dispatcher.await();
+
+ MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+ amNodeManager.nodeHeartbeat(true);
+ dispatcher.await();
+
+ ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+ .getAppAttemptId();
+ rm.sendAMLaunched(appAttemptId);
+ dispatcher.await();
+
+ JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+ Job mockJob = mock(Job.class);
+ when(mockJob.getReport()).thenReturn(
+ MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+ 0, 0, 0, 0, 0, 0, "jobfile", null, false, ""));
+ MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+ appAttemptId, mockJob);
+
+ // add resources to scheduler
+ MockNM nodeManager1 = rm.registerNode("h1:1234", 3072); // can assign 2 maps
+ rm.registerNode("h2:1234", 10240); // wont heartbeat on node local node
+ MockNM nodeManager3 = rm.registerNode("h3:1234", 1536); // assign 1 map
+ dispatcher.await();
+
+ // create the container requests for maps
+ ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
+ new String[] { "h1" });
+ allocator.sendRequest(event1);
+ ContainerRequestEvent event2 = createReq(jobId, 2, 1024,
+ new String[] { "h1" });
+ allocator.sendRequest(event2);
+ ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
+ new String[] { "h2" });
+ allocator.sendRequest(event3);
+
+ // this tells the scheduler about the requests
+ // as nodes are not added, no allocations
+ List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+ dispatcher.await();
+ Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+ // update resources in scheduler
+ // Node heartbeat from rack-local first. This makes node h3 the first in the
+ // list of allocated containers but it should not be assigned to task1.
+ nodeManager3.nodeHeartbeat(true);
+ // Node heartbeat from node-local next. This allocates 2 node local
+ // containers for task1 and task2. These should be matched with those tasks.
+ nodeManager1.nodeHeartbeat(true);
+ dispatcher.await();
+
+ assigned = allocator.schedule();
+ dispatcher.await();
+ checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 },
+ assigned, false);
+ // remove the rack-local assignment that should have happened for task3
+ for(TaskAttemptContainerAssignedEvent event : assigned) {
+ if(event.getTaskAttemptID().equals(event3.getAttemptID())) {
+ assigned.remove(event);
+ Assert.assertTrue(
+ event.getContainer().getNodeId().getHost().equals("h3"));
+ break;
+ }
+ }
+ checkAssignments(new ContainerRequestEvent[] { event1, event2},
+ assigned, true);
+ }
@Test
public void testResource() throws Exception {
@@ -1202,7 +1288,7 @@ public class TestRMContainerAllocator {
if (checkHostMatch) {
Assert.assertTrue("Not assigned to requested host", Arrays.asList(
request.getHosts()).contains(
- assigned.getContainer().getNodeId().toString()));
+ assigned.getContainer().getNodeId().getHost()));
}
}