You are viewing a plain-text version of this content. The hyperlink to the canonical version (originally on the word "here") was not preserved in this text.
Posted to mapreduce-commits@hadoop.apache.org by ss...@apache.org on 2011/12/02 23:22:01 UTC

svn commit: r1209740 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/...

Author: sseth
Date: Fri Dec  2 22:22:01 2011
New Revision: 1209740

URL: http://svn.apache.org/viewvc?rev=1209740&view=rev
Log:
merge MAPREDUCE-3460 from trunk

Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1209740&r1=1209739&r2=1209740&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Fri Dec  2 22:22:01 2011
@@ -175,6 +175,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3443. JobClient and Job should function in the context of the
     UGI which created them. (Mahadev Konar via sseth)
 
+    MAPREDUCE-3460. MR AM can hang if containers are allocated on a node
+    blacklisted by the AM. (Hitesh Shah and Robert Joseph Evans via sseth)
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java?rev=1209740&r1=1209739&r2=1209740&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java Fri Dec  2 22:22:01 2011
@@ -87,7 +87,7 @@ public class RMContainerAllocator extend
   }
   
   /*
-  Vocabulory Used: 
+  Vocabulary Used: 
   pending -> requests which are NOT yet sent to RM
   scheduled -> requests which are sent to RM but not yet assigned
   assigned -> requests which are assigned to a container
@@ -565,6 +565,7 @@ public class RMContainerAllocator extend
       if (event.getEarlierAttemptFailed()) {
         earlierFailedMaps.add(event.getAttemptID());
         request = new ContainerRequest(event, PRIORITY_FAST_FAIL_MAP);
+        LOG.info("Added "+event.getAttemptID()+" to list of failed maps");
       } else {
         for (String host : event.getHosts()) {
           LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
@@ -603,7 +604,9 @@ public class RMContainerAllocator extend
       containersAllocated += allocatedContainers.size();
       while (it.hasNext()) {
         Container allocated = it.next();
-        LOG.info("Assigning container " + allocated);
+        LOG.info("Assigning container " + allocated.getId() +
+            " with priority " + allocated.getPriority() +
+            " to NM " + allocated.getNodeId());
         
         // check if allocated container meets memory requirements 
         // and whether we have any scheduled tasks that need 
@@ -645,7 +648,8 @@ public class RMContainerAllocator extend
             // we need to request for a new container 
             // and release the current one
             LOG.info("Got allocated container on a blacklisted "
-                + " host. Releasing container " + allocated);
+                + " host "+allocated.getNodeId().getHost()
+                +". Releasing container " + allocated);
 
             // find the request matching this allocated container 
             // and replace it with a new one 
@@ -727,10 +731,20 @@ public class RMContainerAllocator extend
     }
     
     private ContainerRequest getContainerReqToReplace(Container allocated) {
+      LOG.info("Finding containerReq for allocated container: " + allocated);
       Priority priority = allocated.getPriority();
       ContainerRequest toBeReplaced = null;
-      if (PRIORITY_FAST_FAIL_MAP.equals(priority) 
-          || PRIORITY_MAP.equals(priority)) {
+      if (PRIORITY_FAST_FAIL_MAP.equals(priority)) {
+        LOG.info("Replacing FAST_FAIL_MAP container " + allocated.getId());
+        Iterator<TaskAttemptId> iter = earlierFailedMaps.iterator();
+        while (toBeReplaced == null && iter.hasNext()) {
+          toBeReplaced = maps.get(iter.next());
+        }
+        LOG.info("Found replacement: " + toBeReplaced);
+        return toBeReplaced;
+      }
+      else if (PRIORITY_MAP.equals(priority)) {
+        LOG.info("Replacing MAP container " + allocated.getId());
         // allocated container was for a map
         String host = allocated.getNodeId().getHost();
         LinkedList<TaskAttemptId> list = mapsHostMapping.get(host);
@@ -749,6 +763,7 @@ public class RMContainerAllocator extend
         TaskAttemptId tId = reduces.keySet().iterator().next();
         toBeReplaced = reduces.remove(tId);    
       }
+      LOG.info("Found replacement: " + toBeReplaced);
       return toBeReplaced;
     }
     
@@ -758,7 +773,7 @@ public class RMContainerAllocator extend
       //try to assign to earlierFailedMaps if present
       ContainerRequest assigned = null;
       while (assigned == null && earlierFailedMaps.size() > 0) {
-        TaskAttemptId tId = earlierFailedMaps.removeFirst();
+        TaskAttemptId tId = earlierFailedMaps.removeFirst();      
         if (maps.containsKey(tId)) {
           assigned = maps.remove(tId);
           JobCounterUpdateEvent jce =

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1209740&r1=1209739&r2=1209740&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Fri Dec  2 22:22:01 2011
@@ -105,6 +105,15 @@ public abstract class RMContainerRequest
       this.priority = priority;
     }
     
+    /** Renders the attempt id, capability and priority as a compact string (used in allocator log messages). */
+    @Override
+    public String toString() {
+      StringBuilder sb = new StringBuilder();
+      sb.append("AttemptId[").append(attemptID).append("]");
+      sb.append("Capability[").append(capability).append("]");
+      sb.append("Priority[").append(priority).append("]");
+      return sb.toString();
+    }
   }
 
   @Override

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1209740&r1=1209739&r2=1209740&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Fri Dec  2 22:22:01 2011
@@ -580,6 +580,141 @@ public class TestRMContainerAllocator {
     }
   }
   
