You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/04/14 20:46:17 UTC

[2/3] hadoop git commit: YARN-3361. CapacityScheduler side changes to support non-exclusive node labels. Contributed by Wangda Tan

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 882498a..eb64d43 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -56,8 +56,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -377,16 +375,29 @@ public class ParentQueue extends AbstractCSQueue {
 
   @Override
   public synchronized CSAssignment assignContainers(Resource clusterResource,
-      FiCaSchedulerNode node, ResourceLimits resourceLimits) {
-    CSAssignment assignment = 
-        new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
-    Set<String> nodeLabels = node.getLabels();
-    
+      FiCaSchedulerNode node, ResourceLimits resourceLimits,
+      SchedulingMode schedulingMode) {
     // if our queue cannot access this node, just return
-    if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels, nodeLabels)) {
-      return assignment;
+    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
+        && !accessibleToPartition(node.getPartition())) {
+      return NULL_ASSIGNMENT;
+    }
+    
+    // Check whether this queue needs more resources; simply skip allocation
+    // if it doesn't.
+    if (!super.hasPendingResourceRequest(node.getPartition(),
+        clusterResource, schedulingMode)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skip this queue=" + getQueuePath()
+            + ", because it doesn't need more resource, schedulingMode="
+            + schedulingMode.name() + " node-partition=" + node.getPartition());
+      }
+      return NULL_ASSIGNMENT;
     }
     
+    CSAssignment assignment = 
+        new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
+    
     while (canAssign(clusterResource, node)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Trying to assign containers to child-queue of "
@@ -396,15 +407,17 @@ public class ParentQueue extends AbstractCSQueue {
       // Are we over maximum-capacity for this queue?
       // This will also consider parent's limits and also continuous reservation
       // looking
-      if (!super.canAssignToThisQueue(clusterResource, nodeLabels, resourceLimits,
-          minimumAllocation, Resources.createResource(getMetrics()
-              .getReservedMB(), getMetrics().getReservedVirtualCores()))) {
+      if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
+          resourceLimits, minimumAllocation, Resources.createResource(
+              getMetrics().getReservedMB(), getMetrics()
+                  .getReservedVirtualCores()), schedulingMode)) {
         break;
       }
       
       // Schedule
-      CSAssignment assignedToChild = 
-          assignContainersToChildQueues(clusterResource, node, resourceLimits);
+      CSAssignment assignedToChild =
+          assignContainersToChildQueues(clusterResource, node, resourceLimits,
+              schedulingMode);
       assignment.setType(assignedToChild.getType());
       
       // Done if no child-queue assigned anything
@@ -413,7 +426,7 @@ public class ParentQueue extends AbstractCSQueue {
               assignedToChild.getResource(), Resources.none())) {
         // Track resource utilization for the parent-queue
         super.allocateResource(clusterResource, assignedToChild.getResource(),
-            nodeLabels);
+            node.getPartition());
         
         // Track resource utilization in this pass of the scheduler
         Resources
@@ -510,7 +523,8 @@ public class ParentQueue extends AbstractCSQueue {
   }
   
   private synchronized CSAssignment assignContainersToChildQueues(
-      Resource cluster, FiCaSchedulerNode node, ResourceLimits limits) {
+      Resource cluster, FiCaSchedulerNode node, ResourceLimits limits,
+      SchedulingMode schedulingMode) {
     CSAssignment assignment = 
         new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
     
@@ -523,12 +537,13 @@ public class ParentQueue extends AbstractCSQueue {
         LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath()
           + " stats: " + childQueue);
       }
-      
+
       // Get ResourceLimits of child queue before assign containers
       ResourceLimits childLimits =
           getResourceLimitsOfChild(childQueue, cluster, limits);
       
