You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ss...@apache.org on 2013/02/07 08:08:30 UTC

svn commit: r1443324 - in /hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/ hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/tes...

Author: sseth
Date: Thu Feb  7 07:08:30 2013
New Revision: 1443324

URL: http://svn.apache.org/viewvc?rev=1443324&view=rev
Log:
merge MAPREDUCE-4671 from trunk. AM does not tell the RM about container requests which are no longer needed. Contributed by Bikas Saha.

Modified:
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
    hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1443324&r1=1443323&r2=1443324&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Thu Feb  7 07:08:30 2013
@@ -12,6 +12,9 @@ Release 2.0.4-beta - UNRELEASED
 
   BUG FIXES
 
+    MAPREDUCE-4671. AM does not tell the RM about container requests which are
+    no longer needed. (Bikas Saha via sseth)
+
 Release 2.0.3-alpha - 2013-02-06 
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java?rev=1443324&r1=1443323&r2=1443324&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerRequestor.java Thu Feb  7 07:08:30 2013
@@ -72,7 +72,10 @@ public abstract class RMContainerRequest
   remoteRequestsTable =
       new TreeMap<Priority, Map<String, Map<Resource, ResourceRequest>>>();
 
-  private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>();
+  // use custom comparator to make sure ResourceRequest objects differing only in 
+  // numContainers dont end up as duplicates
+  private final Set<ResourceRequest> ask = new TreeSet<ResourceRequest>(
+      new org.apache.hadoop.yarn.util.BuilderUtils.ResourceRequestComparator());
   private final Set<ContainerId> release = new TreeSet<ContainerId>(); 
 
   private boolean nodeBlacklistingEnabled;
@@ -235,7 +238,7 @@ public abstract class RMContainerRequest
               ResourceRequest zeroedRequest = BuilderUtils.newResourceRequest(req);
               zeroedRequest.setNumContainers(0);
               // to be sent to RM on next heartbeat
-              ask.add(zeroedRequest);
+              addResourceRequestToAsk(zeroedRequest);
             }
           }
           // if all requests were still in ask queue
@@ -320,7 +323,7 @@ public abstract class RMContainerRequest
     remoteRequest.setNumContainers(remoteRequest.getNumContainers() + 1);
 
     // Note this down for next interaction with ResourceManager
-    ask.add(remoteRequest);
+    addResourceRequestToAsk(remoteRequest);
     if (LOG.isDebugEnabled()) {
       LOG.debug("addResourceRequest:" + " applicationId="
           + applicationId.getId() + " priority=" + priority.getPriority()
@@ -353,7 +356,12 @@ public abstract class RMContainerRequest
           + remoteRequest.getNumContainers() + " #asks=" + ask.size());
     }
 
-    remoteRequest.setNumContainers(remoteRequest.getNumContainers() -1);
+    if(remoteRequest.getNumContainers() > 0) {
+      // based on blacklisting comments above we can end up decrementing more 
+      // than requested. so guard for that.
+      remoteRequest.setNumContainers(remoteRequest.getNumContainers() -1);
+    }
+    
     if (remoteRequest.getNumContainers() == 0) {
       reqMap.remove(capability);
       if (reqMap.size() == 0) {
@@ -362,13 +370,12 @@ public abstract class RMContainerRequest
       if (remoteRequests.size() == 0) {
         remoteRequestsTable.remove(priority);
       }
-      //remove from ask if it may have
-      ask.remove(remoteRequest);
-    } else {
-      ask.add(remoteRequest);//this will override the request if ask doesn't
-      //already have it.
     }
 
+    // send the updated resource request to RM
+    // send 0 container count requests also to cancel previous requests
+    addResourceRequestToAsk(remoteRequest);
+
     if (LOG.isDebugEnabled()) {
       LOG.info("AFTER decResourceRequest:" + " applicationId="
           + applicationId.getId() + " priority=" + priority.getPriority()
@@ -376,6 +383,16 @@ public abstract class RMContainerRequest
           + remoteRequest.getNumContainers() + " #asks=" + ask.size());
     }
   }
+  
+  private void addResourceRequestToAsk(ResourceRequest remoteRequest) {
+    // because objects inside the resource map can be deleted ask can end up 
+    // containing an object that matches new resource object but with different
+    // numContainers. So exisintg values must be replaced explicitly
+    if(ask.contains(remoteRequest)) {
+      ask.remove(remoteRequest);
+    }
+    ask.add(remoteRequest);    
+  }
 
   protected void release(ContainerId containerId) {
     release.add(containerId);

Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java?rev=1443324&r1=1443323&r2=1443324&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java Thu Feb  7 07:08:30 2013
@@ -167,6 +167,7 @@ public class TestRMContainerAllocator {
     List<TaskAttemptContainerAssignedEvent> assigned = allocator.schedule();
     dispatcher.await();
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
+    Assert.assertEquals(4, rm.getMyFifoScheduler().lastAsk.size());
 
     // send another request with different resource and priority
     ContainerRequestEvent event3 = createReq(jobId, 3, 1024,
@@ -178,7 +179,8 @@ public class TestRMContainerAllocator {
     assigned = allocator.schedule();
     dispatcher.await();
     Assert.assertEquals("No of assignments must be 0", 0, assigned.size());
-
+    Assert.assertEquals(3, rm.getMyFifoScheduler().lastAsk.size());
+    
     // update resources in scheduler
     nodeManager1.nodeHeartbeat(true); // Node heartbeat
     nodeManager2.nodeHeartbeat(true); // Node heartbeat
@@ -187,8 +189,14 @@ public class TestRMContainerAllocator {
 
     assigned = allocator.schedule();
     dispatcher.await();
+    Assert.assertEquals(0, rm.getMyFifoScheduler().lastAsk.size());
     checkAssignments(new ContainerRequestEvent[] { event1, event2, event3 },
         assigned, false);
+    
+    // check that the assigned container requests are cancelled
+    assigned = allocator.schedule();
+    dispatcher.await();
+    Assert.assertEquals(5, rm.getMyFifoScheduler().lastAsk.size());    
   }
   
   @Test 
@@ -422,7 +430,7 @@ public class TestRMContainerAllocator {
   }
 
   private static class MyResourceManager extends MockRM {
-
+    
     public MyResourceManager(Configuration conf) {
       super(conf);
     }
@@ -446,6 +454,10 @@ public class TestRMContainerAllocator {
     protected ResourceScheduler createScheduler() {
       return new MyFifoScheduler(this.getRMContext());
     }
+    
+    MyFifoScheduler getMyFifoScheduler() {
+      return (MyFifoScheduler) scheduler;
+    }
   }
 
   @Test
@@ -1194,7 +1206,9 @@ public class TestRMContainerAllocator {
         assert (false);
       }
     }
-
+    
+    List<ResourceRequest> lastAsk = null;
+    
     // override this to copy the objects otherwise FifoScheduler updates the
     // numContainers in same objects as kept by RMContainerAllocator
     @Override
@@ -1208,6 +1222,7 @@ public class TestRMContainerAllocator {
             .getNumContainers());
         askCopy.add(reqCopy);
       }
+      lastAsk = ask;
       return super.allocate(applicationAttemptId, askCopy, release);
     }
   }