+  /**
+   * Regression test for MAPREDUCE-3460: once h1 has been blacklisted, any
+   * container the RM still allocates on h1 must be released and re-requested,
+   * so that both outstanding requests are eventually assigned on h3.
+   */
+  @Test
+  public void testBlackListedNodesWithSchedulingToThatNode() throws Exception {
+    LOG.info("Running testBlackListedNodesWithSchedulingToThatNode");
+
+    Configuration conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_NODE_BLACKLISTING_ENABLE, true);
+    // A single task failure on a node is enough to blacklist that node.
+    conf.setInt(MRJobConfig.MAX_TASK_FAILURES_PER_TRACKER, 1);
+
+    MyResourceManager rm = new MyResourceManager(conf);
+    rm.start();
+    DrainDispatcher dispatcher = (DrainDispatcher) rm.getRMContext()
+        .getDispatcher();
+
+    // Submit the application
+    RMApp app = rm.submitApp(1024);
+    dispatcher.await();
+
+    MockNM amNodeManager = rm.registerNode("amNM:1234", 2048);
+    amNodeManager.nodeHeartbeat(true);
+    dispatcher.await();
+
+    ApplicationAttemptId appAttemptId = app.getCurrentAppAttempt()
+        .getAppAttemptId();
+    rm.sendAMLaunched(appAttemptId);
+    dispatcher.await();
+
+    JobId jobId = MRBuilderUtils.newJobId(appAttemptId.getApplicationId(), 0);
+    Job mockJob = mock(Job.class);
+    when(mockJob.getReport()).thenReturn(
+        MRBuilderUtils.newJobReport(jobId, "job", "user", JobState.RUNNING, 0,
+            0, 0, 0, 0, 0, 0, "jobfile", null));
+    MyContainerAllocator allocator = new MyContainerAllocator(rm, conf,
+        appAttemptId, mockJob);
+
+    // add resources to scheduler
+    MockNM nodeManager1 = rm.registerNode("h1:1234", 10240);
+    MockNM nodeManager3 = rm.registerNode("h3:1234", 10240);
+    dispatcher.await();
+
+    LOG.info("Requesting 1 Containers _1 on H1");
+    // create the container request
+    ContainerRequestEvent event1 = createReq(jobId, 1, 1024,
+        new String[] { "h1" });
+    allocator.sendRequest(event1);
+
+    LOG.info("RM Heartbeat (to send the container requests)");
+    // this tells the scheduler about the requests
+    // no node has heartbeated since the request was made, so no allocations
+    List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+    LOG.info("h1 Heartbeat (To actually schedule the containers)");
+    // update resources in scheduler
+    nodeManager1.nodeHeartbeat(true); // Node heartbeat
+    dispatcher.await();
+
+    LOG.info("RM Heartbeat (To process the scheduled containers)");
+    assigned = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 1", 1, assigned.size());
+
+    LOG.info("Failing container _1 on H1 (should blacklist the node)");
+    // Send a failure event on h1 that should blacklist node h1
+    ContainerFailedEvent f1 = createFailEvent(jobId, 1, "h1", false);
+    allocator.sendFailure(f1);
+
+    //At this stage, a request should be created for a fast fail map
+    //Create a FAST_FAIL request for a previously failed map.
+    ContainerRequestEvent event1f = createReq(jobId, 1, 1024,
+        new String[] { "h1" }, true, false);
+    allocator.sendRequest(event1f);
+
+    //Update the Scheduler with the new requests.
+    assigned = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+    // send another request (h1 or h3) at a different priority than the fast-fail one
+    ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
+        new String[] { "h1", "h3" });
+    allocator.sendRequest(event3);
+
+    //Allocator is aware of prio:5 container, and prio:20 (h1+h3) container.
+    //RM is only aware of the prio:5 container
+
+    LOG.info("h1 Heartbeat (To actually schedule the containers)");
+    // update resources in scheduler
+    nodeManager1.nodeHeartbeat(true); // Node heartbeat
+    dispatcher.await();
+
+    LOG.info("RM Heartbeat (To process the scheduled containers)");
+    assigned = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+    //RMContainerAllocator gets assigned a p:5 on a blacklisted node.
+
+    //Send a release for the p:5 container + another request.
+    LOG.info("RM Heartbeat (To process the re-scheduled containers)");
+    assigned = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+
+    //Heartbeat from H3 to schedule on this host.
+    LOG.info("h3 Heartbeat (To re-schedule the containers)");
+    nodeManager3.nodeHeartbeat(true); // Node heartbeat
+    dispatcher.await();
+
+    LOG.info("RM Heartbeat (To process the re-scheduled containers for H3)");
+    assigned = allocator.schedule();
+    dispatcher.await();
+
+    // For debugging
+    for (TaskAttemptContainerAssignedEvent assig : assigned) {
+      LOG.info(assig.getTaskAttemptID() +
+          " assigned to " + assig.getContainer().getId() +
+          " with priority " + assig.getContainer().getPriority());
+    }
+
+    Assert.assertEquals("No of assignments must be 2", 2, assigned.size());
+
+    // validate that all containers are assigned to h3
+    for (TaskAttemptContainerAssignedEvent assig : assigned) {
+      Assert.assertEquals("Assigned container " + assig.getContainer().getId()
+          + " host not correct", "h3", assig.getContainer().getNodeId().getHost());
+    }
+  }
+  
   private static class MyFifoScheduler extends FifoScheduler {
 
     public MyFifoScheduler(RMContext rmContext) {

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java?rev=1209740&r1=1209739&r2=1209740&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ContainerPBImpl.java Fri Dec  2 22:22:01 2011
@@ -340,6 +340,28 @@ public class ContainerPBImpl extends Pro
     return ((ContainerStatusPBImpl)t).getProto();
   }
 
+  /**
+   * Renders the container's id, node, node HTTP address, resource, priority,
+   * state, token and status as a single bracketed string.
+   * NOTE(review): the output embeds the container token; confirm that the
+   * token's own toString() does not expose secret material before logging.
+   */
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append("Container: [");
+    sb.append("ContainerId: ").append(getId()).append(", ");
+    sb.append("NodeId: ").append(getNodeId()).append(", ");
+    sb.append("NodeHttpAddress: ").append(getNodeHttpAddress()).append(", ");
+    sb.append("Resource: ").append(getResource()).append(", ");
+    sb.append("Priority: ").append(getPriority()).append(", ");
+    sb.append("State: ").append(getState()).append(", ");
+    sb.append("Token: ").append(getContainerToken()).append(", ");
+    sb.append("Status: ").append(getContainerStatus());
+    sb.append("]");
+    return sb.toString();
+  }
+
   //TODO Comparator
   @Override
   public int compareTo(Container other) {