-      assignment = childQueue.assignContainers(cluster, node, childLimits);
+      assignment = childQueue.assignContainers(cluster, node, 
+          childLimits, schedulingMode);
       if(LOG.isDebugEnabled()) {
         LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
           " stats: " + childQueue + " --> " + 
@@ -584,7 +599,7 @@ public class ParentQueue extends AbstractCSQueue {
       // Book keeping
       synchronized (this) {
         super.releaseResource(clusterResource, rmContainer.getContainer()
-            .getResource(), node.getLabels());
+            .getResource(), node.getPartition());
 
         LOG.info("completedContainer" +
             " queue=" + getQueueName() + 
@@ -653,7 +668,7 @@ public class ParentQueue extends AbstractCSQueue {
       FiCaSchedulerNode node =
           scheduler.getNode(rmContainer.getContainer().getNodeId());
       super.allocateResource(clusterResource, rmContainer.getContainer()
-          .getResource(), node.getLabels());
+          .getResource(), node.getPartition());
     }
     if (parent != null) {
       parent.recoverContainer(clusterResource, attempt, rmContainer);
@@ -681,7 +696,7 @@ public class ParentQueue extends AbstractCSQueue {
       FiCaSchedulerNode node =
           scheduler.getNode(rmContainer.getContainer().getNodeId());
       super.allocateResource(clusterResource, rmContainer.getContainer()
-          .getResource(), node.getLabels());
+          .getResource(), node.getPartition());
       LOG.info("movedContainer" + " queueMoveIn=" + getQueueName()
           + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
           + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster="
@@ -701,7 +716,7 @@ public class ParentQueue extends AbstractCSQueue {
           scheduler.getNode(rmContainer.getContainer().getNodeId());
       super.releaseResource(clusterResource,
           rmContainer.getContainer().getResource(),
-          node.getLabels());
+          node.getPartition());
       LOG.info("movedContainer" + " queueMoveOut=" + getQueueName()
           + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
           + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster="

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/SchedulingMode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/SchedulingMode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/SchedulingMode.java
new file mode 100644
index 0000000..7e7dc37
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/SchedulingMode.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+/**
+ * Scheduling modes; see each constant below for a detailed explanation.
+ */
+public enum SchedulingMode {
+  /**
+   * <p>
+   * When a node has a partition (say partition=x), only applications whose
+   * queue can access partition=x AND that request partition=x resources get a
+   * chance to allocate on the node.
+   * </p>
+   * 
+   * <p>
+   * When a node has no partition, only applications requesting
+   * non-partitioned resources get a chance to allocate on the node.
+   * </p>
+   */
+  RESPECT_PARTITION_EXCLUSIVITY,
+  
+  /**
+   * Only used when a node has a partition AND that partition isn't exclusive
+   * AND the application requests non-partitioned resources.
+   */
+  IGNORE_PARTITION_EXCLUSIVITY
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
index 76ede39..9b7eb84 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.Task.State;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -277,6 +278,9 @@ public class Application {
     } else {
       request.setNumContainers(request.getNumContainers() + 1);
     }
+    if (request.getNodeLabelExpression() == null) {
+      request.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
+    }
     
     // Note this down for next interaction with ResourceManager
     ask.remove(request);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
index f62fdb3..5c107aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
@@ -150,8 +150,14 @@ public class MockAM {
   public AllocateResponse allocate(
       String host, int memory, int numContainers,
       List<ContainerId> releases, String labelExpression) throws Exception {
+    return allocate(host, memory, numContainers, 1, releases, labelExpression);
+  }
+  
+  public AllocateResponse allocate(
+      String host, int memory, int numContainers, int priority,
+      List<ContainerId> releases, String labelExpression) throws Exception {
     List<ResourceRequest> reqs =
-        createReq(new String[] { host }, memory, 1, numContainers,
+        createReq(new String[] { host }, memory, priority, numContainers,
             labelExpression);
     return allocate(reqs, releases);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 06c6b32..f2b1d86 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
@@ -200,10 +202,18 @@ public class MockRM extends ResourceManager {
   
   public boolean waitForState(MockNM nm, ContainerId containerId,
       RMContainerState containerState, int timeoutMillisecs) throws Exception {
+    return waitForState(Arrays.asList(nm), containerId, containerState,
+        timeoutMillisecs);
+  }
+  
+  public boolean waitForState(Collection<MockNM> nms, ContainerId containerId,
+      RMContainerState containerState, int timeoutMillisecs) throws Exception {
     RMContainer container = getResourceScheduler().getRMContainer(containerId);
     int timeoutSecs = 0;
     while(container == null && timeoutSecs++ < timeoutMillisecs / 100) {
-      nm.nodeHeartbeat(true);
+      for (MockNM nm : nms) {
+        nm.nodeHeartbeat(true);
+      }
       container = getResourceScheduler().getRMContainer(containerId);
       System.out.println("Waiting for container " + containerId + " to be allocated.");
       Thread.sleep(100);
@@ -217,9 +227,11 @@ public class MockRM extends ResourceManager {
         && timeoutSecs++ < timeoutMillisecs / 100) {
       System.out.println("Container : " + containerId + " State is : "
           + container.getState() + " Waiting for state : " + containerState);
-      nm.nodeHeartbeat(true);
+      for (MockNM nm : nms) {
+        nm.nodeHeartbeat(true);
+      }
       Thread.sleep(100);
-      
+
       if (timeoutMillisecs <= timeoutSecs * 100) {
         return false;
       }
@@ -650,11 +662,28 @@ public class MockRM extends ResourceManager {
     am.waitForState(RMAppAttemptState.FINISHED);
     rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED);
   }
+  
+  @SuppressWarnings("rawtypes")
+  private static void waitForSchedulerAppAttemptAdded(
+      ApplicationAttemptId attemptId, MockRM rm) throws InterruptedException {
+    int tick = 0;
+    // Wait for at most 5 sec
+    while (null == ((AbstractYarnScheduler) rm.getResourceScheduler())
+        .getApplicationAttempt(attemptId) && tick < 50) {
+      Thread.sleep(100);
+      if (tick % 10 == 0) {
+        System.out.println("waiting for SchedulerApplicationAttempt="
+            + attemptId + " added.");
+      }
+      tick++;
+    }
+  }
 
   public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
       throws Exception {
     rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
     RMAppAttempt attempt = app.getCurrentAppAttempt();
+    waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm);
     System.out.println("Launch AM " + attempt.getAppAttemptId());
     nm.nodeHeartbeat(true);
     MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index 1ca5c97..46167ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -612,7 +612,7 @@ public class TestApplicationLimits {
 
     // Schedule to compute 
     queue.assignContainers(clusterResource, node_0, new ResourceLimits(
-        clusterResource));
+        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     Resource expectedHeadroom = Resources.createResource(10*16*GB, 1);
     assertEquals(expectedHeadroom, app_0_0.getHeadroom());
 
@@ -632,7 +632,7 @@ public class TestApplicationLimits {
 
     // Schedule to compute 
     queue.assignContainers(clusterResource, node_0, new ResourceLimits(
-        clusterResource)); // Schedule to compute
+        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute
     assertEquals(expectedHeadroom, app_0_0.getHeadroom());
     assertEquals(expectedHeadroom, app_0_1.getHeadroom());// no change
     
@@ -652,7 +652,7 @@ public class TestApplicationLimits {
     
     // Schedule to compute 
     queue.assignContainers(clusterResource, node_0, new ResourceLimits(
-        clusterResource)); // Schedule to compute
+        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute
     expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes
     assertEquals(expectedHeadroom, app_0_0.getHeadroom());
     assertEquals(expectedHeadroom, app_0_1.getHeadroom());
@@ -661,7 +661,7 @@ public class TestApplicationLimits {
     // Now reduce cluster size and check for the smaller headroom
     clusterResource = Resources.createResource(90*16*GB);
     queue.assignContainers(clusterResource, node_0, new ResourceLimits(
-        clusterResource)); // Schedule to compute
+        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute
     expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes
     assertEquals(expectedHeadroom, app_0_0.getHeadroom());
     assertEquals(expectedHeadroom, app_0_1.getHeadroom());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index 23b31fa..970a98a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@@ -133,7 +134,7 @@ public class TestChildQueueOrder {
         final Resource allocatedResource = Resources.createResource(allocation);
         if (queue instanceof ParentQueue) {
           ((ParentQueue)queue).allocateResource(clusterResource, 
-              allocatedResource, null);
+              allocatedResource, RMNodeLabelsManager.NO_LABEL);
         } else {
           FiCaSchedulerApp app1 = getMockApplication(0, "");
           ((LeafQueue)queue).allocateResource(clusterResource, app1, 
@@ -145,7 +146,7 @@ public class TestChildQueueOrder {
           doReturn(new CSAssignment(Resources.none(), type)).
           when(queue)
               .assignContainers(eq(clusterResource), eq(node),
-                  any(ResourceLimits.class));
+                  any(ResourceLimits.class), any(SchedulingMode.class));
 
           // Mock the node's resource availability
           Resource available = node.getAvailableResource();
@@ -157,7 +158,7 @@ public class TestChildQueueOrder {
       }
     }).
     when(queue).assignContainers(eq(clusterResource), eq(node), 
-        any(ResourceLimits.class));
+        any(ResourceLimits.class), any(SchedulingMode.class));
     doNothing().when(node).releaseContainer(any(Container.class));
   }
 
@@ -241,6 +242,14 @@ public class TestChildQueueOrder {
     CSQueue b = queues.get(B);
     CSQueue c = queues.get(C);
     CSQueue d = queues.get(D);
+    
+    // Give root and a/b/c/d >0 pending resource so that allocation continues.
+    queues.get(CapacitySchedulerConfiguration.ROOT).getQueueResourceUsage()
+        .incPending(Resources.createResource(1 * GB));
+    a.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+    b.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+    c.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+    d.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
 
     final String user_0 = "user_0";
 
@@ -275,7 +284,7 @@ public class TestChildQueueOrder {
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 0*GB);
     root.assignContainers(clusterResource, node_0, new ResourceLimits(
-        clusterResource));
+        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     for(int i=0; i < 2; i++)
     {
       stubQueueAllocation(a, clusterResource, node_0, 0*GB);
@@ -283,7 +292,7 @@ public class TestChildQueueOrder {
       stubQueueAllocation(c, clusterResource, node_0, 0*GB);
       stubQueueAllocation(d, clusterResource, node_0, 0*GB);
       root.assignContainers(clusterResource, node_0, new ResourceLimits(
-          clusterResource));
+          clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     } 
     for(int i=0; i < 3; i++)
     {
@@ -292,7 +301,7 @@ public class TestChildQueueOrder {
       stubQueueAllocation(c, clusterResource, node_0, 1*GB);
       stubQueueAllocation(d, clusterResource, node_0, 0*GB);
       root.assignContainers(clusterResource, node_0, new ResourceLimits(
-          clusterResource));
+          clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     }  
     for(int i=0; i < 4; i++)
     {
@@ -301,7 +310,7 @@ public class TestChildQueueOrder {
       stubQueueAllocation(c, clusterResource, node_0, 0*GB);
       stubQueueAllocation(d, clusterResource, node_0, 1*GB);
       root.assignContainers(clusterResource, node_0, new ResourceLimits(
-          clusterResource));
+          clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     }    
     verifyQueueMetrics(a, 1*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
@@ -335,7 +344,7 @@ public class TestChildQueueOrder {
       stubQueueAllocation(c, clusterResource, node_0, 0*GB);
       stubQueueAllocation(d, clusterResource, node_0, 0*GB);
       root.assignContainers(clusterResource, node_0, new ResourceLimits(
-          clusterResource));
+          clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     }
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
@@ -363,7 +372,7 @@ public class TestChildQueueOrder {
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 0*GB);
     root.assignContainers(clusterResource, node_0, new ResourceLimits(
-        clusterResource));
+        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 3*GB, clusterResource);
     verifyQueueMetrics(c, 3*GB, clusterResource);
@@ -390,7 +399,7 @@ public class TestChildQueueOrder {
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 0*GB);
     root.assignContainers(clusterResource, node_0, new ResourceLimits(
-        clusterResource));
+        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
     verifyQueueMetrics(c, 3*GB, clusterResource);
@@ -405,12 +414,14 @@ public class TestChildQueueOrder {
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 1*GB);
     root.assignContainers(clusterResource, node_0, new ResourceLimits(
-        clusterResource));
+        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     InOrder allocationOrder = inOrder(d,b);
-    allocationOrder.verify(d).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), any(ResourceLimits.class));
-    allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), any(ResourceLimits.class));
+    allocationOrder.verify(d).assignContainers(eq(clusterResource),
+        any(FiCaSchedulerNode.class), any(ResourceLimits.class),
+        any(SchedulingMode.class));
+    allocationOrder.verify(b).assignContainers(eq(clusterResource),
+        any(FiCaSchedulerNode.class), any(ResourceLimits.class),
+        any(SchedulingMode.class));
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
     verifyQueueMetrics(c, 3*GB, clusterResource);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index 03b8f5c..54ba617 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -32,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -51,9 +54,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.Assert;
@@ -327,387 +334,4 @@ public class TestContainerAllocation {
     rm1.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.ALLOCATED);
     MockRM.launchAndRegisterAM(app1, rm1, nm1);
   }
-  
-  private Configuration getConfigurationWithQueueLabels(Configuration config) {
-    CapacitySchedulerConfiguration conf =
-        new CapacitySchedulerConfiguration(config);
-    
-    // Define top-level queues
-    conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
-    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
-    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
-
-    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
-    conf.setCapacity(A, 10);
-    conf.setMaximumCapacity(A, 15);
-    conf.setAccessibleNodeLabels(A, toSet("x"));
-    conf.setCapacityByLabel(A, "x", 100);
-    
-    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
-    conf.setCapacity(B, 20);
-    conf.setAccessibleNodeLabels(B, toSet("y"));
-    conf.setCapacityByLabel(B, "y", 100);
-    
-    final String C = CapacitySchedulerConfiguration.ROOT + ".c";
-    conf.setCapacity(C, 70);
-    conf.setMaximumCapacity(C, 70);
-    conf.setAccessibleNodeLabels(C, RMNodeLabelsManager.EMPTY_STRING_SET);
-    
-    // Define 2nd-level queues
-    final String A1 = A + ".a1";
-    conf.setQueues(A, new String[] {"a1"});
-    conf.setCapacity(A1, 100);
-    conf.setMaximumCapacity(A1, 100);
-    conf.setCapacityByLabel(A1, "x", 100);
-    
-    final String B1 = B + ".b1";
-    conf.setQueues(B, new String[] {"b1"});
-    conf.setCapacity(B1, 100);
-    conf.setMaximumCapacity(B1, 100);
-    conf.setCapacityByLabel(B1, "y", 100);
-
-    final String C1 = C + ".c1";
-    conf.setQueues(C, new String[] {"c1"});
-    conf.setCapacity(C1, 100);
-    conf.setMaximumCapacity(C1, 100);
-    
-    return conf;
-  }
-  
-  private void checkTaskContainersHost(ApplicationAttemptId attemptId,
-      ContainerId containerId, ResourceManager rm, String host) {
-    YarnScheduler scheduler = rm.getRMContext().getScheduler();
-    SchedulerAppReport appReport = scheduler.getSchedulerAppInfo(attemptId);
-
-    Assert.assertTrue(appReport.getLiveContainers().size() > 0);
-    for (RMContainer c : appReport.getLiveContainers()) {
-      if (c.getContainerId().equals(containerId)) {
-        Assert.assertEquals(host, c.getAllocatedNode().getHost());
-      }
-    }
-  }
-  
-  @SuppressWarnings("unchecked")
-  private <E> Set<E> toSet(E... elements) {
-    Set<E> set = Sets.newHashSet(elements);
-    return set;
-  }
-  
-  @Test (timeout = 300000)
-  public void testContainerAllocationWithSingleUserLimits() throws Exception {
-    final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
-    mgr.init(conf);
-
-    // set node -> label
-    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
-    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
-        NodeId.newInstance("h2", 0), toSet("y")));
-
-    // inject node label manager
-    MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
-      @Override
-      public RMNodeLabelsManager createNodeLabelManager() {
-        return mgr;
-      }
-    };
-
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
-    rm1.registerNode("h2:1234", 8000); // label = y
-    MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
-
-    // launch an app to queue a1 (label = x), and check all container will
-    // be allocated in h1
-    RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-    
-    // A has only 10% of x, so it can only allocate one container in label=empty
-    ContainerId containerId =
-        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
-    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "");
-    Assert.assertTrue(rm1.waitForState(nm3, containerId,
-          RMContainerState.ALLOCATED, 10 * 1000));
-    // Cannot allocate 2nd label=empty container
-    containerId =
-        ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
-    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "");
-    Assert.assertFalse(rm1.waitForState(nm3, containerId,
-          RMContainerState.ALLOCATED, 10 * 1000));
-
-    // A has default user limit = 100, so it can use all resource in label = x
-    // We can allocate floor(8000 / 1024) = 7 containers
-    for (int id = 3; id <= 8; id++) {
-      containerId =
-          ContainerId.newContainerId(am1.getApplicationAttemptId(), id);
-      am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "x");
-      Assert.assertTrue(rm1.waitForState(nm1, containerId,
-          RMContainerState.ALLOCATED, 10 * 1000));
-    }
-    rm1.close();
-  }
-  
-  @Test(timeout = 300000)
-  public void testContainerAllocateWithComplexLabels() throws Exception {
-    /*
-     * Queue structure:
-     *                      root (*)
-     *                  ________________
-     *                 /                \
-     *               a x(100%), y(50%)   b y(50%), z(100%)
-     *               ________________    ______________
-     *              /                   /              \
-     *             a1 (x,y)         b1(no)              b2(y,z)
-     *               100%                          y = 100%, z = 100%
-     *                           
-     * Node structure:
-     * h1 : x
-     * h2 : y
-     * h3 : y
-     * h4 : z
-     * h5 : NO
-     * 
-     * Total resource:
-     * x: 4G
-     * y: 6G
-     * z: 2G
-     * *: 2G
-     * 
-     * Resource of
-     * a1: x=4G, y=3G, NO=0.2G
-     * b1: NO=0.9G (max=1G)
-     * b2: y=3, z=2G, NO=0.9G (max=1G)
-     * 
-     * Each node can only allocate two containers
-     */
-
-    // set node -> label
-    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y", "z"));
-    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
-        toSet("x"), NodeId.newInstance("h2", 0), toSet("y"),
-        NodeId.newInstance("h3", 0), toSet("y"), NodeId.newInstance("h4", 0),
-        toSet("z"), NodeId.newInstance("h5", 0),
-        RMNodeLabelsManager.EMPTY_STRING_SET));
-
-    // inject node label manager
-    MockRM rm1 = new MockRM(TestUtils.getComplexConfigurationWithQueueLabels(conf)) {
-      @Override
-      public RMNodeLabelsManager createNodeLabelManager() {
-        return mgr;
-      }
-    };
-
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 2048);
-    MockNM nm2 = rm1.registerNode("h2:1234", 2048);
-    MockNM nm3 = rm1.registerNode("h3:1234", 2048);
-    MockNM nm4 = rm1.registerNode("h4:1234", 2048);
-    MockNM nm5 = rm1.registerNode("h5:1234", 2048);
-    
-    ContainerId containerId;
-
-    // launch an app to queue a1 (label = x), and check all container will
-    // be allocated in h1
-    RMApp app1 = rm1.submitApp(1024, "app", "user", null, "a1");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-    // request a container (label = y). can be allocated on nm2 
-    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
-    containerId =
-        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2L);
-    Assert.assertTrue(rm1.waitForState(nm2, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
-        "h2");
-
-    // launch an app to queue b1 (label = y), and check all container will
-    // be allocated in h5
-    RMApp app2 = rm1.submitApp(1024, "app", "user", null, "b1");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm5);
-
-    // request a container for AM, will succeed
-    // and now b1's queue capacity will be used, cannot allocate more containers
-    // (Maximum capacity reached)
-    am2.allocate("*", 1024, 1, new ArrayList<ContainerId>());
-    containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
-    Assert.assertFalse(rm1.waitForState(nm4, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    Assert.assertFalse(rm1.waitForState(nm5, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    
-    // launch an app to queue b2
-    RMApp app3 = rm1.submitApp(1024, "app", "user", null, "b2");
-    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm5);
-
-    // request a container. try to allocate on nm1 (label = x) and nm3 (label =
-    // y,z). Will successfully allocate on nm3
-    am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
-    containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
-    Assert.assertFalse(rm1.waitForState(nm1, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    Assert.assertTrue(rm1.waitForState(nm3, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
-        "h3");
-    
-    // try to allocate container (request label = z) on nm4 (label = y,z). 
-    // Will successfully allocate on nm4 only.
-    am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "z");
-    containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 3L);
-    Assert.assertTrue(rm1.waitForState(nm4, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
-        "h4");
-
-    rm1.close();
-  }
-
-  @Test (timeout = 120000)
-  public void testContainerAllocateWithLabels() throws Exception {
-    // set node -> label
-    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
-    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
-        NodeId.newInstance("h2", 0), toSet("y")));
-
-    // inject node label manager
-    MockRM rm1 = new MockRM(getConfigurationWithQueueLabels(conf)) {
-      @Override
-      public RMNodeLabelsManager createNodeLabelManager() {
-        return mgr;
-      }
-    };
-
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
-    MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y
-    MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
-    
-    ContainerId containerId;
-
-    // launch an app to queue a1 (label = x), and check all container will
-    // be allocated in h1
-    RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm3);
-
-    // request a container.
-    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "x");
-    containerId =
-        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
-    Assert.assertFalse(rm1.waitForState(nm2, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    Assert.assertTrue(rm1.waitForState(nm1, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
-        "h1");
-
-    // launch an app to queue b1 (label = y), and check all container will
-    // be allocated in h2
-    RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm3);
-
-    // request a container.
-    am2.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
-    containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
-    Assert.assertFalse(rm1.waitForState(nm1, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    Assert.assertTrue(rm1.waitForState(nm2, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1,
-        "h2");
-    
-    // launch an app to queue c1 (label = ""), and check all container will
-    // be allocated in h3
-    RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1");
-    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3);
-
-    // request a container.
-    am3.allocate("*", 1024, 1, new ArrayList<ContainerId>());
-    containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
-    Assert.assertFalse(rm1.waitForState(nm2, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    Assert.assertTrue(rm1.waitForState(nm3, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
-        "h3");
-
-    rm1.close();
-  }
-  
-  @Test (timeout = 120000)
-  public void testContainerAllocateWithDefaultQueueLabels() throws Exception {
-    // This test is pretty much similar to testContainerAllocateWithLabel.
-    // Difference is, this test doesn't specify label expression in ResourceRequest,
-    // instead, it uses default queue label expression
-
-    // set node -> label
-    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
-    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
-        NodeId.newInstance("h2", 0), toSet("y")));
-
-    // inject node label manager
-    MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
-      @Override
-      public RMNodeLabelsManager createNodeLabelManager() {
-        return mgr;
-      }
-    };
-
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
-    MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y
-    MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
-    
-    ContainerId containerId;
-
-    // launch an app to queue a1 (label = x), and check all container will
-    // be allocated in h1
-    RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-    // request a container.
-    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>());
-    containerId =
-        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
-    Assert.assertFalse(rm1.waitForState(nm3, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    Assert.assertTrue(rm1.waitForState(nm1, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
-        "h1");
-
-    // launch an app to queue b1 (label = y), and check all container will
-    // be allocated in h2
-    RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
-    // request a container.
-    am2.allocate("*", 1024, 1, new ArrayList<ContainerId>());
-    containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
-    Assert.assertFalse(rm1.waitForState(nm3, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    Assert.assertTrue(rm1.waitForState(nm2, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1,
-        "h2");
-    
-    // launch an app to queue c1 (label = ""), and check all container will
-    // be allocated in h3
-    RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1");
-    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3);
-
-    // request a container.
-    am3.allocate("*", 1024, 1, new ArrayList<ContainerId>());
-    containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
-    Assert.assertFalse(rm1.waitForState(nm2, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    Assert.assertTrue(rm1.waitForState(nm3, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
-        "h3");
-
-    rm1.close();
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 972cabb..0b5250b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -351,7 +351,7 @@ public class TestLeafQueue {
     
     // Only 1 container
     a.assignContainers(clusterResource, node_0, new ResourceLimits(
-        clusterResource));
+        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(
         (int)(node_0.getTotalResource().getMemory() * a.getCapacity()) - (1*GB),
         a.getMetrics().getAvailableMB());
@@ -487,7 +487,7 @@ public class TestLeafQueue {
     
     // Only 1 container
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(1*GB, a.getUsedResources().getMemory());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -498,7 +498,7 @@ public class TestLeafQueue {
     // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
     // you can get one container more than user-limit
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -507,7 +507,7 @@ public class TestLeafQueue {
     
     // Can't allocate 3rd due to user-limit
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -517,7 +517,7 @@ public class TestLeafQueue {
     // Bump up user-limit-factor, now allocate should work
     a.setUserLimitFactor(10);
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(3*GB, a.getUsedResources().getMemory());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -526,7 +526,7 @@ public class TestLeafQueue {
 
     // One more should work, for app_1, due to user-limit-factor
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(4*GB, a.getUsedResources().getMemory());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@@ -537,7 +537,7 @@ public class TestLeafQueue {
     // Now - no more allocs since we are at max-cap
     a.setMaxCapacity(0.5f);
     a.assignContainers(clusterResource, node_0, new ResourceLimits(
-        clusterResource));
+        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(4*GB, a.getUsedResources().getMemory());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@@ -653,21 +653,21 @@ public class TestLeafQueue {
 
     // 1 container to user_0
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
 
     // Again one to user_0 since he hasn't exceeded user limit yet
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(3*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
 
     // One more to user_0 since he is the only active user
     a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(4*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
@@ -719,10 +719,10 @@ public class TestLeafQueue {
         1, qb.getActiveUsersManager().getNumActiveUsers());
     //get headroom
     qb.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, app_0
         .getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(),
-        null);
+        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
 
     //maxqueue 16G, userlimit 13G, - 4G used = 9G
     assertEquals(9*GB,app_0.getHeadroom().getMemory());
@@ -739,10 +739,10 @@ public class TestLeafQueue {
             u1Priority, recordFactory)));
     qb.submitApplicationAttempt(app_2, user_1);
     qb.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, app_0
         .getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(),
-        null);
+        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
 
     assertEquals(8*GB, qb.getUsedResources().getMemory());
     assertEquals(4*GB, app_0.getCurrentConsumption().getMemory());
@@ -782,12 +782,12 @@ public class TestLeafQueue {
     qb.submitApplicationAttempt(app_1, user_0);
     qb.submitApplicationAttempt(app_3, user_1);
     qb.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     qb.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     qb.computeUserLimitAndSetHeadroom(app_3, clusterResource, app_3
         .getResourceRequest(u1Priority, ResourceRequest.ANY).getCapability(),
-        null);
+        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(4*GB, qb.getUsedResources().getMemory());
     //maxqueue 16G, userlimit 7G, used (by each user) 2G, headroom 5G (both)
     assertEquals(5*GB, app_3.getHeadroom().getMemory());
@@ -803,13 +803,13 @@ public class TestLeafQueue {
               TestUtils.createResourceRequest(ResourceRequest.ANY, 6*GB, 1, true,
                       u0Priority, recordFactory)));
     qb.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     qb.computeUserLimitAndSetHeadroom(app_4, clusterResource, app_4
         .getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(),
-        null);
+        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     qb.computeUserLimitAndSetHeadroom(app_3, clusterResource, app_3
         .getResourceRequest(u1Priority, ResourceRequest.ANY).getCapability(),
-        null);
+        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     
     
     //app3 is user1, active from last test case
@@ -876,7 +876,7 @@ public class TestLeafQueue {
                 priority, recordFactory)));
 
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(1*GB, a.getUsedResources().getMemory());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -893,7 +893,7 @@ public class TestLeafQueue {
             priority, recordFactory)));
 
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@@ -982,7 +982,7 @@ public class TestLeafQueue {
 
     // 1 container to user_0
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -993,7 +993,7 @@ public class TestLeafQueue {
 
     // Again one to user_0 since he hasn't exceeded user limit yet
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(3*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@@ -1010,7 +1010,7 @@ public class TestLeafQueue {
     // No more to user_0 since he is already over user-limit
     // and no more containers to queue since it's already at max-cap
     a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(3*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@@ -1024,7 +1024,7 @@ public class TestLeafQueue {
             priority, recordFactory)));
     assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
     a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(0*GB, app_2.getHeadroom().getMemory());   // hit queue max-cap 
   }
 
@@ -1095,7 +1095,7 @@ public class TestLeafQueue {
     
     // Only 1 container
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(1*GB, a.getUsedResources().getMemory());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1103,7 +1103,7 @@ public class TestLeafQueue {
     // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
     // you can get one container more than user-limit
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1111,7 +1111,7 @@ public class TestLeafQueue {
     // Can't allocate 3rd due to user-limit
     a.setUserLimit(25);
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1130,7 +1130,7 @@ public class TestLeafQueue {
     // user_0 is at limit inspite of high user-limit-factor
     a.setUserLimitFactor(10);
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(5*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1140,7 +1140,7 @@ public class TestLeafQueue {
     // Now allocations should goto app_0 since 
     // user_0 is at user-limit not above it
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(6*GB, a.getUsedResources().getMemory());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1151,7 +1151,7 @@ public class TestLeafQueue {
     // Now - no more allocs since we are at max-cap
     a.setMaxCapacity(0.5f);
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(6*GB, a.getUsedResources().getMemory());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1163,7 +1163,7 @@ public class TestLeafQueue {
     a.setMaxCapacity(1.0f);
     a.setUserLimitFactor(1);
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(7*GB, a.getUsedResources().getMemory()); 
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1172,7 +1172,7 @@ public class TestLeafQueue {
 
     // Now we should assign to app_3 again since user_2 is under user-limit
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(8*GB, a.getUsedResources().getMemory()); 
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1272,7 +1272,7 @@ public class TestLeafQueue {
     
     // Only 1 container
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(1*GB, a.getUsedResources().getMemory());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1283,7 +1283,7 @@ public class TestLeafQueue {
     // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
     // you can get one container more than user-limit
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1292,7 +1292,7 @@ public class TestLeafQueue {
     
     // Now, reservation should kick in for app_1
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(6*GB, a.getUsedResources().getMemory()); 
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1309,7 +1309,7 @@ public class TestLeafQueue {
             ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
         RMContainerEventType.KILL, null, true);
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(5*GB, a.getUsedResources().getMemory()); 
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1326,7 +1326,7 @@ public class TestLeafQueue {
             ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
         RMContainerEventType.KILL, null, true);
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(4*GB, a.getUsedResources().getMemory());
     assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
@@ -1394,7 +1394,7 @@ public class TestLeafQueue {
     // Start testing...
 
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1404,7 +1404,7 @@ public class TestLeafQueue {
 
     // Now, reservation should kick in for app_1
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(6*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1418,7 +1418,7 @@ public class TestLeafQueue {
     doReturn(-1).when(a).getNodeLocalityDelay();
     
     a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(10*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
@@ -1435,7 +1435,7 @@ public class TestLeafQueue {
             ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
         RMContainerEventType.KILL, null, true);
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(8*GB, a.getUsedResources().getMemory());
     assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(8*GB, app_1.getCurrentConsumption().getMemory());
@@ -1504,7 +1504,7 @@ public class TestLeafQueue {
     
     // Only 1 container
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(1*GB, a.getUsedResources().getMemory());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1512,14 +1512,14 @@ public class TestLeafQueue {
     // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
     // you can get one container more than user-limit
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
     
     // Now, reservation should kick in for app_1
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(6*GB, a.getUsedResources().getMemory()); 
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1534,7 +1534,7 @@ public class TestLeafQueue {
             ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
         RMContainerEventType.KILL, null, true);
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(5*GB, a.getUsedResources().getMemory()); 
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1544,7 +1544,7 @@ public class TestLeafQueue {
 
     // Re-reserve
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(5*GB, a.getUsedResources().getMemory()); 
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1554,7 +1554,7 @@ public class TestLeafQueue {
     
     // Try to schedule on node_1 now, should *move* the reservation
     a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(9*GB, a.getUsedResources().getMemory()); 
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
@@ -1572,7 +1572,7 @@ public class TestLeafQueue {
             ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
         RMContainerEventType.KILL, null, true);
     CSAssignment assignment = a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(8*GB, a.getUsedResources().getMemory());
     assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
@@ -1644,7 +1644,7 @@ public class TestLeafQueue {
     
     // Start with off switch, shouldn't allocate due to delay scheduling
     assignment = a.assignContainers(clusterResource, node_2,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(1, app_0.getSchedulingOpportunities(priority));
@@ -1653,7 +1653,7 @@ public class TestLeafQueue {
 
     // Another off switch, shouldn't allocate due to delay scheduling
     assignment = a.assignContainers(clusterResource, node_2,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(2, app_0.getSchedulingOpportunities(priority));
@@ -1662,7 +1662,7 @@ public class TestLeafQueue {
     
     // Another off switch, shouldn't allocate due to delay scheduling
     assignment = a.assignContainers(clusterResource, node_2,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(3, app_0.getSchedulingOpportunities(priority));
@@ -1672,7 +1672,7 @@ public class TestLeafQueue {
     // Another off switch, now we should allocate 
     // since missedOpportunities=3 and reqdContainers=3
     assignment = a.assignContainers(clusterResource, node_2,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(4, app_0.getSchedulingOpportunities(priority)); // should NOT reset
@@ -1681,7 +1681,7 @@ public class TestLeafQueue {
     
     // NODE_LOCAL - node_0
     assignment = a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@@ -1690,7 +1690,7 @@ public class TestLeafQueue {
     
     // NODE_LOCAL - node_1
     assignment = a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@@ -1719,14 +1719,14 @@ public class TestLeafQueue {
     
     // Shouldn't assign RACK_LOCAL yet
     assignment = a.assignContainers(clusterResource, node_3,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(1, app_0.getSchedulingOpportunities(priority));
     assertEquals(2, app_0.getTotalRequiredResources(priority));
     assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
 
     // Should assign RACK_LOCAL now
     assignment = a.assignContainers(clusterResource, node_3,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@@ -1808,7 +1808,7 @@ public class TestLeafQueue {
     // Start with off switch, shouldn't allocate P1 due to delay scheduling
     // thus, no P2 either!
     a.assignContainers(clusterResource, node_2,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
         eq(priority_1), any(ResourceRequest.class), any(Container.class));
     assertEquals(1, app_0.getSchedulingOpportunities(priority_1));
@@ -1821,7 +1821,7 @@ public class TestLeafQueue {
     // Another off-switch, shouldn't allocate P1 due to delay scheduling
     // thus, no P2 either!
     a.assignContainers(clusterResource, node_2,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
         eq(priority_1), any(ResourceRequest.class), any(Container.class));
     assertEquals(2, app_0.getSchedulingOpportunities(priority_1));
@@ -1833,7 +1833,7 @@ public class TestLeafQueue {
 
     // Another off-switch, shouldn't allocate OFF_SWITCH P1
     a.assignContainers(clusterResource, node_2,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), 
         eq(priority_1), any(ResourceRequest.class), any(Container.class));
     assertEquals(3, app_0.getSchedulingOpportunities(priority_1));
@@ -1845,7 +1845,7 @@ public class TestLeafQueue {
 
     // Now, DATA_LOCAL for P1
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), 
         eq(priority_1), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority_1));
@@ -1857,7 +1857,7 @@ public class TestLeafQueue {
 
     // Now, OFF_SWITCH for P2
     a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_1), 
         eq(priority_1), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority_1));
@@ -1934,7 +1934,7 @@ public class TestLeafQueue {
     
     // NODE_LOCAL - node_0_1
     a.assignContainers(clusterResource, node_0_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0_0), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@@ -1943,7 +1943,7 @@ public class TestLeafQueue {
     // No allocation on node_1_0 even though it's node/rack local since
     // required(ANY) == 0
     a.assignContainers(clusterResource, node_1_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // Still zero
@@ -1960,7 +1960,7 @@ public class TestLeafQueue {
     // No allocation on node_0_1 even though it's node/rack local since
     // required(rack_1) == 0
     a.assignContainers(clusterResource, node_0_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(1, app_0.getSchedulingOpportunities(priority)); 
@@ -1968,7 +1968,7 @@ public class TestLeafQueue {
     
     // NODE_LOCAL - node_1
     a.assignContainers(clusterResource, node_1_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@@ -2221,7 +2221,7 @@ public class TestLeafQueue {
     // node_0_1  
     // Shouldn't allocate since RR(rack_0) = null && RR(ANY) = relax: false
     a.assignContainers(clusterResource, node_0_1, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
@@ -2244,7 +2244,7 @@ public class TestLeafQueue {
     // node_1_1  
     // Shouldn't allocate since RR(rack_1) = relax: false
     a.assignContainers(clusterResource, node_1_1, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
@@ -2275,7 +2275,7 @@ public class TestLeafQueue {
     // node_1_1  
     // Shouldn't allocate since node_1_1 is blacklisted
     a.assignContainers(clusterResource, node_1_1, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
@@ -2304,7 +2304,7 @@ public class TestLeafQueue {
     // node_1_1  
     // Shouldn't allocate since rack_1 is blacklisted
     a.assignContainers(clusterResource, node_1_1, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
@@ -2331,7 +2331,7 @@ public class TestLeafQueue {
 
     // Now, should allocate since RR(rack_1) = relax: true
     a.assignContainers(clusterResource, node_1_1, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0,never()).allocate(eq(NodeType.RACK_LOCAL), eq(node_1_1), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); 
@@ -2362,7 +2362,7 @@ public class TestLeafQueue {
     // host_1_1: 7G
 
     a.assignContainers(clusterResource, node_1_0, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); 
@@ -2445,7 +2445,7 @@ public class TestLeafQueue {
 
     try {
       a.assignContainers(clusterResource, node_0, 
-          new ResourceLimits(clusterResource));
+          new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     } catch (NullPointerException e) {
       Assert.fail("NPE when allocating container on node but "
           + "forget to set off-switch request should be handled");