You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2012/02/02 01:42:19 UTC

svn commit: r1239423 - in /hadoop/common/branches/branch-0.23/hadoop-mapreduce-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ hadoop-yarn/hadoop-yarn-s...

Author: vinodkv
Date: Thu Feb  2 00:42:18 2012
New Revision: 1239423

URL: http://svn.apache.org/viewvc?rev=1239423&view=rev
Log:
MAPREDUCE-3752. Modified application limits to include queue max-capacities besides the usual user limits. Contributed by Arun C Murthy.
svn merge --ignore-ancestry -c 1239422 ../../trunk/

Modified:
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
    hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt?rev=1239423&r1=1239422&r2=1239423&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/CHANGES.txt Thu Feb  2 00:42:18 2012
@@ -606,6 +606,9 @@ Release 0.23.1 - Unreleased
     MAPREDUCE-3640. Allow AMRecovery to work with partial JobHistory files.
     (Arun C Murthy via sseth)
 
+    MAPREDUCE-3752. Modified application limits to include queue max-capacities
+    besides the usual user limits. (Arun C Murthy via vinodkv)
+
 Release 0.23.0 - 2011-11-01 
 
   INCOMPATIBLE CHANGES

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java?rev=1239423&r1=1239422&r2=1239423&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/Resources.java Thu Feb  2 00:42:18 2012
@@ -111,4 +111,12 @@ public class Resources {
   public static boolean greaterThanOrEqual(Resource lhs, Resource rhs) {
     return lhs.getMemory() >= rhs.getMemory();
   }
+  
+  public static Resource min(Resource lhs, Resource rhs) {
+    return (lhs.getMemory() < rhs.getMemory()) ? lhs : rhs;
+  }
+
+  public static Resource max(Resource lhs, Resource rhs) {
+    return (lhs.getMemory() > rhs.getMemory()) ? lhs : rhs;
+  }
 }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java?rev=1239423&r1=1239422&r2=1239423&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java Thu Feb  2 00:42:18 2012
