You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ac...@apache.org on 2011/05/16 20:32:14 UTC

svn commit: r1103823 - in /hadoop/mapreduce/branches/MR-279: ./ yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/ha...

Author: acmurthy
Date: Mon May 16 18:32:14 2011
New Revision: 1103823

URL: http://svn.apache.org/viewvc?rev=1103823&view=rev
Log:
Fix CapacityScheduler to release unused reservations on application completion.

Modified:
    hadoop/mapreduce/branches/MR-279/CHANGES.txt
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java

Modified: hadoop/mapreduce/branches/MR-279/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/CHANGES.txt?rev=1103823&r1=1103822&r2=1103823&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/MR-279/CHANGES.txt Mon May 16 18:32:14 2011
@@ -3,6 +3,9 @@ Hadoop MapReduce Change Log
 Trunk (unreleased changes)
 
   MAPREDUCE-279
+    
+    Fix CapacityScheduler to release unused reservations on application
+    completion. (acmurthy)
 
     Launching bin/yarn and bin/mapred only *once* in AM for constructing
     classpaths to avoid multiple forks and huge vmem usage by AM. (vinodkv)

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java?rev=1103823&r1=1103822&r2=1103823&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Application.java Mon May 16 18:32:14 2011
@@ -504,4 +504,8 @@ public class Application {
       }
     }
   }
+
+  public Map<Priority, Set<NodeInfo>> getAllReservations() {
+    return new HashMap<Priority, Set<NodeInfo>>(reservedContainers);
+  }
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java?rev=1103823&r1=1103822&r2=1103823&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java Mon May 16 18:32:14 2011
@@ -24,6 +24,7 @@ import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 import java.util.concurrent.ConcurrentHashMap;
 
@@ -54,6 +55,7 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.ClusterTracker;
 import org.apache.hadoop.yarn.server.resourcemanager.resourcetracker.NodeInfo;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeManagerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.security.ContainerTokenSecretManager;
 import org.apache.hadoop.yarn.util.BuilderUtils;
@@ -271,12 +273,19 @@ implements ResourceScheduler, CapacitySc
       LOG.info("Unknown application " + applicationId + " has completed!");
       return;
     }
+    
    /*
     * release all the containers and make sure we clean up the pending 
     * requests for this application
     */
     processReleasedContainers(application, application.getCurrentContainers());
     application.clearRequests();
+    
+    /*
+     * release all reserved containers
+     */
+    releaseReservedContainers(application);
+    
     /** The application can be retried. So only remove it from scheduler data
      * structures if the finishapplication flag is set.
      */
@@ -460,6 +469,26 @@ implements ResourceScheduler, CapacitySc
     processCompletedContainers(unusedContainers);
   }
 
+  private synchronized void releaseReservedContainers(Application application) {
+    LOG.info("Releasing reservations for completed application: " + 
+        application.getApplicationId());
+    Queue queue = queues.get(application.getQueue().getQueueName());
+    Map<Priority, Set<NodeInfo>> reservations = application.getAllReservations();
+    for (Map.Entry<Priority, Set<NodeInfo>> e : reservations.entrySet()) {
+      Priority priority = e.getKey();
+      Set<NodeInfo> reservedNodes = e.getValue();
+      for (NodeInfo node : reservedNodes) {
+        Resource allocatedResource = 
+          application.getResourceRequest(priority, NodeManagerImpl.ANY).getCapability();
+    
+        application.unreserveResource(node, priority);
+        node.unreserveResource(application, priority);
+        
+        queue.completedContainer(clusterResource, null, allocatedResource, application);
+      }
+    }
+  }
+  
   private synchronized Application getApplication(ApplicationId applicationId) {
     return applications.get(applicationId);
   }