You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by ac...@apache.org on 2013/06/03 20:15:05 UTC

svn commit: r1489088 - in /hadoop/common/branches/branch-2/hadoop-yarn-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ hadoop-yarn/hadoop-yarn...

Author: acmurthy
Date: Mon Jun  3 18:15:04 2013
New Revision: 1489088

URL: http://svn.apache.org/r1489088
Log:
Merge -c 1489087 from trunk to branch-2 to fix YARN-398. Make it possible to specify hard locality constraints in resource requests for CapacityScheduler. Contributed by Arun C. Murthy.

Modified:
    hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
    hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1489088&r1=1489087&r2=1489088&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Mon Jun  3 18:15:04 2013
@@ -91,6 +91,9 @@ Release 2.1.0-beta - UNRELEASED
     YARN-326. Add multi-resource scheduling to the fair scheduler. 
     (sandyr via tucu)
 
+    YARN-398. Make it possible to specify hard locality constraints in resource
+    requests for CapacityScheduler. (acmurthy)
+
   IMPROVEMENTS
 
     YARN-365. Change NM heartbeat handling to not generate a scheduler event

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java?rev=1489088&r1=1489087&r2=1489088&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java Mon Jun  3 18:15:04 2013
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -31,12 +32,14 @@ public class CSAssignment {
   private NodeType type;
   private final RMContainer excessReservation;
   private final FiCaSchedulerApp application;
+  private final boolean skipped;
   
   public CSAssignment(Resource resource, NodeType type) {
     this.resource = resource;
     this.type = type;
     this.application = null;
     this.excessReservation = null;
+    this.skipped = false;
   }
   
   public CSAssignment(FiCaSchedulerApp application, RMContainer excessReservation) {
@@ -44,8 +47,16 @@ public class CSAssignment {
     this.type = NodeType.NODE_LOCAL;
     this.application = application;
     this.excessReservation = excessReservation;
+    this.skipped = false;
+  }
+  
+  public CSAssignment(boolean skipped) {
+    this.resource = Resources.createResource(0, 0);
+    this.type = NodeType.NODE_LOCAL;
+    this.application = null;
+    this.excessReservation = null;
+    this.skipped = skipped;
   }
-
 
   public Resource getResource() {
     return resource;
@@ -67,6 +78,10 @@ public class CSAssignment {
     return excessReservation;
   }
 
+  public boolean getSkipped() {
+    return skipped;
+  }
+  
   @Override
   public String toString() {
     return resource.getMemory() + ":" + type;

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java?rev=1489088&r1=1489087&r2=1489088&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java Mon Jun  3 18:15:04 2013
@@ -784,6 +784,8 @@ public class LeafQueue implements CSQueu
   private static final CSAssignment NULL_ASSIGNMENT =
       new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
   
+  private static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true);
+  
   @Override
   public synchronized CSAssignment 
   assignContainers(Resource clusterResource, FiCaSchedulerNode node) {
@@ -853,6 +855,13 @@ public class LeafQueue implements CSQueu
             assignContainersOnNode(clusterResource, node, application, priority, 
                 null);
 
+          // Did the application skip this node?
+          if (assignment.getSkipped()) {
+            // Don't count 'skipped nodes' as a scheduling opportunity!
+            application.subtractSchedulingOpportunity(priority);
+            continue;
+          }
+          
           // Did we schedule or reserve a container?
           Resource assigned = assignment.getResource();
           if (Resources.greaterThan(
@@ -1104,73 +1113,88 @@ public class LeafQueue implements CSQueu
     Resource assigned = Resources.none();
 
     // Data-local
-    assigned = 
-        assignNodeLocalContainers(clusterResource, node, application, priority,
-            reservedContainer); 
-    if (Resources.greaterThan(resourceCalculator, clusterResource, 
-            assigned, Resources.none())) {
-      return new CSAssignment(assigned, NodeType.NODE_LOCAL);
+    ResourceRequest nodeLocalResourceRequest =
+        application.getResourceRequest(priority, node.getHostName());
+    if (nodeLocalResourceRequest != null) {
+      assigned = 
+          assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, 
+              node, application, priority, reservedContainer); 
+      if (Resources.greaterThan(resourceCalculator, clusterResource, 
+          assigned, Resources.none())) {
+        return new CSAssignment(assigned, NodeType.NODE_LOCAL);
+      }
     }
 
     // Rack-local
-    assigned = 
-        assignRackLocalContainers(clusterResource, node, application, priority, 
-            reservedContainer);
-    if (Resources.greaterThan(resourceCalculator, clusterResource, 
-            assigned, Resources.none())) {
-      return new CSAssignment(assigned, NodeType.RACK_LOCAL);
+    ResourceRequest rackLocalResourceRequest =
+        application.getResourceRequest(priority, node.getRackName());
+    if (rackLocalResourceRequest != null) {
+      if (!rackLocalResourceRequest.getRelaxLocality()) {
+        return SKIP_ASSIGNMENT;
+      }
+      
+      assigned = 
+          assignRackLocalContainers(clusterResource, rackLocalResourceRequest, 
+              node, application, priority, reservedContainer);
+      if (Resources.greaterThan(resourceCalculator, clusterResource, 
+          assigned, Resources.none())) {
+        return new CSAssignment(assigned, NodeType.RACK_LOCAL);
+      }
     }
     
     // Off-switch
-    return new CSAssignment(
-        assignOffSwitchContainers(clusterResource, node, application, 
-            priority, reservedContainer), 
-        NodeType.OFF_SWITCH);
+    ResourceRequest offSwitchResourceRequest =
+        application.getResourceRequest(priority, ResourceRequest.ANY);
+    if (offSwitchResourceRequest != null) {
+      if (!offSwitchResourceRequest.getRelaxLocality()) {
+        return SKIP_ASSIGNMENT;
+      }
+
+      return new CSAssignment(
+          assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
+              node, application, priority, reservedContainer), 
+              NodeType.OFF_SWITCH);
+    }
+    
+    return SKIP_ASSIGNMENT;
   }
 
-  private Resource assignNodeLocalContainers(Resource clusterResource, 
+  private Resource assignNodeLocalContainers(
+      Resource clusterResource, ResourceRequest nodeLocalResourceRequest, 
       FiCaSchedulerNode node, FiCaSchedulerApp application, 
       Priority priority, RMContainer reservedContainer) {
-    ResourceRequest request = 
-        application.getResourceRequest(priority, node.getHostName());
-    if (request != null) {
-      if (canAssign(application, priority, node, NodeType.NODE_LOCAL, 
-          reservedContainer)) {
-        return assignContainer(clusterResource, node, application, priority, 
-            request, NodeType.NODE_LOCAL, reservedContainer);
-      }
+    if (canAssign(application, priority, node, NodeType.NODE_LOCAL, 
+        reservedContainer)) {
+      return assignContainer(clusterResource, node, application, priority, 
+          nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer);
     }
     
     return Resources.none();
   }
 
-  private Resource assignRackLocalContainers(Resource clusterResource,  
+  private Resource assignRackLocalContainers(
+      Resource clusterResource, ResourceRequest rackLocalResourceRequest,  
       FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
       RMContainer reservedContainer) {
-    ResourceRequest request = 
-      application.getResourceRequest(priority, node.getRackName());
-    if (request != null) {
-      if (canAssign(application, priority, node, NodeType.RACK_LOCAL, 
-          reservedContainer)) {
-        return assignContainer(clusterResource, node, application, priority, request, 
-            NodeType.RACK_LOCAL, reservedContainer);
-      } 
+    if (canAssign(application, priority, node, NodeType.RACK_LOCAL, 
+        reservedContainer)) {
+      return assignContainer(clusterResource, node, application, priority, 
+          rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer);
     }
+    
     return Resources.none();
   }
 
-  private Resource assignOffSwitchContainers(Resource clusterResource, FiCaSchedulerNode node, 
-      FiCaSchedulerApp application, Priority priority, 
+  private Resource assignOffSwitchContainers(
+      Resource clusterResource, ResourceRequest offSwitchResourceRequest,
+      FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, 
       RMContainer reservedContainer) {
-    ResourceRequest request = 
-      application.getResourceRequest(priority, ResourceRequest.ANY);
-    if (request != null) {
-      if (canAssign(application, priority, node, NodeType.OFF_SWITCH, 
-          reservedContainer)) {
-        return assignContainer(clusterResource, node, application, priority, request, 
-            NodeType.OFF_SWITCH, reservedContainer);
-      }
+    if (canAssign(application, priority, node, NodeType.OFF_SWITCH, 
+        reservedContainer)) {
+      return assignContainer(clusterResource, node, application, priority, 
+          offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer);
     }
+    
     return Resources.none();
   }
 

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java?rev=1489088&r1=1489087&r2=1489088&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java Mon Jun  3 18:15:04 2013
@@ -332,6 +332,11 @@ public class FiCaSchedulerApp extends Sc
         schedulingOpportunities.count(priority) + 1);
   }
 
+  synchronized public void subtractSchedulingOpportunity(Priority priority) {
+    int count = schedulingOpportunities.count(priority) - 1;
+    this.schedulingOpportunities.setCount(priority, Math.max(count,  0));
+  }
+  
   /**
    * Return the number of times the application has been given an opportunity
    * to schedule a task at the given priority since the last time it

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java?rev=1489088&r1=1489087&r2=1489088&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java Mon Jun  3 18:15:04 2013
@@ -512,7 +512,7 @@ public class TestApplicationLimits {
     List<ResourceRequest> app_0_0_requests = new ArrayList<ResourceRequest>();
     app_0_0_requests.add(
         TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2,
-            priority_1, recordFactory));
+            true, priority_1, recordFactory));
     app_0_0.updateResourceRequests(app_0_0_requests);
 
     // Schedule to compute 
@@ -531,7 +531,7 @@ public class TestApplicationLimits {
     List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
     app_0_1_requests.add(
         TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2,
-            priority_1, recordFactory));
+            true, priority_1, recordFactory));
     app_0_1.updateResourceRequests(app_0_1_requests);
 
     // Schedule to compute 
@@ -550,7 +550,7 @@ public class TestApplicationLimits {
     List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();
     app_1_0_requests.add(
         TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2,
-            priority_1, recordFactory));
+            true, priority_1, recordFactory));
     app_1_0.updateResourceRequests(app_1_0_requests);
     
     // Schedule to compute 

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java?rev=1489088&r1=1489087&r2=1489088&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java Mon Jun  3 18:15:04 2013
@@ -292,8 +292,8 @@ public class TestLeafQueue {
     // Setup resource-requests
     Priority priority = TestUtils.createMockPriority(1);
     app_0.updateResourceRequests(Collections.singletonList(
-            TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, priority,
-                recordFactory))); 
+            TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, true,
+                priority, recordFactory))); 
 
     // Start testing...
     
@@ -414,12 +414,12 @@ public class TestLeafQueue {
     // Setup resource-requests
     Priority priority = TestUtils.createMockPriority(1);
     app_0.updateResourceRequests(Collections.singletonList(
-            TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, priority,
-                recordFactory))); 
+            TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, true,
+                priority, recordFactory))); 
 
     app_1.updateResourceRequests(Collections.singletonList(
-        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, priority,
-            recordFactory))); 
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
+            priority, recordFactory))); 
 
     // Start testing...
     
@@ -547,12 +547,12 @@ public class TestLeafQueue {
     // Setup resource-requests
     Priority priority = TestUtils.createMockPriority(1);
     app_0.updateResourceRequests(Collections.singletonList(
-            TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, priority,
-                recordFactory))); 
+            TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
+                priority, recordFactory))); 
 
     app_1.updateResourceRequests(Collections.singletonList(
-        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, priority,
-            recordFactory))); 
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
+            priority, recordFactory))); 
 
     /**
      * Start testing...
@@ -640,12 +640,12 @@ public class TestLeafQueue {
     // Setup resource-requests
     Priority priority = TestUtils.createMockPriority(1);
     app_0.updateResourceRequests(Collections.singletonList(
-            TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, priority,
-                recordFactory))); 
+            TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
+                priority, recordFactory))); 
 
     app_1.updateResourceRequests(Collections.singletonList(
-        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, priority,
-            recordFactory))); 
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
+            priority, recordFactory))); 
 
     /**
      * Start testing...
@@ -679,8 +679,8 @@ public class TestLeafQueue {
     // Submit requests for app_1 and set max-cap
     a.setMaxCapacity(.1f);
     app_2.updateResourceRequests(Collections.singletonList(
-        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, priority,
-            recordFactory))); 
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
+            priority, recordFactory))); 
     assertEquals(2, a.getActiveUsersManager().getNumActiveUsers());
 
     // No more to user_0 since he is already over user-limit
@@ -696,8 +696,8 @@ public class TestLeafQueue {
     // Check headroom for app_2 
     LOG.info("here");
     app_1.updateResourceRequests(Collections.singletonList(     // unset
-        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, priority,
-            recordFactory))); 
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true,
+            priority, recordFactory))); 
     assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
     a.assignContainers(clusterResource, node_1);
     assertEquals(1*GB, app_2.getHeadroom().getMemory());   // hit queue max-cap 
@@ -757,12 +757,12 @@ public class TestLeafQueue {
     // Setup resource-requests
     Priority priority = TestUtils.createMockPriority(1);
     app_0.updateResourceRequests(Collections.singletonList(
-            TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 10, priority,
-                recordFactory))); 
+            TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 10, true,
+                priority, recordFactory))); 
 
     app_1.updateResourceRequests(Collections.singletonList(
-        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 10, priority,
-            recordFactory))); 
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 10, true,
+            priority, recordFactory))); 
 
     /** 
      * Start testing... 
@@ -791,12 +791,12 @@ public class TestLeafQueue {
     // Submit resource requests for other apps now to 'activate' them
     
     app_2.updateResourceRequests(Collections.singletonList(
-        TestUtils.createResourceRequest(ResourceRequest.ANY, 3*GB, 1, priority,
-            recordFactory))); 
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 3*GB, 1, true,
+            priority, recordFactory))); 
 
     app_3.updateResourceRequests(Collections.singletonList(
-        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, priority,
-            recordFactory))); 
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
+            priority, recordFactory))); 
 
     // Now allocations should goto app_2 since 
     // user_0 is at limit inspite of high user-limit-factor
@@ -919,12 +919,12 @@ public class TestLeafQueue {
     // Setup resource-requests
     Priority priority = TestUtils.createMockPriority(1);
     app_0.updateResourceRequests(Collections.singletonList(
-            TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, priority,
-                recordFactory))); 
+            TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
+                priority, recordFactory))); 
 
     app_1.updateResourceRequests(Collections.singletonList(
-        TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, priority,
-            recordFactory))); 
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
+            priority, recordFactory))); 
 
     // Start testing...
     
@@ -1021,19 +1021,19 @@ public class TestLeafQueue {
     // Setup resource-requests
     Priority priority = TestUtils.createMockPriority(1);
     app_0.updateResourceRequests(Collections.singletonList(
-            TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, priority,
-                recordFactory)));
+            TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
+                priority, recordFactory)));
 
     // Setup app_1 to request a 4GB container on host_0 and
     // another 4GB container anywhere.
     ArrayList<ResourceRequest> appRequests_1 =
         new ArrayList<ResourceRequest>(4);
     appRequests_1.add(TestUtils.createResourceRequest(host_0, 4*GB, 1,
-        priority, recordFactory));
+        true, priority, recordFactory));
     appRequests_1.add(TestUtils.createResourceRequest(DEFAULT_RACK, 4*GB, 1,
-        priority, recordFactory));
+        true, priority, recordFactory));
     appRequests_1.add(TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 2,
-        priority, recordFactory));
+        true, priority, recordFactory));
     app_1.updateResourceRequests(appRequests_1);
 
     // Start testing...
@@ -1127,12 +1127,12 @@ public class TestLeafQueue {
     // Setup resource-requests
     Priority priority = TestUtils.createMockPriority(1);
     app_0.updateResourceRequests(Collections.singletonList(
-            TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, priority,
-                recordFactory))); 
+            TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
+                priority, recordFactory))); 
 
     app_1.updateResourceRequests(Collections.singletonList(
-        TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, priority,
-            recordFactory))); 
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
+            priority, recordFactory))); 
 
     // Start testing...
     
@@ -1242,19 +1242,19 @@ public class TestLeafQueue {
     List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
     app_0_requests_0.add(
         TestUtils.createResourceRequest(host_0, 1*GB, 1, 
-            priority, recordFactory));
+            true, priority, recordFactory));
     app_0_requests_0.add(
         TestUtils.createResourceRequest(rack_0, 1*GB, 1, 
-            priority, recordFactory));
+            true, priority, recordFactory));
     app_0_requests_0.add(
         TestUtils.createResourceRequest(host_1, 1*GB, 1, 
-            priority, recordFactory));
+            true, priority, recordFactory));
     app_0_requests_0.add(
         TestUtils.createResourceRequest(rack_1, 1*GB, 1, 
-            priority, recordFactory));
+            true, priority, recordFactory));
     app_0_requests_0.add(
         TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 3, // one extra
-            priority, recordFactory));
+            true, priority, recordFactory));
     app_0.updateResourceRequests(app_0_requests_0);
 
     // Start testing...
@@ -1313,13 +1313,13 @@ public class TestLeafQueue {
     app_0_requests_0.clear();
     app_0_requests_0.add(
         TestUtils.createResourceRequest(host_1, 1*GB, 1, 
-            priority, recordFactory));
+            true, priority, recordFactory));
     app_0_requests_0.add(
         TestUtils.createResourceRequest(rack_1, 1*GB, 1, 
-            priority, recordFactory));
+            true, priority, recordFactory));
     app_0_requests_0.add(
         TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, // one extra
-            priority, recordFactory));
+            true, priority, recordFactory));
     app_0.updateResourceRequests(app_0_requests_0);
     assertEquals(2, app_0.getTotalRequiredResources(priority));
     
@@ -1385,31 +1385,31 @@ public class TestLeafQueue {
     Priority priority_1 = TestUtils.createMockPriority(1);
     app_0_requests_0.add(
         TestUtils.createResourceRequest(host_0, 1*GB, 1, 
-            priority_1, recordFactory));
+            true, priority_1, recordFactory));
     app_0_requests_0.add(
         TestUtils.createResourceRequest(rack_0, 1*GB, 1, 
-            priority_1, recordFactory));
+            true, priority_1, recordFactory));
     app_0_requests_0.add(
         TestUtils.createResourceRequest(host_1, 1*GB, 1, 
-            priority_1, recordFactory));
+            true, priority_1, recordFactory));
     app_0_requests_0.add(
         TestUtils.createResourceRequest(rack_1, 1*GB, 1, 
-            priority_1, recordFactory));
+            true, priority_1, recordFactory));
     app_0_requests_0.add(
         TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2,
-            priority_1, recordFactory));
+            true, priority_1, recordFactory));
     
     // P2
     Priority priority_2 = TestUtils.createMockPriority(2);
     app_0_requests_0.add(
         TestUtils.createResourceRequest(host_2, 2*GB, 1, 
-            priority_2, recordFactory));
+            true, priority_2, recordFactory));
     app_0_requests_0.add(
         TestUtils.createResourceRequest(rack_2, 2*GB, 1, 
-            priority_2, recordFactory));
+            true, priority_2, recordFactory));
     app_0_requests_0.add(
         TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1,
-            priority_2, recordFactory));
+            true, priority_2, recordFactory));
     
     app_0.updateResourceRequests(app_0_requests_0);
 
@@ -1513,19 +1513,19 @@ public class TestLeafQueue {
     List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
     app_0_requests_0.add(
         TestUtils.createResourceRequest(host_0_0, 1*GB, 1, 
-            priority, recordFactory));
+            true, priority, recordFactory));
     app_0_requests_0.add(
         TestUtils.createResourceRequest(host_0_1, 1*GB, 1, 
-            priority, recordFactory));
+            true, priority, recordFactory));
     app_0_requests_0.add(
         TestUtils.createResourceRequest(rack_0, 1*GB, 1, 
-            priority, recordFactory));
+            true, priority, recordFactory));
     app_0_requests_0.add(
         TestUtils.createResourceRequest(host_1_0, 1*GB, 1, 
-            priority, recordFactory));
+            true, priority, recordFactory));
     app_0_requests_0.add(
         TestUtils.createResourceRequest(rack_1, 1*GB, 1, 
-            priority, recordFactory));
+            true, priority, recordFactory));
     app_0.updateResourceRequests(app_0_requests_0);
 
     // Start testing...
@@ -1534,7 +1534,7 @@ public class TestLeafQueue {
     app_0_requests_0.clear();
     app_0_requests_0.add(
         TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only one
-            priority, recordFactory));
+            true, priority, recordFactory));
     app_0.updateResourceRequests(app_0_requests_0);
     
     // NODE_LOCAL - node_0_1
@@ -1557,7 +1557,7 @@ public class TestLeafQueue {
     app_0_requests_0.clear();
     app_0_requests_0.add(
         TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only one
-            priority, recordFactory));
+            true, priority, recordFactory));
     app_0.updateResourceRequests(app_0_requests_0);
 
     // No allocation on node_0_1 even though it's node/rack local since
@@ -1731,7 +1731,174 @@ public class TestLeafQueue {
           c1.getQueueUserAclInfo(user), QueueACL.SUBMIT_APPLICATIONS));
 
   }
-  
+
+  @Test
+  public void testLocalityConstraints() throws Exception {
+
+    // Manipulate queue 'a'
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+
+    // User
+    String user_0 = "user_0";
+    
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 = 
+        TestUtils.getMockApplicationAttemptId(0, 0); 
+    FiCaSchedulerApp app_0 = 
+        spy(new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
+            mock(ActiveUsersManager.class), rmContext));
+    a.submitApplication(app_0, user_0, A);
+
+    final ApplicationAttemptId appAttemptId_1 = 
+        TestUtils.getMockApplicationAttemptId(1, 0); 
+    FiCaSchedulerApp app_1 = 
+        spy(new FiCaSchedulerApp(appAttemptId_1, user_0, a, 
+            mock(ActiveUsersManager.class), rmContext));
+    a.submitApplication(app_1, user_0, A);
+
+    // Setup some nodes and racks
+    String host_0_0 = "127.0.0.1";
+    String rack_0 = "rack_0";
+    FiCaSchedulerNode node_0_0 = TestUtils.getMockNode(host_0_0, rack_0, 0, 8*GB);
+    String host_0_1 = "127.0.0.2";
+    FiCaSchedulerNode node_0_1 = TestUtils.getMockNode(host_0_1, rack_0, 0, 8*GB);
+    
+    
+    String host_1_0 = "127.0.0.3";
+    String rack_1 = "rack_1";
+    FiCaSchedulerNode node_1_0 = TestUtils.getMockNode(host_1_0, rack_1, 0, 8*GB);
+    String host_1_1 = "127.0.0.4";
+    FiCaSchedulerNode node_1_1 = TestUtils.getMockNode(host_1_1, rack_1, 0, 8*GB);
+    
+    final int numNodes = 4;
+    Resource clusterResource = Resources.createResource(
+        numNodes * (8*GB), numNodes * 1);
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Setup resource-requests
+    // resourceName: <priority, memory, #containers, relaxLocality>
+    // host_0_0: < 1, 1GB, 1, true >
+    // host_0_1: < null >
+    // rack_0:   < null >                     <----
+    // host_1_0: < 1, 1GB, 1, true >
+    // host_1_1: < null >
+    // rack_1:   < 1, 1GB, 1, false >         <----
+    // ANY:      < 1, 1GB, 1, false >         <----
+    // Availability:
+    // host_0_0: 8G
+    // host_0_1: 8G
+    // host_1_0: 8G
+    // host_1_1: 8G
+    Priority priority = TestUtils.createMockPriority(1);
+    List<ResourceRequest> app_0_requests_0 = new ArrayList<ResourceRequest>();
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(host_0_0, 1*GB, 1, 
+            true, priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(host_1_0, 1*GB, 1, 
+            true, priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(rack_1, 1*GB, 1, 
+            false, priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only one
+            false, priority, recordFactory));
+    app_0.updateResourceRequests(app_0_requests_0);
+    app_0_requests_0.clear();
+
+    //
+    // Start testing...
+    //
+    
+    // node_0_1  
+    // Shouldn't allocate since RR(rack_0) = null && RR(ANY) = relax: false
+    a.assignContainers(clusterResource, node_0_1);
+    verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1), 
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
+    
+    // resourceName: <priority, memory, #containers, relaxLocality>
+    // host_0_0: < 1, 1GB, 1, true >
+    // host_0_1: < null >
+    // rack_0:   < null >                     <----
+    // host_1_0: < 1, 1GB, 1, true >
+    // host_1_1: < null >
+    // rack_1:   < 1, 1GB, 1, false >         <----
+    // ANY:      < 1, 1GB, 1, false >         <----
+    // Availability:
+    // host_0_0: 8G
+    // host_0_1: 8G
+    // host_1_0: 8G
+    // host_1_1: 8G
+
+    // node_1_1  
+    // Shouldn't allocate since RR(rack_1) = relax: false
+    a.assignContainers(clusterResource, node_1_1);
+    verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1), 
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
+    
+    // Allow rack-locality for rack_1
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(rack_1, 1*GB, 1, 
+            true, priority, recordFactory));
+    app_0.updateResourceRequests(app_0_requests_0);
+    app_0_requests_0.clear();
+
+    // resourceName: <priority, memory, #containers, relaxLocality>
+    // host_0_0: < 1, 1GB, 1, true >
+    // host_0_1: < null >
+    // rack_0:   < null >                     
+    // host_1_0: < 1, 1GB, 1, true >
+    // host_1_1: < null >
+    // rack_1:   < 1, 1GB, 1, true >          <----
+    // ANY:      < 1, 1GB, 1, false >         
+    // Availability:
+    // host_0_0: 8G
+    // host_0_1: 8G
+    // host_1_0: 8G
+    // host_1_1: 8G
+
+    // node_1_1  
+    // Now, should allocate since RR(rack_1) = relax: true
+    a.assignContainers(clusterResource, node_1_1);
+    verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_1_1), 
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority)); 
+    assertEquals(0, app_0.getTotalRequiredResources(priority));
+
+    // Now sanity-check node_local
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(rack_1, 1*GB, 1, 
+            false, priority, recordFactory));
+    app_0_requests_0.add(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, // only one
+            false, priority, recordFactory));
+    app_0.updateResourceRequests(app_0_requests_0);
+    app_0_requests_0.clear();
+    
+    // resourceName: <priority, memory, #containers, relaxLocality>
+    // host_0_0: < 1, 1GB, 1, true >
+    // host_0_1: < null >
+    // rack_0:   < null >                     
+    // host_1_0: < 1, 1GB, 1, true >
+    // host_1_1: < null >
+    // rack_1:   < 1, 1GB, 1, false >          <----
+    // ANY:      < 1, 1GB, 1, false >          <----
+    // Availability:
+    // host_0_0: 8G
+    // host_0_1: 8G
+    // host_1_0: 8G
+    // host_1_1: 7G
+
+    a.assignContainers(clusterResource, node_1_0);
+    verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0), 
+        any(Priority.class), any(ResourceRequest.class), any(Container.class));
+    assertEquals(0, app_0.getSchedulingOpportunities(priority)); 
+    assertEquals(0, app_0.getTotalRequiredResources(priority));
+
+  }
+
   @After
   public void tearDown() throws Exception {
   }

Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java?rev=1489088&r1=1489087&r2=1489088&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java Mon Jun  3 18:15:04 2013
@@ -115,15 +115,16 @@ public class TestUtils {
   }
   
   public static ResourceRequest createResourceRequest(
-      String hostName, int memory, int numContainers, Priority priority,
-      RecordFactory recordFactory) {
+      String resourceName, int memory, int numContainers, boolean relaxLocality,
+      Priority priority, RecordFactory recordFactory) {
     ResourceRequest request = 
         recordFactory.newRecordInstance(ResourceRequest.class);
     Resource capability = Resources.createResource(memory, 1);
     
     request.setNumContainers(numContainers);
-    request.setResourceName(hostName);
+    request.setResourceName(resourceName);
     request.setCapability(capability);
+    request.setRelaxLocality(relaxLocality);
     request.setPriority(priority);
     return request;
   }