@@ -162,6 +162,13 @@ public class AppSchedulingInfo {
 
       asks.put(hostName, request);
       if (updatePendingResources) {
+        
+        // Similarly, deactivate application?
+        if (request.getNumContainers() <= 0) {
+          LOG.info("checking for deactivate... ");
+          checkForDeactivation();
+        }
+        
         int lastRequestContainers = lastRequest != null ? lastRequest
             .getNumContainers() : 0;
         Resource lastRequestCapability = lastRequest != null ? lastRequest
@@ -308,19 +315,24 @@ public class AppSchedulingInfo {
     // Do we have any outstanding requests?
     // If there is nothing, we need to deactivate this application
     if (numOffSwitchContainers == 0) {
-      boolean deactivate = true;
-      for (Priority priority : getPriorities()) {
-        ResourceRequest request = getResourceRequest(priority, RMNodeImpl.ANY);
-        if (request.getNumContainers() > 0) {
-          deactivate = false;
-          break;
-        }
-      }
-      if (deactivate) {
-        activeUsersManager.deactivateApplication(user, applicationId);
+      checkForDeactivation();
+    }
+  }
+  
+  synchronized private void checkForDeactivation() {
+    boolean deactivate = true;
+    for (Priority priority : getPriorities()) {
+      ResourceRequest request = getResourceRequest(priority, RMNodeImpl.ANY);
+      if (request.getNumContainers() > 0) {
+        deactivate = false;
+        break;
       }
     }
+    if (deactivate) {
+      activeUsersManager.deactivateApplication(user, applicationId);
+    }
   }
+  
   synchronized private void allocate(Container container) {
     // Update consumption and track allocations
     //TODO: fixme sharad

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1239423&r1=1239422&r2=1239423&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Thu Feb  2 00:42:18 2012
@@ -751,15 +751,15 @@ public class LeafQueue implements CSQueu
             continue;
           }
 
-          // Compute & set headroom
-          // Note: We set the headroom with the highest priority request 
-          //       as the target. 
+          // Compute user-limit & set headroom
+          // Note: We compute both user-limit & headroom with the highest 
+          //       priority request as the target. 
           //       This works since we never assign lower priority requests
           //       before all higher priority ones are serviced.
           Resource userLimit = 
-              computeAndSetUserResourceLimit(application, clusterResource, 
-                  required);
-
+              computeUserLimitAndSetHeadroom(application, clusterResource, 
+                  required);          
+          
           // Check queue max-capacity limit
           if (!assignToQueue(clusterResource, required)) {
             return NULL_ASSIGNMENT;
@@ -777,13 +777,13 @@ public class LeafQueue implements CSQueu
           CSAssignment assignment =  
             assignContainersOnNode(clusterResource, node, application, priority, 
                 null);
-          
-          Resource assigned = assignment.getResource();
-          
+
           // Did we schedule or reserve a container?
+          Resource assigned = assignment.getResource();
           if (Resources.greaterThan(assigned, Resources.none())) {
 
-            // Book-keeping
+            // Book-keeping 
+            // Note: Update headroom to account for current allocation too...
             allocateResource(clusterResource, application, assigned);
             
             // Reset scheduling opportunities
@@ -854,20 +854,50 @@ public class LeafQueue implements CSQueu
   }
 
   @Lock({LeafQueue.class, SchedulerApp.class})
-  private Resource computeAndSetUserResourceLimit(SchedulerApp application, 
-      Resource clusterResource, Resource required) {
+  private Resource computeUserLimitAndSetHeadroom(
+      SchedulerApp application, Resource clusterResource, Resource required) {
+    
     String user = application.getUser();
-    Resource limit = computeUserLimit(application, clusterResource, required);
+    
+    /** 
+     * Headroom is min((userLimit, queue-max-cap) - consumed)
+     */
+
+    Resource userLimit =                          // User limit
+        computeUserLimit(application, clusterResource, required);
+    
+
+    Resource queueMaxCap =                        // Queue Max-Capacity
+        Resources.createResource(
+            roundDown((int)(absoluteMaxCapacity * clusterResource.getMemory()))
+            );
+    
+    Resource userConsumed = getUser(user).getConsumedResources(); 
     Resource headroom = 
-        Resources.subtract(limit, getUser(user).getConsumedResources());
+        Resources.subtract(Resources.min(userLimit, queueMaxCap), userConsumed);
+    
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Headroom calculation for user " + user + ": " + 
+          " userLimit=" + userLimit + 
+          " queueMaxCap=" + queueMaxCap + 
+          " consumed=" + userConsumed + 
+          " headroom=" + headroom);
+    }
+    
     application.setHeadroom(headroom);
     metrics.setAvailableResourcesToUser(user, headroom);
-    return limit;
+    
+    return userLimit;
   }
   
   private int roundUp(int memory) {
-    return divideAndCeil(memory, minimumAllocation.getMemory()) * 
-        minimumAllocation.getMemory();
+    int minMemory = minimumAllocation.getMemory();
+    return divideAndCeil(memory, minMemory) * minMemory; 
+  }
+  
+  private int roundDown(int memory) {
+    int minMemory = minimumAllocation.getMemory();
+    return (memory / minMemory) * minMemory;
   }
   
   @Lock(NoLock.class)
@@ -1288,10 +1318,17 @@ public class LeafQueue implements CSQueu
     String userName = application.getUser();
     User user = getUser(userName);
     user.assignContainer(resource);
+    Resources.subtractFrom(application.getHeadroom(), resource); // headroom
     metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
-    LOG.info(getQueueName() + 
-        " used=" + usedResources + " numContainers=" + numContainers + 
-        " user=" + userName + " user-resources=" + user.getConsumedResources());
+    
+    if (LOG.isDebugEnabled()) {
+      LOG.info(getQueueName() + 
+          " user=" + userName + 
+          " used=" + usedResources + " numContainers=" + numContainers +
+          " headroom = " + application.getHeadroom() +
+          " user-resources=" + user.getConsumedResources()
+          );
+    }
   }
 
   synchronized void releaseResource(Resource clusterResource, 
@@ -1325,8 +1362,8 @@ public class LeafQueue implements CSQueu
     // Update application properties
     for (SchedulerApp application : activeApplications) {
       synchronized (application) {
-        computeAndSetUserResourceLimit(
-            application, clusterResource, Resources.none());
+        computeUserLimitAndSetHeadroom(application, clusterResource, 
+            Resources.none());
       }
     }
   }

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java?rev=1239423&r1=1239422&r2=1239423&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java Thu Feb  2 00:42:18 2012
@@ -38,6 +38,8 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Container;
@@ -67,6 +69,8 @@ import org.mockito.stubbing.Answer;
 
 public class TestLeafQueue {
   
+  private static final Log LOG = LogFactory.getLog(TestLeafQueue.class);
+  
   private final RecordFactory recordFactory = 
       RecordFactoryProvider.getRecordFactory(null);
 
@@ -473,6 +477,115 @@ public class TestLeafQueue {
   }
   
   @Test
+  public void testHeadroomWithMaxCap() throws Exception {
+    // Mock the queue
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+    //unset maxCapacity
+    a.setMaxCapacity(1.0f);
+    
+    // Users
+    final String user_0 = "user_0";
+    final String user_1 = "user_1";
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 = 
+        TestUtils.getMockApplicationAttemptId(0, 0); 
+    SchedulerApp app_0 = 
+        new SchedulerApp(appAttemptId_0, user_0, a, 
+            a.getActiveUsersManager(), rmContext, null);
+    a.submitApplication(app_0, user_0, A);
+
+    final ApplicationAttemptId appAttemptId_1 = 
+        TestUtils.getMockApplicationAttemptId(1, 0); 
+    SchedulerApp app_1 = 
+        new SchedulerApp(appAttemptId_1, user_0, a, 
+            a.getActiveUsersManager(), rmContext, null);
+    a.submitApplication(app_1, user_0, A);  // same user
+
+    final ApplicationAttemptId appAttemptId_2 = 
+        TestUtils.getMockApplicationAttemptId(2, 0); 
+    SchedulerApp app_2 = 
+        new SchedulerApp(appAttemptId_2, user_1, a, 
+            a.getActiveUsersManager(), rmContext, null);
+    a.submitApplication(app_2, user_1, A);
+
+    // Setup some nodes
+    String host_0 = "host_0";
+    SchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
+    String host_1 = "host_1";
+    SchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);
+    
+    final int numNodes = 2;
+    Resource clusterResource = Resources.createResource(numNodes * (8*GB));
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+ 
+    // Setup resource-requests
+    Priority priority = TestUtils.createMockPriority(1);
+    app_0.updateResourceRequests(Collections.singletonList(
+            TestUtils.createResourceRequest(RMNodeImpl.ANY, 2*GB, 1, priority,
+                recordFactory))); 
+
+    app_1.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 2, priority,
+            recordFactory))); 
+
+    /**
+     * Start testing...
+     */
+    
+    // Set user-limit
+    a.setUserLimit(50);
+    a.setUserLimitFactor(2);
+    
+    // Now, only user_0 should be active since he is the only one with
+    // outstanding requests
+    assertEquals("There should only be 1 active user!", 
+        1, a.getActiveUsersManager().getNumActiveUsers());
+
+    // 1 container to user_0
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(2*GB, a.getUsedResources().getMemory());
+    assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_0.getHeadroom().getMemory()); // User limit = 2G
+    assertEquals(0*GB, app_0.getHeadroom().getMemory()); // User limit = 2G
+
+    // Again one to user_0 since he hasn't exceeded user limit yet
+    a.assignContainers(clusterResource, node_0);
+    assertEquals(3*GB, a.getUsedResources().getMemory());
+    assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_0.getHeadroom().getMemory()); // 3G - 2G
+    assertEquals(0*GB, app_0.getHeadroom().getMemory()); // 3G - 2G
+    
+    // Submit requests for app_1 and set max-cap
+    a.setMaxCapacity(.1f);
+    app_2.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 1, priority,
+            recordFactory))); 
+    assertEquals(2, a.getActiveUsersManager().getNumActiveUsers());
+
+    // No more to user_0 since he is already over user-limit
+    // and no more containers to queue since it's already at max-cap
+    a.assignContainers(clusterResource, node_1);
+    assertEquals(3*GB, a.getUsedResources().getMemory());
+    assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
+    assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_2.getCurrentConsumption().getMemory());
+    assertEquals(0*GB, app_0.getHeadroom().getMemory());
+    assertEquals(0*GB, app_1.getHeadroom().getMemory());
+    
+    // Check headroom for app_2 
+    LOG.info("here");
+    app_1.updateResourceRequests(Collections.singletonList(     // unset
+        TestUtils.createResourceRequest(RMNodeImpl.ANY, 1*GB, 0, priority,
+            recordFactory))); 
+    assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
+    a.assignContainers(clusterResource, node_1);
+    assertEquals(1*GB, app_2.getHeadroom().getMemory());   // hit queue max-cap 
+  }
+
+  @Test
   public void testSingleQueueWithMultipleUsers() throws Exception {
     
     // Mock the queue

Modified: hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java?rev=1239423&r1=1239422&r2=1239423&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java (original)
+++ hadoop/common/branches/branch-0.23/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java Thu Feb  2 00:42:18 2012
@@ -86,7 +86,7 @@ public class TestParentQueue {
   private SchedulerApp getMockApplication(int appId, String user) {
     SchedulerApp application = mock(SchedulerApp.class);
     doReturn(user).when(application).getUser();
-    doReturn(null).when(application).getHeadroom();
+    doReturn(Resources.createResource(0)).when(application).getHeadroom();
     return application;
   }