You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/04/14 20:46:16 UTC

[1/3] hadoop git commit: YARN-3361. CapacityScheduler side changes to support non-exclusive node labels. Contributed by Wangda Tan

Repository: hadoop
Updated Branches:
  refs/heads/trunk b46ee1e7a -> 0fefda645


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
new file mode 100644
index 0000000..cf1b26f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
@@ -0,0 +1,1027 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+public class TestNodeLabelContainerAllocation {
+  private final int GB = 1024;
+
+  private YarnConfiguration conf;
+  
+  RMNodeLabelsManager mgr;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+      ResourceScheduler.class);
+    mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+  }
+  
+  private Configuration getConfigurationWithQueueLabels(Configuration config) {
+    CapacitySchedulerConfiguration conf =
+        new CapacitySchedulerConfiguration(config);
+    
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
+
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    conf.setCapacity(A, 10);
+    conf.setMaximumCapacity(A, 15);
+    conf.setAccessibleNodeLabels(A, toSet("x"));
+    conf.setCapacityByLabel(A, "x", 100);
+    
+    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+    conf.setCapacity(B, 20);
+    conf.setAccessibleNodeLabels(B, toSet("y"));
+    conf.setCapacityByLabel(B, "y", 100);
+    
+    final String C = CapacitySchedulerConfiguration.ROOT + ".c";
+    conf.setCapacity(C, 70);
+    conf.setMaximumCapacity(C, 70);
+    conf.setAccessibleNodeLabels(C, RMNodeLabelsManager.EMPTY_STRING_SET);
+    
+    // Define 2nd-level queues
+    final String A1 = A + ".a1";
+    conf.setQueues(A, new String[] {"a1"});
+    conf.setCapacity(A1, 100);
+    conf.setMaximumCapacity(A1, 100);
+    conf.setCapacityByLabel(A1, "x", 100);
+    
+    final String B1 = B + ".b1";
+    conf.setQueues(B, new String[] {"b1"});
+    conf.setCapacity(B1, 100);
+    conf.setMaximumCapacity(B1, 100);
+    conf.setCapacityByLabel(B1, "y", 100);
+
+    final String C1 = C + ".c1";
+    conf.setQueues(C, new String[] {"c1"});
+    conf.setCapacity(C1, 100);
+    conf.setMaximumCapacity(C1, 100);
+    
+    return conf;
+  }
+  
+  private void checkTaskContainersHost(ApplicationAttemptId attemptId,
+      ContainerId containerId, ResourceManager rm, String host) {
+    YarnScheduler scheduler = rm.getRMContext().getScheduler();
+    SchedulerAppReport appReport = scheduler.getSchedulerAppInfo(attemptId);
+
+    Assert.assertTrue(appReport.getLiveContainers().size() > 0);
+    for (RMContainer c : appReport.getLiveContainers()) {
+      if (c.getContainerId().equals(containerId)) {
+        Assert.assertEquals(host, c.getAllocatedNode().getHost());
+      }
+    }
+  }
+  
+  @SuppressWarnings("unchecked")
+  private <E> Set<E> toSet(E... elements) {
+    Set<E> set = Sets.newHashSet(elements);
+    return set;
+  }
+  
+  
+  @Test (timeout = 300000)
+  public void testContainerAllocationWithSingleUserLimits() throws Exception {
+    final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
+        NodeId.newInstance("h2", 0), toSet("y")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
+    rm1.registerNode("h2:1234", 8000); // label = y
+    MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
+
+    // launch an app to queue a1 (label = x), and check all container will
+    // be allocated in h1
+    RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    
+    // A has only 10% of x, so it can only allocate one container in label=empty
+    ContainerId containerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "");
+    Assert.assertTrue(rm1.waitForState(nm3, containerId,
+          RMContainerState.ALLOCATED, 10 * 1000));
+    // Cannot allocate 2nd label=empty container
+    containerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
+    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "");
+    Assert.assertFalse(rm1.waitForState(nm3, containerId,
+          RMContainerState.ALLOCATED, 10 * 1000));
+
+    // A has default user limit = 100, so it can use all resource in label = x
+    // We can allocate floor(8000 / 1024) = 7 containers
+    for (int id = 3; id <= 8; id++) {
+      containerId =
+          ContainerId.newContainerId(am1.getApplicationAttemptId(), id);
+      am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "x");
+      Assert.assertTrue(rm1.waitForState(nm1, containerId,
+          RMContainerState.ALLOCATED, 10 * 1000));
+    }
+    rm1.close();
+  }
+  
+  @Test(timeout = 300000)
+  public void testContainerAllocateWithComplexLabels() throws Exception {
+    /*
+     * Queue structure:
+     *                      root (*)
+     *                  ________________
+     *                 /                \
+     *               a x(100%), y(50%)   b y(50%), z(100%)
+     *               ________________    ______________
+     *              /                   /              \
+     *             a1 (x,y)         b1(no)              b2(y,z)
+     *               100%                          y = 100%, z = 100%
+     *                           
+     * Node structure:
+     * h1 : x
+     * h2 : y
+     * h3 : y
+     * h4 : z
+     * h5 : NO
+     * 
+     * Total resource:
+     * x: 4G
+     * y: 6G
+     * z: 2G
+     * *: 2G
+     * 
+     * Resource of
+     * a1: x=4G, y=3G, NO=0.2G
+     * b1: NO=0.9G (max=1G)
+     * b2: y=3, z=2G, NO=0.9G (max=1G)
+     * 
+     * Each node can only allocate two containers
+     */
+
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y", "z"));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
+        toSet("x"), NodeId.newInstance("h2", 0), toSet("y"),
+        NodeId.newInstance("h3", 0), toSet("y"), NodeId.newInstance("h4", 0),
+        toSet("z"), NodeId.newInstance("h5", 0),
+        RMNodeLabelsManager.EMPTY_STRING_SET));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(TestUtils.getComplexConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 2048);
+    MockNM nm2 = rm1.registerNode("h2:1234", 2048);
+    MockNM nm3 = rm1.registerNode("h3:1234", 2048);
+    MockNM nm4 = rm1.registerNode("h4:1234", 2048);
+    MockNM nm5 = rm1.registerNode("h5:1234", 2048);
+    
+    ContainerId containerId;
+
+    // launch an app to queue a1 (label = x), and check all container will
+    // be allocated in h1
+    RMApp app1 = rm1.submitApp(1024, "app", "user", null, "a1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // request a container (label = y). can be allocated on nm2 
+    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
+    containerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2L);
+    Assert.assertTrue(rm1.waitForState(nm2, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+        "h2");
+
+    // launch an app to queue b1 (label = y), and check all container will
+    // be allocated in h5
+    RMApp app2 = rm1.submitApp(1024, "app", "user", null, "b1");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm5);
+
+    // request a container for AM, will succeed
+    // and now b1's queue capacity will be used, cannot allocate more containers
+    // (Maximum capacity reached)
+    am2.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+    containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
+    Assert.assertFalse(rm1.waitForState(nm4, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    Assert.assertFalse(rm1.waitForState(nm5, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    
+    // launch an app to queue b2
+    RMApp app3 = rm1.submitApp(1024, "app", "user", null, "b2");
+    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm5);
+
+    // request a container. try to allocate on nm1 (label = x) and nm3 (label =
+    // y,z). Will successfully allocate on nm3
+    am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
+    containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
+    Assert.assertFalse(rm1.waitForState(nm1, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    Assert.assertTrue(rm1.waitForState(nm3, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
+        "h3");
+    
+    // try to allocate container (request label = z) on nm4 (label = y,z). 
+    // Will successfully allocate on nm4 only.
+    am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "z");
+    containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 3L);
+    Assert.assertTrue(rm1.waitForState(nm4, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
+        "h4");
+
+    rm1.close();
+  }
+
+  @Test (timeout = 120000)
+  public void testContainerAllocateWithLabels() throws Exception {
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
+        NodeId.newInstance("h2", 0), toSet("y")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(getConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
+    MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y
+    MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
+    
+    ContainerId containerId;
+
+    // launch an app to queue a1 (label = x), and check all container will
+    // be allocated in h1
+    RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm3);
+
+    // request a container.
+    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "x");
+    containerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    Assert.assertFalse(rm1.waitForState(nm2, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    Assert.assertTrue(rm1.waitForState(nm1, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+        "h1");
+
+    // launch an app to queue b1 (label = y), and check all container will
+    // be allocated in h2
+    RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm3);
+
+    // request a container.
+    am2.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
+    containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
+    Assert.assertFalse(rm1.waitForState(nm1, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    Assert.assertTrue(rm1.waitForState(nm2, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1,
+        "h2");
+    
+    // launch an app to queue c1 (label = ""), and check all container will
+    // be allocated in h3
+    RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1");
+    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3);
+
+    // request a container.
+    am3.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+    containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
+    Assert.assertFalse(rm1.waitForState(nm2, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    Assert.assertTrue(rm1.waitForState(nm3, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
+        "h3");
+
+    rm1.close();
+  }
+  
+  @Test (timeout = 120000)
+  public void testContainerAllocateWithDefaultQueueLabels() throws Exception {
+    // This test is pretty much similar to testContainerAllocateWithLabel.
+    // Difference is, this test doesn't specify label expression in ResourceRequest,
+    // instead, it uses default queue label expression
+
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
+        NodeId.newInstance("h2", 0), toSet("y")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
+    MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y
+    MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
+    
+    ContainerId containerId;
+
+    // launch an app to queue a1 (label = x), and check all container will
+    // be allocated in h1
+    RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // request a container.
+    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+    containerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    Assert.assertFalse(rm1.waitForState(nm3, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    Assert.assertTrue(rm1.waitForState(nm1, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+        "h1");
+
+    // launch an app to queue b1 (label = y), and check all container will
+    // be allocated in h2
+    RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // request a container.
+    am2.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+    containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
+    Assert.assertFalse(rm1.waitForState(nm3, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    Assert.assertTrue(rm1.waitForState(nm2, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1,
+        "h2");
+    
+    // launch an app to queue c1 (label = ""), and check all container will
+    // be allocated in h3
+    RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1");
+    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3);
+
+    // request a container.
+    am3.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+    containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
+    Assert.assertFalse(rm1.waitForState(nm2, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    Assert.assertTrue(rm1.waitForState(nm3, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
+        "h3");
+
+    rm1.close();
+  }
+  
+  private void checkPendingResource(MockRM rm, int priority,
+      ApplicationAttemptId attemptId, int memory) {
+    CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler();
+    FiCaSchedulerApp app = cs.getApplicationAttempt(attemptId);
+    ResourceRequest rr =
+        app.getAppSchedulingInfo().getResourceRequest(
+            Priority.newInstance(priority), "*");
+    Assert.assertEquals(memory,
+        rr.getCapability().getMemory() * rr.getNumContainers());
+  }
+  
+  private void checkLaunchedContainerNumOnNode(MockRM rm, NodeId nodeId,
+      int numContainers) {
+    CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler();
+    SchedulerNode node = cs.getSchedulerNode(nodeId);
+    Assert.assertEquals(numContainers, node.getNumContainers());
+  }
+  
+  @Test
+  public void testPreferenceOfNeedyAppsTowardsNodePartitions() throws Exception {
+    /**
+     * Test case: Submit two application to a queue (app1 first then app2), app1
+     * asked for no-label, app2 asked for label=x, when node1 has label=x
+     * doing heart beat, app2 will get allocation first, even if app2 submits later
+     * than app1
+     */
+    
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+    // Makes y to be non-exclusive node labels
+    mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("y", false)));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("y")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(TestUtils.getConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = y
+    MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty>
+
+    // launch an app to queue b1 (label = y), AM container should be launched in nm2
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
+    
+    // launch another app to queue b1 (label = y), AM container should be launched in nm2
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // request container and nm1 do heartbeat (nm2 has label=y), note that app1
+    // request non-labeled container, and app2 request labeled container, app2
+    // will get allocated first even if app1 submitted first.  
+    am1.allocate("*", 1 * GB, 8, new ArrayList<ContainerId>());
+    am2.allocate("*", 1 * GB, 8, new ArrayList<ContainerId>(), "y");
+    
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+    
+    // Do node heartbeats many times
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+    
+    // App2 will get preference to be allocated on node1, and node1 will be all
+    // used by App2.
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1.getApplicationAttemptId());
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(am2.getApplicationAttemptId());
+    // app1 get nothing in nm1 (partition=y)
+    checkNumOfContainersInAnAppOnGivenNode(0, nm1.getNodeId(), schedulerApp1);
+    checkNumOfContainersInAnAppOnGivenNode(9, nm2.getNodeId(), schedulerApp1);
+    // app2 get all resource in nm1 (partition=y)
+    checkNumOfContainersInAnAppOnGivenNode(8, nm1.getNodeId(), schedulerApp2);
+    checkNumOfContainersInAnAppOnGivenNode(1, nm2.getNodeId(), schedulerApp2);
+    
+    rm1.close();
+  }
+  
+  private void checkNumOfContainersInAnAppOnGivenNode(int expectedNum,
+      NodeId nodeId, FiCaSchedulerApp app) {
+    int num = 0;
+    for (RMContainer container : app.getLiveContainers()) {
+      if (container.getAllocatedNode().equals(nodeId)) {
+        num++;
+      }
+    }
+    Assert.assertEquals(expectedNum, num);
+  }
+  
+  @Test
+  public void
+      testPreferenceOfNeedyPrioritiesUnderSameAppTowardsNodePartitions()
+          throws Exception {
+    /**
+     * Test case: Submit one application, it asks label="" in priority=1 and
+     * label="x" in priority=2, when a node with label=x heartbeat, priority=2
+     * will get allocation first even if there're pending resource in priority=1
+     */
+    
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+    // Makes y to be non-exclusive node labels
+    mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("y", false)));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("y")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(TestUtils.getConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = y
+    MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty>
+    
+    ContainerId nextContainerId;
+
+    // launch an app to queue b1 (label = y), AM container should be launched in nm3
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
+    
+    // request containers from am2, priority=1 asks for "" and priority=2 asks
+    // for "y", "y" container should be allocated first
+    nextContainerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    am1.allocate("*", 1 * GB, 1, 1, new ArrayList<ContainerId>(), "");
+    am1.allocate("*", 1 * GB, 1, 2, new ArrayList<ContainerId>(), "y");
+    Assert.assertTrue(rm1.waitForState(nm1, nextContainerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    
+    // Check pending resource for am2, priority=1 doesn't get allocated before
+    // priority=2 allocated
+    checkPendingResource(rm1, 1, am1.getApplicationAttemptId(), 1 * GB);
+    checkPendingResource(rm1, 2, am1.getApplicationAttemptId(), 0 * GB);
+    
+    rm1.close();
+  }
+  
+  @Test
+  public void testNonLabeledResourceRequestGetPreferrenceToNonLabeledNode()
+      throws Exception {
+    /**
+     * Test case: Submit one application, it asks 6 label="" containers, NM1
+     * with label=y and NM2 has no label, NM1/NM2 doing heartbeat together. Even
+     * if NM1 has idle resource, containers are all allocated to NM2 since
+     * non-labeled request should get allocation on non-labeled nodes first.
+     */
+    
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+    // Makes x to be non-exclusive node labels
+    mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false)));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(TestUtils.getConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = y
+    MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty>
+    
+    ContainerId nextContainerId;
+
+    // launch an app to queue b1 (label = y), AM container should be launched in nm3
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
+    
+    // request containers from am2, priority=1 asks for "" * 6 (id from 4 to 9),
+    // nm2/nm3 do
+    // heartbeat at the same time, check containers are always allocated to nm3.
+    // This is to verify when there's resource available in non-labeled
+    // partition, non-labeled resource should allocate to non-labeled partition
+    // first.
+    am1.allocate("*", 1 * GB, 6, 1, new ArrayList<ContainerId>(), "");
+    for (int i = 2; i < 2 + 6; i++) {
+      nextContainerId =
+          ContainerId.newContainerId(am1.getApplicationAttemptId(), i);
+      Assert.assertTrue(rm1.waitForState(Arrays.asList(nm1, nm2),
+          nextContainerId, RMContainerState.ALLOCATED, 10 * 1000));
+    }
+    // no more container allocated on nm1
+    checkLaunchedContainerNumOnNode(rm1, nm1.getNodeId(), 0);
+    // all 7 (1 AM container + 6 task container) containers allocated on nm2
+    checkLaunchedContainerNumOnNode(rm1, nm2.getNodeId(), 7);   
+    
+    rm1.close();
+  }
+
+  @Test
+  public void testPreferenceOfQueuesTowardsNodePartitions()
+      throws Exception {
+    /**
+     * Test case: have a following queue structure:
+     * 
+     * <pre>
+     *            root
+     *         /   |   \
+     *        a     b    c
+     *       / \   / \  /  \
+     *      a1 a2 b1 b2 c1 c2
+     *     (x)    (x)   (x)
+     * </pre>
+     * 
+     * Only a1, b1, c1 can access label=x, and their default label=x Each each
+     * has one application, asks for 5 containers. NM1 has label=x
+     * 
+     * NM1/NM2 doing heartbeat for 15 times, it should allocate all 15
+     * containers with label=x
+     */
+    
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration(this.conf);
+    
+    // Define top-level queues
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
+    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    csConf.setCapacity(A, 33);
+    csConf.setAccessibleNodeLabels(A, toSet("x"));
+    csConf.setCapacityByLabel(A, "x", 33);
+    csConf.setQueues(A, new String[] {"a1", "a2"});
+    
+    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+    csConf.setCapacity(B, 33);
+    csConf.setAccessibleNodeLabels(B, toSet("x"));
+    csConf.setCapacityByLabel(B, "x", 33);
+    csConf.setQueues(B, new String[] {"b1", "b2"});
+    
+    final String C = CapacitySchedulerConfiguration.ROOT + ".c";
+    csConf.setCapacity(C, 34);
+    csConf.setAccessibleNodeLabels(C, toSet("x"));
+    csConf.setCapacityByLabel(C, "x", 34);
+    csConf.setQueues(C, new String[] {"c1", "c2"});
+    
+    // Define 2nd-level queues
+    final String A1 = A + ".a1";
+    csConf.setCapacity(A1, 50);
+    csConf.setCapacityByLabel(A1, "x", 100);
+    csConf.setDefaultNodeLabelExpression(A1, "x");
+    
+    final String A2 = A + ".a2";
+    csConf.setCapacity(A2, 50);
+    csConf.setCapacityByLabel(A2, "x", 0);
+    
+    final String B1 = B + ".b1";
+    csConf.setCapacity(B1, 50);
+    csConf.setCapacityByLabel(B1, "x", 100);
+    csConf.setDefaultNodeLabelExpression(B1, "x");
+    
+    final String B2 = B + ".b2";
+    csConf.setCapacity(B2, 50);
+    csConf.setCapacityByLabel(B2, "x", 0);
+    
+    final String C1 = C + ".c1";
+    csConf.setCapacity(C1, 50);
+    csConf.setCapacityByLabel(C1, "x", 100);
+    csConf.setDefaultNodeLabelExpression(C1, "x");
+    
+    final String C2 = C + ".c2";
+    csConf.setCapacity(C2, 50);
+    csConf.setCapacityByLabel(C2, "x", 0);
+    
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+    // Makes x to be non-exclusive node labels
+    mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false)));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(csConf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB); // label = x
+    MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty>
+
+    // app1 -> a1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    
+    // app2 -> a2
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "a2");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+    
+    // app3 -> b1
+    RMApp app3 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
+    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1);
+    
+    // app4 -> b2
+    RMApp app4 = rm1.submitApp(1 * GB, "app", "user", null, "b2");
+    MockAM am4 = MockRM.launchAndRegisterAM(app4, rm1, nm2);
+    
+    // app5 -> c1
+    RMApp app5 = rm1.submitApp(1 * GB, "app", "user", null, "c1");
+    MockAM am5 = MockRM.launchAndRegisterAM(app5, rm1, nm1);
+    
+    // app6 -> b2
+    RMApp app6 = rm1.submitApp(1 * GB, "app", "user", null, "c2");
+    MockAM am6 = MockRM.launchAndRegisterAM(app6, rm1, nm2);
+    
+    // Each application request 5 * 1GB container
+    am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
+    am2.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
+    am3.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
+    am4.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
+    am5.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
+    am6.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
+    
+    // NM1 do 15 heartbeats
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    for (int i = 0; i < 15; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+    
+    // NM1 get 15 new containers (total is 18, 15 task containers and 3 AM
+    // containers)
+    checkLaunchedContainerNumOnNode(rm1, nm1.getNodeId(), 18);
+
+    // Check pending resource each application
+    // APP1/APP3/APP5 get satisfied, and APP2/APP2/APP3 get nothing.
+    checkPendingResource(rm1, 1, am1.getApplicationAttemptId(), 0 * GB);
+    checkPendingResource(rm1, 1, am2.getApplicationAttemptId(), 5 * GB);
+    checkPendingResource(rm1, 1, am3.getApplicationAttemptId(), 0 * GB);
+    checkPendingResource(rm1, 1, am4.getApplicationAttemptId(), 5 * GB);
+    checkPendingResource(rm1, 1, am5.getApplicationAttemptId(), 0 * GB);
+    checkPendingResource(rm1, 1, am6.getApplicationAttemptId(), 5 * GB);
+
+    rm1.close();
+  }
+  
+  @Test
+  public void testQueuesWithoutAccessUsingPartitionedNodes() throws Exception {
+    /**
+     * Test case: have a following queue structure:
+     * 
+     * <pre>
+     *            root
+     *         /      \
+     *        a        b
+     *        (x)
+     * </pre>
+     * 
+     * Only a can access label=x, two nodes in the cluster, n1 has x and n2 has
+     * no-label.
+     * 
+     * When user-limit-factor=5, submit one application in queue b and request
+     * for infinite containers should be able to use up all cluster resources.
+     */
+    
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration(this.conf);
+    
+    // Define top-level queues
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    csConf.setCapacity(A, 50);
+    csConf.setAccessibleNodeLabels(A, toSet("x"));
+    csConf.setCapacityByLabel(A, "x", 100);
+    
+    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+    csConf.setCapacity(B, 50);
+    csConf.setAccessibleNodeLabels(B, new HashSet<String>());
+    csConf.setUserLimitFactor(B, 5);
+    
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x"));
+    // Makes x to be non-exclusive node labels
+    mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false)));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(csConf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
+    MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = <empty>
+
+    // app1 -> b
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
+    
+    // Each application request 5 * 1GB container
+    am1.allocate("*", 1 * GB, 50, new ArrayList<ContainerId>());
+    
+    // NM1 do 50 heartbeats
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+    
+    SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
+    
+    // How much cycles we waited to be allocated when available resource only on
+    // partitioned node
+    int cycleWaited = 0;
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+      if (schedulerNode1.getNumContainers() == 0) {
+        cycleWaited++;
+      }
+    }
+    // We will will 10 cycles before get allocated on partitioned node
+    // NM2 can allocate 10 containers totally, exclude already allocated AM
+    // container, we will wait 9 to fulfill non-partitioned node, and need wait
+    // one more cycle before allocating to non-partitioned node 
+    Assert.assertEquals(10, cycleWaited);
+    
+    // Both NM1/NM2 launched 10 containers, cluster resource is exhausted
+    checkLaunchedContainerNumOnNode(rm1, nm1.getNodeId(), 10);
+    checkLaunchedContainerNumOnNode(rm1, nm2.getNodeId(), 10);
+
+    rm1.close();
+  }
+  
+  @Test
+  public void testAMContainerAllocationWillAlwaysBeExclusive()
+      throws Exception {
+    /**
+     * Test case: Submit one application without partition, trying to allocate a
+     * node has partition=x, it should fail to allocate since AM container will
+     * always respect exclusivity for partitions
+     */
+    
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+    // Makes x to be non-exclusive node labels
+    mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false)));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(TestUtils.getConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x
+
+    // launch an app to queue b1 (label = y), AM container should be launched in nm3
+    rm1.submitApp(1 * GB, "app", "user", null, "b1");
+   
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    
+    // Heartbeat for many times, app1 should get nothing
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+    
+    Assert.assertEquals(0, cs.getSchedulerNode(nm1.getNodeId())
+        .getNumContainers());
+    
+    rm1.close();
+  }
+  
+  @Test
+  public void
+      testQueueMaxCapacitiesWillNotBeHonoredWhenNotRespectingExclusivity()
+          throws Exception {
+    /**
+     * Test case: have a following queue structure:
+     * 
+     * <pre>
+     *            root
+     *         /      \
+     *        a        b
+     *        (x)     (x)
+     * </pre>
+     * 
+     * a/b can access x, both of them has max-capacity-on-x = 50
+     * 
+     * When doing non-exclusive allocation, app in a (or b) can use 100% of x
+     * resource.
+     */
+
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration(this.conf);
+
+    // Define top-level queues
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a",
+        "b" });
+    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    csConf.setCapacity(A, 50);
+    csConf.setAccessibleNodeLabels(A, toSet("x"));
+    csConf.setCapacityByLabel(A, "x", 50);
+    csConf.setMaximumCapacityByLabel(A, "x", 50);
+
+    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+    csConf.setCapacity(B, 50);
+    csConf.setAccessibleNodeLabels(B, toSet("x"));
+    csConf.setCapacityByLabel(B, "x", 50);
+    csConf.setMaximumCapacityByLabel(B, "x", 50);
+
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x"));
+    // Makes x to be non-exclusive node labels
+    mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false)));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(csConf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
+    MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = <empty>
+
+    // app1 -> a
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
+
+    // app1 asks for 10 partition= containers
+    am1.allocate("*", 1 * GB, 10, new ArrayList<ContainerId>());
+
+    // NM1 do 50 heartbeats
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
+
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+    
+    // app1 gets all resource in partition=x
+    Assert.assertEquals(10, schedulerNode1.getNumContainers());
+
+    rm1.close();
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
index 7da1c97..52d0bc1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
@@ -45,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -146,7 +146,7 @@ public class TestParentQueue {
         final Resource allocatedResource = Resources.createResource(allocation);
         if (queue instanceof ParentQueue) {
           ((ParentQueue)queue).allocateResource(clusterResource, 
-              allocatedResource, null);
+              allocatedResource, RMNodeLabelsManager.NO_LABEL);
         } else {
           FiCaSchedulerApp app1 = getMockApplication(0, "");
           ((LeafQueue)queue).allocateResource(clusterResource, app1, 
@@ -157,7 +157,7 @@ public class TestParentQueue {
         if (allocation > 0) {
           doReturn(new CSAssignment(Resources.none(), type)).when(queue)
               .assignContainers(eq(clusterResource), eq(node),
-                  any(ResourceLimits.class));
+                  any(ResourceLimits.class), any(SchedulingMode.class));
 
           // Mock the node's resource availability
           Resource available = node.getAvailableResource();
@@ -168,7 +168,7 @@ public class TestParentQueue {
         return new CSAssignment(allocatedResource, type);
       }
     }).when(queue).assignContainers(eq(clusterResource), eq(node),
-        any(ResourceLimits.class));
+        any(ResourceLimits.class), any(SchedulingMode.class));
   }
   
   private float computeQueueAbsoluteUsedCapacity(CSQueue queue, 
@@ -228,11 +228,16 @@ public class TestParentQueue {
     LeafQueue a = (LeafQueue)queues.get(A);
     LeafQueue b = (LeafQueue)queues.get(B);
     
+    a.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+    b.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+    queues.get(CapacitySchedulerConfiguration.ROOT).getQueueResourceUsage()
+    .incPending(Resources.createResource(1 * GB));
+    
     // Simulate B returning a container on node_0
     stubQueueAllocation(a, clusterResource, node_0, 0*GB);
     stubQueueAllocation(b, clusterResource, node_0, 1*GB);
     root.assignContainers(clusterResource, node_0, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verifyQueueMetrics(a, 0*GB, clusterResource);
     verifyQueueMetrics(b, 1*GB, clusterResource);
     
@@ -240,12 +245,12 @@ public class TestParentQueue {
     stubQueueAllocation(a, clusterResource, node_1, 2*GB);
     stubQueueAllocation(b, clusterResource, node_1, 1*GB);
     root.assignContainers(clusterResource, node_1, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     InOrder allocationOrder = inOrder(a, b);
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
 
@@ -254,12 +259,12 @@ public class TestParentQueue {
     stubQueueAllocation(a, clusterResource, node_0, 1*GB);
     stubQueueAllocation(b, clusterResource, node_0, 2*GB);
     root.assignContainers(clusterResource, node_0, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(b, a);
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 4*GB, clusterResource);
 
@@ -268,12 +273,12 @@ public class TestParentQueue {
     stubQueueAllocation(a, clusterResource, node_0, 0*GB);
     stubQueueAllocation(b, clusterResource, node_0, 4*GB);
     root.assignContainers(clusterResource, node_0, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(b, a);
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 8*GB, clusterResource);
 
@@ -282,12 +287,12 @@ public class TestParentQueue {
     stubQueueAllocation(a, clusterResource, node_1, 1*GB);
     stubQueueAllocation(b, clusterResource, node_1, 1*GB);
     root.assignContainers(clusterResource, node_1, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(a, b);
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     verifyQueueMetrics(a, 4*GB, clusterResource);
     verifyQueueMetrics(b, 9*GB, clusterResource);
   }
@@ -448,16 +453,27 @@ public class TestParentQueue {
 
     // Start testing
     CSQueue a = queues.get(A);
+    a.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
     CSQueue b = queues.get(B);
+    b.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
     CSQueue c = queues.get(C);
+    c.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
     CSQueue d = queues.get(D);
+    d.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
 
     CSQueue a1 = queues.get(A1);
+    a1.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
     CSQueue a2 = queues.get(A2);
+    a2.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
 
     CSQueue b1 = queues.get(B1);
+    b1.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
     CSQueue b2 = queues.get(B2);
+    b2.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
     CSQueue b3 = queues.get(B3);
+    b3.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+    queues.get(CapacitySchedulerConfiguration.ROOT).getQueueResourceUsage()
+    .incPending(Resources.createResource(1 * GB));
 
     // Simulate C returning a container on node_0
     stubQueueAllocation(a, clusterResource, node_0, 0*GB);
@@ -465,7 +481,7 @@ public class TestParentQueue {
     stubQueueAllocation(c, clusterResource, node_0, 1*GB);
     stubQueueAllocation(d, clusterResource, node_0, 0*GB);
     root.assignContainers(clusterResource, node_0, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verifyQueueMetrics(a, 0*GB, clusterResource);
     verifyQueueMetrics(b, 0*GB, clusterResource);
     verifyQueueMetrics(c, 1*GB, clusterResource);
@@ -478,7 +494,7 @@ public class TestParentQueue {
     stubQueueAllocation(b2, clusterResource, node_1, 4*GB);
     stubQueueAllocation(c, clusterResource, node_1, 0*GB);
     root.assignContainers(clusterResource, node_1, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verifyQueueMetrics(a, 0*GB, clusterResource);
     verifyQueueMetrics(b, 4*GB, clusterResource);
     verifyQueueMetrics(c, 1*GB, clusterResource);
@@ -490,14 +506,14 @@ public class TestParentQueue {
     stubQueueAllocation(b3, clusterResource, node_0, 2*GB);
     stubQueueAllocation(c, clusterResource, node_0, 2*GB);
     root.assignContainers(clusterResource, node_0, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     InOrder allocationOrder = inOrder(a, c, b);
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     allocationOrder.verify(c).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     verifyQueueMetrics(a, 1*GB, clusterResource);
     verifyQueueMetrics(b, 6*GB, clusterResource);
     verifyQueueMetrics(c, 3*GB, clusterResource);
@@ -517,16 +533,16 @@ public class TestParentQueue {
     stubQueueAllocation(b1, clusterResource, node_2, 1*GB);
     stubQueueAllocation(c, clusterResource, node_2, 1*GB);
     root.assignContainers(clusterResource, node_2, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(a, a2, a1, b, c);
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     allocationOrder.verify(a2).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     allocationOrder.verify(c).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 8*GB, clusterResource);
     verifyQueueMetrics(c, 4*GB, clusterResource);
@@ -622,12 +638,16 @@ public class TestParentQueue {
     // Start testing
     LeafQueue a = (LeafQueue)queues.get(A);
     LeafQueue b = (LeafQueue)queues.get(B);
+    a.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+    b.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+    queues.get(CapacitySchedulerConfiguration.ROOT).getQueueResourceUsage()
+    .incPending(Resources.createResource(1 * GB));
     
     // Simulate B returning a container on node_0
     stubQueueAllocation(a, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
     stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
     root.assignContainers(clusterResource, node_0, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verifyQueueMetrics(a, 0*GB, clusterResource);
     verifyQueueMetrics(b, 1*GB, clusterResource);
     
@@ -636,12 +656,12 @@ public class TestParentQueue {
     stubQueueAllocation(a, clusterResource, node_1, 2*GB, NodeType.RACK_LOCAL);
     stubQueueAllocation(b, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
     root.assignContainers(clusterResource, node_1, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     InOrder allocationOrder = inOrder(a, b);
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
     
@@ -651,12 +671,12 @@ public class TestParentQueue {
     stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
     stubQueueAllocation(b, clusterResource, node_0, 2*GB, NodeType.OFF_SWITCH);
     root.assignContainers(clusterResource, node_0, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(b, a);
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 4*GB, clusterResource);
 
@@ -691,12 +711,19 @@ public class TestParentQueue {
     // Start testing
     LeafQueue b3 = (LeafQueue)queues.get(B3);
     LeafQueue b2 = (LeafQueue)queues.get(B2);
+    b2.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+    b3.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+    queues.get(CapacitySchedulerConfiguration.ROOT).getQueueResourceUsage()
+    .incPending(Resources.createResource(1 * GB));
+    
+    CSQueue b = queues.get(B);
+    b.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
     
     // Simulate B3 returning a container on node_0
     stubQueueAllocation(b2, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
     stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
     root.assignContainers(clusterResource, node_0, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verifyQueueMetrics(b2, 0*GB, clusterResource);
     verifyQueueMetrics(b3, 1*GB, clusterResource);
     
@@ -705,12 +732,12 @@ public class TestParentQueue {
     stubQueueAllocation(b2, clusterResource, node_1, 1*GB, NodeType.RACK_LOCAL);
     stubQueueAllocation(b3, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
     root.assignContainers(clusterResource, node_1, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     InOrder allocationOrder = inOrder(b2, b3);
     allocationOrder.verify(b2).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     allocationOrder.verify(b3).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     verifyQueueMetrics(b2, 1*GB, clusterResource);
     verifyQueueMetrics(b3, 2*GB, clusterResource);
     
@@ -720,12 +747,12 @@ public class TestParentQueue {
     stubQueueAllocation(b2, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
     stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
     root.assignContainers(clusterResource, node_0, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(b3, b2);
     allocationOrder.verify(b3).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     allocationOrder.verify(b2).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     verifyQueueMetrics(b2, 1*GB, clusterResource);
     verifyQueueMetrics(b3, 3*GB, clusterResource);
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index e8a8243..47be618 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -48,10 +48,10 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -266,7 +266,7 @@ public class TestReservations {
     // Start testing...
     // Only AM
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(2 * GB, a.getUsedResources().getMemory());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -278,7 +278,7 @@ public class TestReservations {
 
     // Only 1 map - simulating reduce
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(5 * GB, a.getUsedResources().getMemory());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -290,7 +290,7 @@ public class TestReservations {
 
     // Only 1 map to other node - simulating reduce
     a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -305,7 +305,7 @@ public class TestReservations {
 
     // try to assign reducer (5G on node 0 and should reserve)
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -321,7 +321,7 @@ public class TestReservations {
 
     // assign reducer to node 2
     a.assignContainers(clusterResource, node_2,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(18 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -338,7 +338,7 @@ public class TestReservations {
     // node_1 heartbeat and unreserves from node_0 in order to allocate
     // on node_1
     a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(18 * GB, a.getUsedResources().getMemory());
     assertEquals(18 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -422,7 +422,7 @@ public class TestReservations {
     // Start testing...
     // Only AM
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(2 * GB, a.getUsedResources().getMemory());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -434,7 +434,7 @@ public class TestReservations {
 
     // Only 1 map - simulating reduce
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(5 * GB, a.getUsedResources().getMemory());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -446,7 +446,7 @@ public class TestReservations {
 
     // Only 1 map to other node - simulating reduce
     a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -461,7 +461,7 @@ public class TestReservations {
 
     // try to assign reducer (5G on node 0 and should reserve)
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -477,7 +477,7 @@ public class TestReservations {
 
     // assign reducer to node 2
     a.assignContainers(clusterResource, node_2,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(18 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -494,7 +494,7 @@ public class TestReservations {
     // node_1 heartbeat and won't unreserve from node_0, potentially stuck
     // if AM doesn't handle
     a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(18 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -570,7 +570,7 @@ public class TestReservations {
     // Start testing...
     // Only AM
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(2 * GB, a.getUsedResources().getMemory());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -581,7 +581,7 @@ public class TestReservations {
 
     // Only 1 map - simulating reduce
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(5 * GB, a.getUsedResources().getMemory());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -592,7 +592,7 @@ public class TestReservations {
 
     // Only 1 map to other node - simulating reduce
     a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -606,7 +606,7 @@ public class TestReservations {
 
     // try to assign reducer (5G on node 0 and should reserve)
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -621,7 +621,7 @@ public class TestReservations {
 
     // could allocate but told need to unreserve first
     a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -823,7 +823,7 @@ public class TestReservations {
     // Start testing...
     // Only AM
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(2 * GB, a.getUsedResources().getMemory());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -834,7 +834,7 @@ public class TestReservations {
 
     // Only 1 map - simulating reduce
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(5 * GB, a.getUsedResources().getMemory());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -845,7 +845,7 @@ public class TestReservations {
 
     // Only 1 map to other node - simulating reduce
     a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -860,15 +860,16 @@ public class TestReservations {
     Resource capability = Resources.createResource(32 * GB, 0);
     boolean res =
         a.canAssignToThisQueue(clusterResource,
-            CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
-                clusterResource), capability, Resources.none());
+            RMNodeLabelsManager.NO_LABEL, new ResourceLimits(
+                clusterResource), capability, Resources.none(),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertFalse(res);
 
     // now add in reservations and make sure it continues if config set
     // allocate to queue so that the potential new capacity is greater then
     // absoluteMaxCapacity
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -881,16 +882,17 @@ public class TestReservations {
     capability = Resources.createResource(5 * GB, 0);
     res =
         a.canAssignToThisQueue(clusterResource,
-            CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
-                clusterResource), capability, Resources
-                .createResource(5 * GB));
+            RMNodeLabelsManager.NO_LABEL, new ResourceLimits(
+                clusterResource), capability, Resources.createResource(5 * GB),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertTrue(res);
 
     // tell to not check reservations
     res =
         a.canAssignToThisQueue(clusterResource,
-            CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
-                clusterResource), capability, Resources.none());
+            RMNodeLabelsManager.NO_LABEL, new ResourceLimits(
+                clusterResource), capability, Resources.none(),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertFalse(res);
 
     refreshQueuesTurnOffReservationsContLook(a, csConf);
@@ -899,15 +901,16 @@ public class TestReservations {
     // in since feature is off
     res =
         a.canAssignToThisQueue(clusterResource,
-            CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
-                clusterResource), capability, Resources.none());
+            RMNodeLabelsManager.NO_LABEL, new ResourceLimits(
+                clusterResource), capability, Resources.none(),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertFalse(res);
 
     res =
         a.canAssignToThisQueue(clusterResource,
-            CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
-                clusterResource), capability, Resources
-                .createResource(5 * GB));
+            RMNodeLabelsManager.NO_LABEL, new ResourceLimits(
+                clusterResource), capability, Resources.createResource(5 * GB),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertFalse(res);
   }
 
@@ -1008,7 +1011,7 @@ public class TestReservations {
     // Start testing...
     // Only AM
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(2 * GB, a.getUsedResources().getMemory());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1019,7 +1022,7 @@ public class TestReservations {
 
     // Only 1 map - simulating reduce
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(5 * GB, a.getUsedResources().getMemory());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1030,7 +1033,7 @@ public class TestReservations {
 
     // Only 1 map to other node - simulating reduce
     a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1044,7 +1047,7 @@ public class TestReservations {
     // allocate to queue so that the potential new capacity is greater then
     // absoluteMaxCapacity
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(5 * GB, app_0.getCurrentReservation().getMemory());
@@ -1059,18 +1062,18 @@ public class TestReservations {
     // set limit so subtrace reservations it can continue
     Resource limit = Resources.createResource(12 * GB, 0);
     boolean res = a.canAssignToUser(clusterResource, user_0, limit, app_0,
-        true, null);
+        true, "");
     assertTrue(res);
 
     // tell it not to check for reservations and should fail as already over
     // limit
-    res = a.canAssignToUser(clusterResource, user_0, limit, app_0, false, null);
+    res = a.canAssignToUser(clusterResource, user_0, limit, app_0, false, "");
     assertFalse(res);
 
     refreshQueuesTurnOffReservationsContLook(a, csConf);
 
     // should now return false since feature off
-    res = a.canAssignToUser(clusterResource, user_0, limit, app_0, true, null);
+    res = a.canAssignToUser(clusterResource, user_0, limit, app_0, true, "");
     assertFalse(res);
   }
 
@@ -1143,7 +1146,7 @@ public class TestReservations {
     // Start testing...
     // Only AM
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(2 * GB, a.getUsedResources().getMemory());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1155,7 +1158,7 @@ public class TestReservations {
 
     // Only 1 map - simulating reduce
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(5 * GB, a.getUsedResources().getMemory());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1167,7 +1170,7 @@ public class TestReservations {
 
     // Only 1 map to other node - simulating reduce
     a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1183,7 +1186,7 @@ public class TestReservations {
     // some resource. Even with continous reservation looking, we don't allow 
     // unreserve resource to reserve container.
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(Resources.createResource(10 * GB)));
+        new ResourceLimits(Resources.createResource(10 * GB)), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1199,7 +1202,7 @@ public class TestReservations {
     // used (8G) + required (5G). It will not reserved since it has to unreserve
     // some resource. Unfortunately, there's nothing to unreserve.
     a.assignContainers(clusterResource, node_2,
-        new ResourceLimits(Resources.createResource(10 * GB)));
+        new ResourceLimits(Resources.createResource(10 * GB)), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1213,7 +1216,7 @@ public class TestReservations {
 
     // let it assign 5G to node_2
     a.assignContainers(clusterResource, node_2,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1226,7 +1229,7 @@ public class TestReservations {
 
     // reserve 8G node_0
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(21 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(8 * GB, a.getMetrics().getReservedMB());
@@ -1241,7 +1244,7 @@ public class TestReservations {
     // continued to try due to having reservation above,
     // but hits queue limits so can't reserve anymore.
     a.assignContainers(clusterResource, node_2,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(21 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(8 * GB, a.getMetrics().getReservedMB());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index 62135b9..84abf4e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -160,6 +160,7 @@ public class TestUtils {
     request.setCapability(capability);
     request.setRelaxLocality(relaxLocality);
     request.setPriority(priority);
+    request.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
     return request;
   }
   
@@ -273,6 +274,7 @@ public class TestUtils {
     conf.setCapacity(B1, 100);
     conf.setMaximumCapacity(B1, 100);
     conf.setCapacityByLabel(B1, "y", 100);
+    conf.setMaximumApplicationMasterResourcePerQueuePercent(B1, 1f);
 
     final String C1 = C + ".c1";
     conf.setQueues(C, new String[] {"c1"});


[3/3] hadoop git commit: YARN-3361. CapacityScheduler side changes to support non-exclusive node labels. Contributed by Wangda Tan

Posted by ji...@apache.org.
YARN-3361. CapacityScheduler side changes to support non-exclusive node labels. Contributed by Wangda Tan


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0fefda64
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0fefda64
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0fefda64

Branch: refs/heads/trunk
Commit: 0fefda645bca935b87b6bb8ca63e6f18340d59f5
Parents: b46ee1e
Author: Jian He <ji...@apache.org>
Authored: Tue Apr 14 11:36:37 2015 -0700
Committer: Jian He <ji...@apache.org>
Committed: Tue Apr 14 11:45:58 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |    3 +
 .../hadoop/yarn/server/utils/BuilderUtils.java  |    1 +
 .../rmapp/attempt/RMAppAttemptImpl.java         |   10 +-
 .../scheduler/AppSchedulingInfo.java            |   25 +-
 .../scheduler/ResourceUsage.java                |    8 +
 .../scheduler/SchedulerApplicationAttempt.java  |   44 +-
 .../scheduler/SchedulerUtils.java               |   87 +-
 .../scheduler/capacity/AbstractCSQueue.java     |  243 +++--
 .../scheduler/capacity/CSQueue.java             |    5 +-
 .../scheduler/capacity/CapacityScheduler.java   |   91 +-
 .../CapacitySchedulerConfiguration.java         |    5 +
 .../scheduler/capacity/LeafQueue.java           |  368 ++++---
 .../scheduler/capacity/ParentQueue.java         |   59 +-
 .../scheduler/capacity/SchedulingMode.java      |   44 +
 .../server/resourcemanager/Application.java     |    4 +
 .../yarn/server/resourcemanager/MockAM.java     |    8 +-
 .../yarn/server/resourcemanager/MockRM.java     |   35 +-
 .../capacity/TestApplicationLimits.java         |    8 +-
 .../scheduler/capacity/TestChildQueueOrder.java |   41 +-
 .../capacity/TestContainerAllocation.java       |  390 +------
 .../scheduler/capacity/TestLeafQueue.java       |  148 +--
 .../TestNodeLabelContainerAllocation.java       | 1027 ++++++++++++++++++
 .../scheduler/capacity/TestParentQueue.java     |  111 +-
 .../scheduler/capacity/TestReservations.java    |  101 +-
 .../scheduler/capacity/TestUtils.java           |    2 +
 25 files changed, 1914 insertions(+), 954 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 4961ae0..2933c79 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -72,6 +72,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3443. Create a 'ResourceHandler' subsystem to ease addition of support 
     for new resource types on the NM. (Sidharta Seethana via junping_du)
 
+    YARN-3361. CapacityScheduler side changes to support non-exclusive node
+    labels. (Wangda Tan via jianhe)
+
   IMPROVEMENTS
 
     YARN-1880. Cleanup TestApplicationClientProtocolOnHA

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
index 68d4ef9..f2146c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
@@ -313,6 +313,7 @@ public class BuilderUtils {
     request.setResourceName(r.getResourceName());
     request.setCapability(r.getCapability());
     request.setNumContainers(r.getNumContainers());
+    request.setNodeLabelExpression(r.getNodeLabelExpression());
     return request;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 1be1727..1071831 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -146,7 +146,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
   private ConcurrentMap<NodeId, List<ContainerStatus>>
       finishedContainersSentToAM =
       new ConcurrentHashMap<NodeId, List<ContainerStatus>>();
-  private Container masterContainer;
+  private volatile Container masterContainer;
 
   private float progress = 0;
   private String host = "N/A";
@@ -762,13 +762,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
 
   @Override
   public Container getMasterContainer() {
-    this.readLock.lock();
-
-    try {
-      return this.masterContainer;
-    } finally {
-      this.readLock.unlock();
-    }
+    return this.masterContainer;
   }
 
   @InterfaceAudience.Private

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 5521d47..5604f0f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -73,10 +73,11 @@ public class AppSchedulingInfo {
   /* Allocated by scheduler */
   boolean pending = true; // for app metrics
   
+  private ResourceUsage appResourceUsage;
  
   public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
       String user, Queue queue, ActiveUsersManager activeUsersManager,
-      long epoch) {
+      long epoch, ResourceUsage appResourceUsage) {
     this.applicationAttemptId = appAttemptId;
     this.applicationId = appAttemptId.getApplicationId();
     this.queue = queue;
@@ -84,6 +85,7 @@ public class AppSchedulingInfo {
     this.user = user;
     this.activeUsersManager = activeUsersManager;
     this.containerIdCounter = new AtomicLong(epoch << EPOCH_BIT_SHIFT);
+    this.appResourceUsage = appResourceUsage;
   }
 
   public ApplicationId getApplicationId() {
@@ -191,13 +193,19 @@ public class AppSchedulingInfo {
             lastRequestCapability);
         
         // update queue:
+        Resource increasedResource = Resources.multiply(request.getCapability(),
+            request.getNumContainers());
         queue.incPendingResource(
             request.getNodeLabelExpression(),
-            Resources.multiply(request.getCapability(),
-                request.getNumContainers()));
+            increasedResource);
+        appResourceUsage.incPending(request.getNodeLabelExpression(), increasedResource);
         if (lastRequest != null) {
+          Resource decreasedResource =
+              Resources.multiply(lastRequestCapability, lastRequestContainers);
           queue.decPendingResource(lastRequest.getNodeLabelExpression(),
-              Resources.multiply(lastRequestCapability, lastRequestContainers));
+              decreasedResource);
+          appResourceUsage.decPending(lastRequest.getNodeLabelExpression(),
+              decreasedResource);
         }
       }
     }
@@ -385,6 +393,8 @@ public class AppSchedulingInfo {
       checkForDeactivation();
     }
     
+    appResourceUsage.decPending(offSwitchRequest.getNodeLabelExpression(),
+        offSwitchRequest.getCapability());
     queue.decPendingResource(offSwitchRequest.getNodeLabelExpression(),
         offSwitchRequest.getCapability());
   }
@@ -492,9 +502,10 @@ public class AppSchedulingInfo {
   }
   
   public ResourceRequest cloneResourceRequest(ResourceRequest request) {
-    ResourceRequest newRequest = ResourceRequest.newInstance(
-        request.getPriority(), request.getResourceName(),
-        request.getCapability(), 1, request.getRelaxLocality());
+    ResourceRequest newRequest =
+        ResourceRequest.newInstance(request.getPriority(),
+            request.getResourceName(), request.getCapability(), 1,
+            request.getRelaxLocality(), request.getNodeLabelExpression());
     return newRequest;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
index 36ee4da..5169b78 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
@@ -27,6 +27,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
 
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 /**
@@ -250,6 +251,10 @@ public class ResourceUsage {
   }
 
   private Resource _get(String label, ResourceType type) {
+    if (label == null) {
+      label = RMNodeLabelsManager.NO_LABEL;
+    }
+    
     try {
       readLock.lock();
       UsageByLabel usage = usages.get(label);
@@ -263,6 +268,9 @@ public class ResourceUsage {
   }
 
   private UsageByLabel getAndAddIfMissing(String label) {
+    if (label == null) {
+      label = RMNodeLabelsManager.NO_LABEL;
+    }
     if (!usages.containsKey(label)) {
       UsageByLabel u = new UsageByLabel(label);
       usages.put(label, u);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 5e0bbc7..fccf766 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -56,6 +56,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.base.Preconditions;
@@ -108,14 +110,24 @@ public class SchedulerApplicationAttempt {
   private Set<ContainerId> pendingRelease = null;
 
   /**
-   * Count how many times the application has been given an opportunity
-   * to schedule a task at each priority. Each time the scheduler
-   * asks the application for a task at this priority, it is incremented,
-   * and each time the application successfully schedules a task, it
+   * Count how many times the application has been given an opportunity to
+   * schedule a task at each priority. Each time the scheduler asks the
+   * application for a task at this priority, it is incremented, and each time
+   * the application successfully schedules a task (at rack or node local), it
    * is reset to 0.
    */
   Multiset<Priority> schedulingOpportunities = HashMultiset.create();
   
+  /**
+   * Count how many times the application has been given an opportunity to
+   * schedule a non-partitioned resource request at each priority. Each time the
+   * scheduler asks the application for a task at this priority, it is
+   * incremented, and each time the application successfully schedules a task,
+   * it is reset to 0 when schedule any task at corresponding priority.
+   */
+  Multiset<Priority> missedNonPartitionedRequestSchedulingOpportunity =
+      HashMultiset.create();
+  
   // Time of the last container scheduled at the current allowed level
   protected Map<Priority, Long> lastScheduledContainer =
       new HashMap<Priority, Long>();
@@ -132,7 +144,7 @@ public class SchedulerApplicationAttempt {
     this.rmContext = rmContext;
     this.appSchedulingInfo = 
         new AppSchedulingInfo(applicationAttemptId, user, queue,  
-            activeUsersManager, rmContext.getEpoch());
+            activeUsersManager, rmContext.getEpoch(), attemptResourceUsage);
     this.queue = queue;
     this.pendingRelease = new HashSet<ContainerId>();
     this.attemptId = applicationAttemptId;
@@ -489,6 +501,18 @@ public class SchedulerApplicationAttempt {
     return this.appSchedulingInfo.isBlacklisted(resourceName);
   }
 
+  public synchronized int addMissedNonPartitionedRequestSchedulingOpportunity(
+      Priority priority) {
+    missedNonPartitionedRequestSchedulingOpportunity.add(priority);
+    return missedNonPartitionedRequestSchedulingOpportunity.count(priority);
+  }
+
+  public synchronized void
+      resetMissedNonPartitionedRequestSchedulingOpportunity(Priority priority) {
+    missedNonPartitionedRequestSchedulingOpportunity.setCount(priority, 0);
+  }
+
+  
   public synchronized void addSchedulingOpportunity(Priority priority) {
     schedulingOpportunities.setCount(priority,
         schedulingOpportunities.count(priority) + 1);
@@ -518,6 +542,7 @@ public class SchedulerApplicationAttempt {
   public synchronized void resetSchedulingOpportunities(Priority priority) {
     resetSchedulingOpportunities(priority, System.currentTimeMillis());
   }
+
   // used for continuous scheduling
   public synchronized void resetSchedulingOpportunities(Priority priority,
       long currentTimeMs) {
@@ -669,4 +694,13 @@ public class SchedulerApplicationAttempt {
   public Set<String> getBlacklistedNodes() {
     return this.appSchedulingInfo.getBlackListCopy();
   }
+  
+  @Private
+  public boolean hasPendingResourceRequest(ResourceCalculator rc,
+      String nodePartition, Resource cluster,
+      SchedulingMode schedulingMode) {
+    return SchedulerUtils.hasPendingResourceRequest(rc,
+        this.attemptResourceUsage, nodePartition, cluster,
+        schedulingMode);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
index 248cc08..7a1a528 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
@@ -37,11 +37,10 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.security.AccessType;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
-import com.google.common.collect.Sets;
-
 /**
  * Utilities shared by schedulers. 
  */
@@ -235,9 +234,13 @@ public class SchedulerUtils {
     if (labelExp == null && queueInfo != null
         && ResourceRequest.ANY.equals(resReq.getResourceName())) {
       labelExp = queueInfo.getDefaultNodeLabelExpression();
-      resReq.setNodeLabelExpression(labelExp);
     }
     
+    // If labelExp still equals to null, set it to be NO_LABEL
+    resReq
+        .setNodeLabelExpression(labelExp == null ? RMNodeLabelsManager.NO_LABEL
+            : labelExp);
+    
     // we don't allow specify label expression other than resourceName=ANY now
     if (!ResourceRequest.ANY.equals(resReq.getResourceName())
         && labelExp != null && !labelExp.trim().isEmpty()) {
@@ -273,25 +276,6 @@ public class SchedulerUtils {
     }
   }
   
-  public static boolean checkQueueAccessToNode(Set<String> queueLabels,
-      Set<String> nodeLabels) {
-    // if queue's label is *, it can access any node
-    if (queueLabels != null && queueLabels.contains(RMNodeLabelsManager.ANY)) {
-      return true;
-    }
-    // any queue can access to a node without label
-    if (nodeLabels == null || nodeLabels.isEmpty()) {
-      return true;
-    }
-    // a queue can access to a node only if it contains any label of the node
-    if (queueLabels != null
-        && Sets.intersection(queueLabels, nodeLabels).size() > 0) {
-      return true;
-    }
-    // sorry, you cannot access
-    return false;
-  }
-  
   public static void checkIfLabelInClusterNodeLabels(RMNodeLabelsManager mgr,
       Set<String> labels) throws IOException {
     if (mgr == null) {
@@ -311,26 +295,6 @@ public class SchedulerUtils {
       }
     }
   }
-  
-  public static boolean checkNodeLabelExpression(Set<String> nodeLabels,
-      String labelExpression) {
-    // empty label expression can only allocate on node with empty labels
-    if (labelExpression == null || labelExpression.trim().isEmpty()) {
-      if (!nodeLabels.isEmpty()) {
-        return false;
-      }
-    }
-
-    if (labelExpression != null) {
-      for (String str : labelExpression.split("&&")) {
-        if (!str.trim().isEmpty()
-            && (nodeLabels == null || !nodeLabels.contains(str.trim()))) {
-          return false;
-        }
-      }
-    }
-    return true;
-  }
 
   public static boolean checkQueueLabelExpression(Set<String> queueLabels,
       String labelExpression) {
@@ -360,4 +324,43 @@ public class SchedulerUtils {
     }
     return null;
   }
+  
+  public static boolean checkResourceRequestMatchingNodePartition(
+      ResourceRequest offswitchResourceRequest, String nodePartition,
+      SchedulingMode schedulingMode) {
+    // We will only look at node label = nodeLabelToLookAt according to
+    // schedulingMode and partition of node.
+    String nodePartitionToLookAt = null;
+    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
+      nodePartitionToLookAt = nodePartition;
+    } else {
+      nodePartitionToLookAt = RMNodeLabelsManager.NO_LABEL;
+    }
+    
+    String askedNodePartition = offswitchResourceRequest.getNodeLabelExpression();
+    if (null == askedNodePartition) {
+      askedNodePartition = RMNodeLabelsManager.NO_LABEL;
+    }
+    return askedNodePartition.equals(nodePartitionToLookAt);
+  }
+  
+  private static boolean hasPendingResourceRequest(ResourceCalculator rc,
+      ResourceUsage usage, String partitionToLookAt, Resource cluster) {
+    if (Resources.greaterThan(rc, cluster,
+        usage.getPending(partitionToLookAt), Resources.none())) {
+      return true;
+    }
+    return false;
+  }
+
+  @Private
+  public static boolean hasPendingResourceRequest(ResourceCalculator rc,
+      ResourceUsage usage, String nodePartition, Resource cluster,
+      SchedulingMode schedulingMode) {
+    String partitionToLookAt = nodePartition;
+    if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
+      partitionToLookAt = RMNodeLabelsManager.NO_LABEL;
+    }
+    return hasPendingResourceRequest(rc, usage, partitionToLookAt, cluster);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 42ea089..d95c45c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import java.io.IOException;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Map;
 import java.util.Set;
 
@@ -38,12 +37,12 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.security.AccessType;
 import org.apache.hadoop.yarn.security.PrivilegedEntity;
 import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
 import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
@@ -56,6 +55,11 @@ import com.google.common.collect.Sets;
 public abstract class AbstractCSQueue implements CSQueue {
   private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class);
   
+  static final CSAssignment NULL_ASSIGNMENT =
+      new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
+  
+  static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true);
+  
   CSQueue parent;
   final String queueName;
   volatile int numContainers;
@@ -343,16 +347,8 @@ public abstract class AbstractCSQueue implements CSQueue {
   }
   
   synchronized void allocateResource(Resource clusterResource, 
-      Resource resource, Set<String> nodeLabels) {
-    
-    // Update usedResources by labels
-    if (nodeLabels == null || nodeLabels.isEmpty()) {
-      queueUsage.incUsed(resource);
-    } else {
-      for (String label : Sets.intersection(accessibleLabels, nodeLabels)) {
-        queueUsage.incUsed(label, resource);
-      }
-    }
+      Resource resource, String nodePartition) {
+    queueUsage.incUsed(nodePartition, resource);
 
     ++numContainers;
     CSQueueUtils.updateQueueStatistics(resourceCalculator, this, getParent(),
@@ -360,15 +356,8 @@ public abstract class AbstractCSQueue implements CSQueue {
   }
   
   protected synchronized void releaseResource(Resource clusterResource,
-      Resource resource, Set<String> nodeLabels) {
-    // Update usedResources by labels
-    if (null == nodeLabels || nodeLabels.isEmpty()) {
-      queueUsage.decUsed(resource);
-    } else {
-      for (String label : Sets.intersection(accessibleLabels, nodeLabels)) {
-        queueUsage.decUsed(label, resource);
-      }
-    }
+      Resource resource, String nodePartition) {
+    queueUsage.decUsed(nodePartition, resource);
 
     CSQueueUtils.updateQueueStatistics(resourceCalculator, this, getParent(),
         clusterResource, minimumAllocation);
@@ -434,103 +423,108 @@ public abstract class AbstractCSQueue implements CSQueue {
                                         parentQ.getPreemptionDisabled());
   }
   
-  private Resource getCurrentLimitResource(String nodeLabel,
-      Resource clusterResource, ResourceLimits currentResourceLimits) {
-    /*
-     * Current limit resource: For labeled resource: limit = queue-max-resource
-     * (TODO, this part need update when we support labeled-limit) For
-     * non-labeled resource: limit = min(queue-max-resource,
-     * limit-set-by-parent)
-     */
-    Resource queueMaxResource =
-        Resources.multiplyAndNormalizeDown(resourceCalculator,
-            labelManager.getResourceByLabel(nodeLabel, clusterResource),
-            queueCapacities.getAbsoluteMaximumCapacity(nodeLabel), minimumAllocation);
-    if (nodeLabel.equals(RMNodeLabelsManager.NO_LABEL)) {
-      return Resources.min(resourceCalculator, clusterResource,
-          queueMaxResource, currentResourceLimits.getLimit());
+  private Resource getCurrentLimitResource(String nodePartition,
+      Resource clusterResource, ResourceLimits currentResourceLimits,
+      SchedulingMode schedulingMode) {
+    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
+      /*
+       * Current limit resource: For labeled resource: limit = queue-max-resource
+       * (TODO, this part need update when we support labeled-limit) For
+       * non-labeled resource: limit = min(queue-max-resource,
+       * limit-set-by-parent)
+       */
+      Resource queueMaxResource =
+          Resources.multiplyAndNormalizeDown(resourceCalculator,
+              labelManager.getResourceByLabel(nodePartition, clusterResource),
+              queueCapacities.getAbsoluteMaximumCapacity(nodePartition), minimumAllocation);
+      if (nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
+        return Resources.min(resourceCalculator, clusterResource,
+            queueMaxResource, currentResourceLimits.getLimit());
+      }
+      return queueMaxResource;  
+    } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
+      // When we doing non-exclusive resource allocation, maximum capacity of
+      // all queues on this label equals to total resource with the label.
+      return labelManager.getResourceByLabel(nodePartition, clusterResource);
     }
-    return queueMaxResource;
+    
+    return Resources.none();
   }
   
   synchronized boolean canAssignToThisQueue(Resource clusterResource,
-      Set<String> nodeLabels, ResourceLimits currentResourceLimits,
-      Resource nowRequired, Resource resourceCouldBeUnreserved) {
-    // Get label of this queue can access, it's (nodeLabel AND queueLabel)
-    Set<String> labelCanAccess;
-    if (null == nodeLabels || nodeLabels.isEmpty()) {
-      labelCanAccess = new HashSet<String>();
-      // Any queue can always access any node without label
-      labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
-    } else {
-      labelCanAccess = new HashSet<String>(
-          accessibleLabels.contains(CommonNodeLabelsManager.ANY) ? nodeLabels
-              : Sets.intersection(accessibleLabels, nodeLabels));
-    }
-    
-    for (String label : labelCanAccess) {
-      // New total resource = used + required
-      Resource newTotalResource =
-          Resources.add(queueUsage.getUsed(label), nowRequired);
-
-      Resource currentLimitResource =
-          getCurrentLimitResource(label, clusterResource, currentResourceLimits);
-
-      // if reservation continous looking enabled, check to see if could we
-      // potentially use this node instead of a reserved node if the application
-      // has reserved containers.
-      // TODO, now only consider reservation cases when the node has no label
-      if (this.reservationsContinueLooking
-          && label.equals(RMNodeLabelsManager.NO_LABEL)
-          && Resources.greaterThan(resourceCalculator, clusterResource,
-              resourceCouldBeUnreserved, Resources.none())) {
-        // resource-without-reserved = used - reserved
-        Resource newTotalWithoutReservedResource =
-            Resources.subtract(newTotalResource, resourceCouldBeUnreserved);
-        
-        // when total-used-without-reserved-resource < currentLimit, we still
-        // have chance to allocate on this node by unreserving some containers
-        if (Resources.lessThan(resourceCalculator, clusterResource,
-            newTotalWithoutReservedResource, currentLimitResource)) {
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("try to use reserved: " + getQueueName()
-                + " usedResources: " + queueUsage.getUsed()
-                + ", clusterResources: " + clusterResource
-                + ", reservedResources: " + resourceCouldBeUnreserved
-                + ", capacity-without-reserved: "
-                + newTotalWithoutReservedResource + ", maxLimitCapacity: "
-                + currentLimitResource); 
-          }
-          return true;
+      String nodePartition, ResourceLimits currentResourceLimits,
+      Resource nowRequired, Resource resourceCouldBeUnreserved,
+      SchedulingMode schedulingMode) {
+    // New total resource = used + required
+    Resource newTotalResource =
+        Resources.add(queueUsage.getUsed(nodePartition), nowRequired);
+
+    // Get current limited resource: 
+    // - When doing RESPECT_PARTITION_EXCLUSIVITY allocation, we will respect
+    // queues' max capacity.
+    // - When doing IGNORE_PARTITION_EXCLUSIVITY allocation, we will not respect
+    // queue's max capacity, queue's max capacity on the partition will be
+    // considered to be 100%. Which is a queue can use all resource in the
+    // partition. 
+    // Doing this because: for non-exclusive allocation, we make sure there's
+    // idle resource on the partition, to avoid wastage, such resource will be
+    // leveraged as much as we can, and preemption policy will reclaim it back
+    // when partitoned-resource-request comes back.  
+    Resource currentLimitResource =
+        getCurrentLimitResource(nodePartition, clusterResource,
+            currentResourceLimits, schedulingMode);
+
+    // if reservation continous looking enabled, check to see if could we
+    // potentially use this node instead of a reserved node if the application
+    // has reserved containers.
+    // TODO, now only consider reservation cases when the node has no label
+    if (this.reservationsContinueLooking
+        && nodePartition.equals(RMNodeLabelsManager.NO_LABEL)
+        && Resources.greaterThan(resourceCalculator, clusterResource,
+            resourceCouldBeUnreserved, Resources.none())) {
+      // resource-without-reserved = used - reserved
+      Resource newTotalWithoutReservedResource =
+          Resources.subtract(newTotalResource, resourceCouldBeUnreserved);
+
+      // when total-used-without-reserved-resource < currentLimit, we still
+      // have chance to allocate on this node by unreserving some containers
+      if (Resources.lessThan(resourceCalculator, clusterResource,
+          newTotalWithoutReservedResource, currentLimitResource)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("try to use reserved: " + getQueueName()
+              + " usedResources: " + queueUsage.getUsed()
+              + ", clusterResources: " + clusterResource
+              + ", reservedResources: " + resourceCouldBeUnreserved
+              + ", capacity-without-reserved: "
+              + newTotalWithoutReservedResource + ", maxLimitCapacity: "
+              + currentLimitResource);
         }
+        return true;
       }
-      
-      // Otherwise, if any of the label of this node beyond queue limit, we
-      // cannot allocate on this node. Consider a small epsilon here.
-      if (Resources.greaterThan(resourceCalculator, clusterResource,
-          newTotalResource, currentLimitResource)) {
-        return false;
-      }
+    }
 
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(getQueueName()
-            + "Check assign to queue, label=" + label
-            + " usedResources: " + queueUsage.getUsed(label)
-            + " clusterResources: " + clusterResource
-            + " currentUsedCapacity "
-            + Resources.divide(resourceCalculator, clusterResource,
-                queueUsage.getUsed(label),
-                labelManager.getResourceByLabel(label, clusterResource))
-            + " max-capacity: "
-            + queueCapacities.getAbsoluteMaximumCapacity(label)
-            + ")");
-      }
-      return true;
+    // Check if we over current-resource-limit computed.
+    if (Resources.greaterThan(resourceCalculator, clusterResource,
+        newTotalResource, currentLimitResource)) {
+      return false;
     }
-    
-    // Actually, this will not happen, since labelCanAccess will be always
-    // non-empty
-    return false;
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(getQueueName()
+          + "Check assign to queue, nodePartition="
+          + nodePartition
+          + " usedResources: "
+          + queueUsage.getUsed(nodePartition)
+          + " clusterResources: "
+          + clusterResource
+          + " currentUsedCapacity "
+          + Resources.divide(resourceCalculator, clusterResource,
+              queueUsage.getUsed(nodePartition),
+              labelManager.getResourceByLabel(nodePartition, clusterResource))
+          + " max-capacity: "
+          + queueCapacities.getAbsoluteMaximumCapacity(nodePartition) + ")");
+    }
+    return true;
   }
   
   @Override
@@ -556,4 +550,33 @@ public abstract class AbstractCSQueue implements CSQueue {
       parent.decPendingResource(nodeLabel, resourceToDec);
     }
   }
+  
+  /**
+   * Return if the queue has pending resource on given nodePartition and
+   * schedulingMode. 
+   */
+  boolean hasPendingResourceRequest(String nodePartition, 
+      Resource cluster, SchedulingMode schedulingMode) {
+    return SchedulerUtils.hasPendingResourceRequest(resourceCalculator,
+        queueUsage, nodePartition, cluster, schedulingMode);
+  }
+  
+  boolean accessibleToPartition(String nodePartition) {
+    // if queue's label is *, it can access any node
+    if (accessibleLabels != null
+        && accessibleLabels.contains(RMNodeLabelsManager.ANY)) {
+      return true;
+    }
+    // any queue can access to a node without label
+    if (nodePartition == null
+        || nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
+      return true;
+    }
+    // a queue can access to a node only if it contains any label of the node
+    if (accessibleLabels != null && accessibleLabels.contains(nodePartition)) {
+      return true;
+    }
+    // sorry, you cannot access
+    return false;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index 1a9448a..b06a646 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -190,10 +190,13 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
    * @param clusterResource the resource of the cluster.
    * @param node node on which resources are available
    * @param resourceLimits how much overall resource of this queue can use. 
+   * @param schedulingMode Type of exclusive check when assign container on a 
+   * NodeManager, see {@link SchedulingMode}.
    * @return the assignment
    */
   public CSAssignment assignContainers(Resource clusterResource,
-      FiCaSchedulerNode node, ResourceLimits resourceLimits);
+      FiCaSchedulerNode node, ResourceLimits resourceLimits,
+      SchedulingMode schedulingMode);
   
   /**
    * A container assigned to the queue has completed.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index e93c529..cfeee37 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
@@ -1114,28 +1115,30 @@ public class CapacityScheduler extends
     if (reservedContainer != null) {
       FiCaSchedulerApp reservedApplication =
           getCurrentAttemptForContainer(reservedContainer.getContainerId());
-      
+
       // Try to fulfill the reservation
-      LOG.info("Trying to fulfill reservation for application " + 
-          reservedApplication.getApplicationId() + " on node: " + 
-          node.getNodeID());
-      
-      LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
-      assignment = queue.assignContainers(
+      LOG.info("Trying to fulfill reservation for application "
+          + reservedApplication.getApplicationId() + " on node: "
+          + node.getNodeID());
+
+      LeafQueue queue = ((LeafQueue) reservedApplication.getQueue());
+      assignment =
+          queue.assignContainers(
               clusterResource,
               node,
               // TODO, now we only consider limits for parent for non-labeled
               // resources, should consider labeled resources as well.
               new ResourceLimits(labelManager.getResourceByLabel(
-                  RMNodeLabelsManager.NO_LABEL, clusterResource)));
+                  RMNodeLabelsManager.NO_LABEL, clusterResource)),
+              SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
       if (assignment.isFulfilledReservation()) {
         CSAssignment tmp =
             new CSAssignment(reservedContainer.getReservedResource(),
-              assignment.getType());
+                assignment.getType());
         Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
-          reservedContainer.getReservedResource());
+            reservedContainer.getReservedResource());
         tmp.getAssignmentInformation().addAllocationDetails(
-          reservedContainer.getContainerId(), queue.getQueuePath());
+            reservedContainer.getContainerId(), queue.getQueuePath());
         tmp.getAssignmentInformation().incrAllocations();
         updateSchedulerHealth(lastNodeUpdateTime, node, tmp);
         schedulerHealth.updateSchedulerFulfilledReservationCounts(1);
@@ -1143,16 +1146,13 @@ public class CapacityScheduler extends
 
       RMContainer excessReservation = assignment.getExcessReservation();
       if (excessReservation != null) {
-      Container container = excessReservation.getContainer();
-      queue.completedContainer(
-          clusterResource, assignment.getApplication(), node, 
-          excessReservation, 
-          SchedulerUtils.createAbnormalContainerStatus(
-              container.getId(), 
-              SchedulerUtils.UNRESERVED_CONTAINER), 
-          RMContainerEventType.RELEASED, null, true);
+        Container container = excessReservation.getContainer();
+        queue.completedContainer(clusterResource, assignment.getApplication(),
+            node, excessReservation, SchedulerUtils
+                .createAbnormalContainerStatus(container.getId(),
+                    SchedulerUtils.UNRESERVED_CONTAINER),
+            RMContainerEventType.RELEASED, null, true);
       }
-
     }
 
     // Try to schedule more if there are no reservations to fulfill
@@ -1163,22 +1163,61 @@ public class CapacityScheduler extends
           LOG.debug("Trying to schedule on node: " + node.getNodeName() +
               ", available: " + node.getAvailableResource());
         }
+
         assignment = root.assignContainers(
             clusterResource,
             node,
             // TODO, now we only consider limits for parent for non-labeled
             // resources, should consider labeled resources as well.
             new ResourceLimits(labelManager.getResourceByLabel(
-                RMNodeLabelsManager.NO_LABEL, clusterResource)));
+                RMNodeLabelsManager.NO_LABEL, clusterResource)),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+        if (Resources.greaterThan(calculator, clusterResource,
+            assignment.getResource(), Resources.none())) {
+          updateSchedulerHealth(lastNodeUpdateTime, node, assignment);
+          return;
+        }
+        
+        // Only do non-exclusive allocation when node has node-labels.
+        if (StringUtils.equals(node.getPartition(),
+            RMNodeLabelsManager.NO_LABEL)) {
+          return;
+        }
+        
+        // Only do non-exclusive allocation when the node-label supports that
+        try {
+          if (rmContext.getNodeLabelManager().isExclusiveNodeLabel(
+              node.getPartition())) {
+            return;
+          }
+        } catch (IOException e) {
+          LOG.warn("Exception when trying to get exclusivity of node label="
+              + node.getPartition(), e);
+          return;
+        }
+        
+        // Try to use NON_EXCLUSIVE
+        assignment = root.assignContainers(
+            clusterResource,
+            node,
+            // TODO, now we only consider limits for parent for non-labeled
+            // resources, should consider labeled resources as well.
+            new ResourceLimits(labelManager.getResourceByLabel(
+                RMNodeLabelsManager.NO_LABEL, clusterResource)),
+            SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY);
         updateSchedulerHealth(lastNodeUpdateTime, node, assignment);
+        if (Resources.greaterThan(calculator, clusterResource,
+            assignment.getResource(), Resources.none())) {
+          return;
+        }
       }
     } else {
-      LOG.info("Skipping scheduling since node " + node.getNodeID() + 
-          " is reserved by application " + 
-          node.getReservedContainer().getContainerId().getApplicationAttemptId()
-          );
+      LOG.info("Skipping scheduling since node "
+          + node.getNodeID()
+          + " is reserved by application "
+          + node.getReservedContainer().getContainerId()
+              .getApplicationAttemptId());
     }
-  
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 102e553..4e8d617 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -319,6 +319,11 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
     		getMaximumApplicationMasterResourcePercent());
   }
   
+  public void setMaximumApplicationMasterResourcePerQueuePercent(String queue,
+      float percent) {
+    setFloat(getQueuePrefix(queue) + MAXIMUM_AM_RESOURCE_SUFFIX, percent);
+  }
+  
   public float getNonLabeledQueueCapacity(String queue) {
     float capacity = queue.equals("root") ? 100.0f : getFloat(
         getQueuePrefix(queue) + CAPACITY, UNDEFINED);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 59a016f..8a6a601 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -24,7 +24,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -58,6 +57,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.security.AccessType;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
@@ -718,39 +718,11 @@ public class LeafQueue extends AbstractCSQueue {
       ApplicationAttemptId applicationAttemptId) {
     return applicationAttemptMap.get(applicationAttemptId);
   }
-
-  private static final CSAssignment NULL_ASSIGNMENT =
-      new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
-  
-  private static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true);
-  
-  private static Set<String> getRequestLabelSetByExpression(
-      String labelExpression) {
-    Set<String> labels = new HashSet<String>();
-    if (null == labelExpression) {
-      return labels;
-    }
-    for (String l : labelExpression.split("&&")) {
-      if (l.trim().isEmpty()) {
-        continue;
-      }
-      labels.add(l.trim());
-    }
-    return labels;
-  }
-  
-  private boolean checkResourceRequestMatchingNodeLabel(ResourceRequest offswitchResourceRequest,
-      FiCaSchedulerNode node) {
-    String askedNodeLabel = offswitchResourceRequest.getNodeLabelExpression();
-    if (null == askedNodeLabel) {
-      askedNodeLabel = RMNodeLabelsManager.NO_LABEL;
-    }
-    return askedNodeLabel.equals(node.getPartition());
-  }
   
   @Override
   public synchronized CSAssignment assignContainers(Resource clusterResource,
-      FiCaSchedulerNode node, ResourceLimits currentResourceLimits) {
+      FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
+      SchedulingMode schedulingMode) {
     updateCurrentResourceLimits(currentResourceLimits, clusterResource);
     
     if(LOG.isDebugEnabled()) {
@@ -758,12 +730,6 @@ public class LeafQueue extends AbstractCSQueue {
         + " #applications=" + activeApplications.size());
     }
     
-    // if our queue cannot access this node, just return
-    if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels,
-        node.getLabels())) {
-      return NULL_ASSIGNMENT;
-    }
-    
     // Check for reserved resources
     RMContainer reservedContainer = node.getReservedContainer();
     if (reservedContainer != null) {
@@ -771,8 +737,26 @@ public class LeafQueue extends AbstractCSQueue {
           getApplication(reservedContainer.getApplicationAttemptId());
       synchronized (application) {
         return assignReservedContainer(application, node, reservedContainer,
-            clusterResource);
+            clusterResource, schedulingMode);
+      }
+    }
+    
+    // if our queue cannot access this node, just return
+    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
+        && !accessibleToPartition(node.getPartition())) {
+      return NULL_ASSIGNMENT;
+    }
+    
+    // Check if this queue need more resource, simply skip allocation if this
+    // queue doesn't need more resources.
+    if (!hasPendingResourceRequest(node.getPartition(),
+        clusterResource, schedulingMode)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skip this queue=" + getQueuePath()
+            + ", because it doesn't need more resource, schedulingMode="
+            + schedulingMode.name() + " node-partition=" + node.getPartition());
       }
+      return NULL_ASSIGNMENT;
     }
     
     // Try to assign containers to applications in order
@@ -783,6 +767,17 @@ public class LeafQueue extends AbstractCSQueue {
         + application.getApplicationId());
         application.showRequests();
       }
+      
+      // Check if application needs more resource, skip if it doesn't need more.
+      if (!application.hasPendingResourceRequest(resourceCalculator,
+          node.getPartition(), clusterResource, schedulingMode)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId()
+              + ", because it doesn't need more resource, schedulingMode="
+              + schedulingMode.name() + " node-label=" + node.getPartition());
+        }
+        continue;
+      }
 
       synchronized (application) {
         // Check if this resource is on the blacklist
@@ -806,10 +801,27 @@ public class LeafQueue extends AbstractCSQueue {
             continue;
           }
           
+          // AM container allocation doesn't support non-exclusive allocation to
+          // avoid painful of preempt an AM container
+          if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
+            RMAppAttempt rmAppAttempt =
+                csContext.getRMContext().getRMApps()
+                    .get(application.getApplicationId()).getCurrentAppAttempt();
+            if (null == rmAppAttempt.getMasterContainer()) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Skip allocating AM container to app_attempt="
+                    + application.getApplicationAttemptId()
+                    + ", don't allow to allocate AM container in non-exclusive mode");
+              }
+              break;
+            }
+          }
+          
           // Is the node-label-expression of this offswitch resource request
           // matches the node's label?
           // If not match, jump to next priority.
-          if (!checkResourceRequestMatchingNodeLabel(anyRequest, node)) {
+          if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(
+              anyRequest, node.getPartition(), schedulingMode)) {
             continue;
           }
           
@@ -822,10 +834,6 @@ public class LeafQueue extends AbstractCSQueue {
             }
           }
           
-          Set<String> requestedNodeLabels =
-              getRequestLabelSetByExpression(anyRequest
-                  .getNodeLabelExpression());
-
           // Compute user-limit & set headroom
           // Note: We compute both user-limit & headroom with the highest 
           //       priority request as the target. 
@@ -833,27 +841,61 @@ public class LeafQueue extends AbstractCSQueue {
           //       before all higher priority ones are serviced.
           Resource userLimit = 
               computeUserLimitAndSetHeadroom(application, clusterResource, 
-                  required, requestedNodeLabels);          
+                  required, node.getPartition(), schedulingMode);          
           
           // Check queue max-capacity limit
-          if (!super.canAssignToThisQueue(clusterResource, node.getLabels(),
-              this.currentResourceLimits, required, application.getCurrentReservation())) {
+          if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
+              this.currentResourceLimits, required,
+              application.getCurrentReservation(), schedulingMode)) {
             return NULL_ASSIGNMENT;
           }
 
           // Check user limit
           if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
-              application, true, requestedNodeLabels)) {
+              application, true, node.getPartition())) {
             break;
           }
 
           // Inform the application it is about to get a scheduling opportunity
           application.addSchedulingOpportunity(priority);
           
+          // Increase missed-non-partitioned-resource-request-opportunity.
+          // This is to make sure non-partitioned-resource-request will prefer
+          // to be allocated to non-partitioned nodes
+          int missedNonPartitionedRequestSchedulingOpportunity = 0;
+          if (anyRequest.getNodeLabelExpression().equals(
+              RMNodeLabelsManager.NO_LABEL)) {
+            missedNonPartitionedRequestSchedulingOpportunity =
+                application
+                    .addMissedNonPartitionedRequestSchedulingOpportunity(priority);
+          }
+          
+          if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
+            // Before doing allocation, we need to check scheduling opportunity to
+            // make sure : non-partitioned resource request should be scheduled to
+            // non-partitioned partition first.
+            if (missedNonPartitionedRequestSchedulingOpportunity < scheduler
+                .getNumClusterNodes()) {
+              if (LOG.isDebugEnabled()) {
+                LOG.debug("Skip app_attempt="
+                    + application.getApplicationAttemptId()
+                    + " priority="
+                    + priority
+                    + " because missed-non-partitioned-resource-request"
+                    + " opportunity under requred:"
+                    + " Now=" + missedNonPartitionedRequestSchedulingOpportunity
+                    + " required="
+                    + scheduler.getNumClusterNodes());
+              }
+
+              break;
+            }
+          }
+          
           // Try to schedule
           CSAssignment assignment =  
             assignContainersOnNode(clusterResource, node, application, priority, 
-                null);
+                null, schedulingMode);
 
           // Did the application skip this node?
           if (assignment.getSkipped()) {
@@ -870,9 +912,9 @@ public class LeafQueue extends AbstractCSQueue {
             // Book-keeping 
             // Note: Update headroom to account for current allocation too...
             allocateResource(clusterResource, application, assigned,
-                node.getLabels());
+                node.getPartition());
             
-            // Don't reset scheduling opportunities for non-local assignments
+            // Don't reset scheduling opportunities for offswitch assignments
             // otherwise the app will be delayed for each non-local assignment.
             // This helps apps with many off-cluster requests schedule faster.
             if (assignment.getType() != NodeType.OFF_SWITCH) {
@@ -881,6 +923,10 @@ public class LeafQueue extends AbstractCSQueue {
               }
               application.resetSchedulingOpportunities(priority);
             }
+            // Non-exclusive scheduling opportunity is different: we need reset
+            // it every time to make sure non-labeled resource request will be
+            // most likely allocated on non-labeled nodes first. 
+            application.resetMissedNonPartitionedRequestSchedulingOpportunity(priority);
             
             // Done
             return assignment;
@@ -904,7 +950,8 @@ public class LeafQueue extends AbstractCSQueue {
 
   private synchronized CSAssignment assignReservedContainer(
       FiCaSchedulerApp application, FiCaSchedulerNode node,
-      RMContainer rmContainer, Resource clusterResource) {
+      RMContainer rmContainer, Resource clusterResource,
+      SchedulingMode schedulingMode) {
     // Do we still need this reservation?
     Priority priority = rmContainer.getReservedPriority();
     if (application.getTotalRequiredResources(priority) == 0) {
@@ -915,7 +962,7 @@ public class LeafQueue extends AbstractCSQueue {
     // Try to assign if we have sufficient resources
     CSAssignment tmp =
         assignContainersOnNode(clusterResource, node, application, priority,
-          rmContainer);
+          rmContainer, schedulingMode);
     
     // Doesn't matter... since it's already charged for at time of reservation
     // "re-reservation" is *free*
@@ -929,7 +976,8 @@ public class LeafQueue extends AbstractCSQueue {
   protected Resource getHeadroom(User user, Resource queueCurrentLimit,
       Resource clusterResource, FiCaSchedulerApp application, Resource required) {
     return getHeadroom(user, queueCurrentLimit, clusterResource,
-	  computeUserLimit(application, clusterResource, required, user, null));
+        computeUserLimit(application, clusterResource, required, user,
+            RMNodeLabelsManager.NO_LABEL, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
   }
   
   private Resource getHeadroom(User user, Resource currentResourceLimit,
@@ -973,7 +1021,8 @@ public class LeafQueue extends AbstractCSQueue {
 
   @Lock({LeafQueue.class, FiCaSchedulerApp.class})
   Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
-      Resource clusterResource, Resource required, Set<String> requestedLabels) {
+      Resource clusterResource, Resource required, String nodePartition,
+      SchedulingMode schedulingMode) {
     String user = application.getUser();
     User queueUser = getUser(user);
 
@@ -981,7 +1030,7 @@ public class LeafQueue extends AbstractCSQueue {
     // TODO, need consider headroom respect labels also
     Resource userLimit =
         computeUserLimit(application, clusterResource, required,
-            queueUser, requestedLabels);
+            queueUser, nodePartition, schedulingMode);
 
     setQueueResourceLimitsInfo(clusterResource);
     
@@ -1010,34 +1059,18 @@ public class LeafQueue extends AbstractCSQueue {
   @Lock(NoLock.class)
   private Resource computeUserLimit(FiCaSchedulerApp application,
       Resource clusterResource, Resource required, User user,
-      Set<String> requestedLabels) {
+      String nodePartition, SchedulingMode schedulingMode) {
     // What is our current capacity? 
     // * It is equal to the max(required, queue-capacity) if
     //   we're running below capacity. The 'max' ensures that jobs in queues
     //   with miniscule capacity (< 1 slot) make progress
     // * If we're running over capacity, then its
     //   (usedResources + required) (which extra resources we are allocating)
-    Resource queueCapacity = Resource.newInstance(0, 0);
-    if (requestedLabels != null && !requestedLabels.isEmpty()) {
-      // if we have multiple labels to request, we will choose to use the first
-      // label
-      String firstLabel = requestedLabels.iterator().next();
-      queueCapacity =
-          Resources
-              .max(resourceCalculator, clusterResource, queueCapacity,
-                  Resources.multiplyAndNormalizeUp(resourceCalculator,
-                      labelManager.getResourceByLabel(firstLabel,
-                          clusterResource),
-                      queueCapacities.getAbsoluteCapacity(firstLabel),
-                      minimumAllocation));
-    } else {
-      // else there's no label on request, just to use absolute capacity as
-      // capacity for nodes without label
-      queueCapacity =
-          Resources.multiplyAndNormalizeUp(resourceCalculator, labelManager
-                .getResourceByLabel(CommonNodeLabelsManager.NO_LABEL, clusterResource),
-              queueCapacities.getAbsoluteCapacity(), minimumAllocation);
-    }
+    Resource queueCapacity =
+        Resources.multiplyAndNormalizeUp(resourceCalculator,
+            labelManager.getResourceByLabel(nodePartition, clusterResource),
+            queueCapacities.getAbsoluteCapacity(nodePartition),
+            minimumAllocation);
 
     // Allow progress for queues with miniscule capacity
     queueCapacity =
@@ -1047,33 +1080,56 @@ public class LeafQueue extends AbstractCSQueue {
             required);
 
     Resource currentCapacity =
-        Resources.lessThan(resourceCalculator, clusterResource, 
-            queueUsage.getUsed(), queueCapacity) ?
-            queueCapacity : Resources.add(queueUsage.getUsed(), required);
+        Resources.lessThan(resourceCalculator, clusterResource,
+            queueUsage.getUsed(nodePartition), queueCapacity) ? queueCapacity
+            : Resources.add(queueUsage.getUsed(nodePartition), required);
     
     // Never allow a single user to take more than the 
     // queue's configured capacity * user-limit-factor.
     // Also, the queue's configured capacity should be higher than 
     // queue-hard-limit * ulMin
     
-    final int activeUsers = activeUsersManager.getNumActiveUsers();  
-    		
-    Resource limit =
+    final int activeUsers = activeUsersManager.getNumActiveUsers();
+    
+    // User limit resource is determined by:
+    // max{currentCapacity / #activeUsers, currentCapacity * user-limit-percentage%)
+    Resource userLimitResource = Resources.max(
+        resourceCalculator, clusterResource, 
+        Resources.divideAndCeil(
+            resourceCalculator, currentCapacity, activeUsers),
+        Resources.divideAndCeil(
+            resourceCalculator, 
+            Resources.multiplyAndRoundDown(
+                currentCapacity, userLimit), 
+            100)
+        );
+    
+    // User limit is capped by maxUserLimit
+    // - maxUserLimit = queueCapacity * user-limit-factor (RESPECT_PARTITION_EXCLUSIVITY)
+    // - maxUserLimit = total-partition-resource (IGNORE_PARTITION_EXCLUSIVITY)
+    //
+    // In IGNORE_PARTITION_EXCLUSIVITY mode, if a queue cannot access a
+    // partition, its guaranteed resource on that partition is 0. And
+    // user-limit-factor computation is based on queue's guaranteed capacity. So
+    // we will not cap user-limit as well as used resource when doing
+    // IGNORE_PARTITION_EXCLUSIVITY allocation.
+    Resource maxUserLimit = Resources.none();
+    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
+      maxUserLimit =
+          Resources.multiplyAndRoundDown(queueCapacity, userLimitFactor);
+    } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
+      maxUserLimit =
+          labelManager.getResourceByLabel(nodePartition, clusterResource);
+    }
+    
+    // Cap final user limit with maxUserLimit
+    userLimitResource =
         Resources.roundUp(
             resourceCalculator, 
             Resources.min(
                 resourceCalculator, clusterResource,   
-                Resources.max(
-                    resourceCalculator, clusterResource, 
-                    Resources.divideAndCeil(
-                        resourceCalculator, currentCapacity, activeUsers),
-                    Resources.divideAndCeil(
-                        resourceCalculator, 
-                        Resources.multiplyAndRoundDown(
-                            currentCapacity, userLimit), 
-                        100)
-                    ), 
-                Resources.multiplyAndRoundDown(queueCapacity, userLimitFactor)
+                  userLimitResource,
+                  maxUserLimit
                 ), 
             minimumAllocation);
 
@@ -1081,11 +1137,11 @@ public class LeafQueue extends AbstractCSQueue {
       String userName = application.getUser();
       LOG.debug("User limit computation for " + userName + 
           " in queue " + getQueueName() +
-          " userLimit=" + userLimit +
+          " userLimitPercent=" + userLimit +
           " userLimitFactor=" + userLimitFactor +
           " required: " + required + 
           " consumed: " + user.getUsed() + 
-          " limit: " + limit +
+          " user-limit-resource: " + userLimitResource +
           " queueCapacity: " + queueCapacity + 
           " qconsumed: " + queueUsage.getUsed() +
           " currentCapacity: " + currentCapacity +
@@ -1093,31 +1149,26 @@ public class LeafQueue extends AbstractCSQueue {
           " clusterCapacity: " + clusterResource
       );
     }
-    user.setUserResourceLimit(limit);
-    return limit;
+    user.setUserResourceLimit(userLimitResource);
+    return userLimitResource;
   }
   
   @Private
   protected synchronized boolean canAssignToUser(Resource clusterResource,
       String userName, Resource limit, FiCaSchedulerApp application,
-      boolean checkReservations, Set<String> requestLabels) {
+      boolean checkReservations, String nodePartition) {
     User user = getUser(userName);
-    
-    String label = CommonNodeLabelsManager.NO_LABEL;
-    if (requestLabels != null && !requestLabels.isEmpty()) {
-      label = requestLabels.iterator().next();
-    }
 
     // Note: We aren't considering the current request since there is a fixed
     // overhead of the AM, but it's a > check, not a >= check, so...
     if (Resources
         .greaterThan(resourceCalculator, clusterResource,
-            user.getUsed(label),
+            user.getUsed(nodePartition),
             limit)) {
       // if enabled, check to see if could we potentially use this node instead
       // of a reserved node if the application has reserved containers
       if (this.reservationsContinueLooking && checkReservations
-          && label.equals(CommonNodeLabelsManager.NO_LABEL)) {
+          && nodePartition.equals(CommonNodeLabelsManager.NO_LABEL)) {
         if (Resources.lessThanOrEqual(
             resourceCalculator,
             clusterResource,
@@ -1136,7 +1187,7 @@ public class LeafQueue extends AbstractCSQueue {
       if (LOG.isDebugEnabled()) {
         LOG.debug("User " + userName + " in queue " + getQueueName()
             + " will exceed limit - " + " consumed: "
-            + user.getUsed() + " limit: " + limit);
+            + user.getUsed(nodePartition) + " limit: " + limit);
       }
       return false;
     }
@@ -1176,7 +1227,7 @@ public class LeafQueue extends AbstractCSQueue {
 
   private CSAssignment assignContainersOnNode(Resource clusterResource,
       FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer) {
+      RMContainer reservedContainer, SchedulingMode schedulingMode) {
 
     CSAssignment assigned;
 
@@ -1190,7 +1241,7 @@ public class LeafQueue extends AbstractCSQueue {
       assigned =
           assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, 
             node, application, priority, reservedContainer,
-            allocatedContainer);
+            allocatedContainer, schedulingMode);
       if (Resources.greaterThan(resourceCalculator, clusterResource,
         assigned.getResource(), Resources.none())) {
 
@@ -1219,7 +1270,7 @@ public class LeafQueue extends AbstractCSQueue {
       assigned = 
           assignRackLocalContainers(clusterResource, rackLocalResourceRequest, 
             node, application, priority, reservedContainer,
-            allocatedContainer);
+            allocatedContainer, schedulingMode);
       if (Resources.greaterThan(resourceCalculator, clusterResource,
         assigned.getResource(), Resources.none())) {
 
@@ -1248,7 +1299,7 @@ public class LeafQueue extends AbstractCSQueue {
       assigned =
           assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
             node, application, priority, reservedContainer,
-            allocatedContainer);
+            allocatedContainer, schedulingMode);
 
       // update locality statistics
       if (allocatedContainer.getValue() != null) {
@@ -1314,16 +1365,17 @@ public class LeafQueue extends AbstractCSQueue {
 
   @Private
   protected boolean checkLimitsToReserve(Resource clusterResource,
-      FiCaSchedulerApp application, Resource capability) {
+      FiCaSchedulerApp application, Resource capability, String nodePartition,
+      SchedulingMode schedulingMode) {
     // we can't reserve if we got here based on the limit
     // checks assuming we could unreserve!!!
     Resource userLimit = computeUserLimitAndSetHeadroom(application,
-        clusterResource, capability, null);
+        clusterResource, capability, nodePartition, schedulingMode);
 
     // Check queue max-capacity limit,
     // TODO: Consider reservation on labels
-    if (!canAssignToThisQueue(clusterResource, null,
-        this.currentResourceLimits, capability, Resources.none())) {
+    if (!canAssignToThisQueue(clusterResource, RMNodeLabelsManager.NO_LABEL,
+        this.currentResourceLimits, capability, Resources.none(), schedulingMode)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("was going to reserve but hit queue limit");
       }
@@ -1332,7 +1384,7 @@ public class LeafQueue extends AbstractCSQueue {
 
     // Check user limit
     if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
-        application, false, null)) {
+        application, false, nodePartition)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("was going to reserve but hit user limit");
       }
@@ -1345,12 +1397,13 @@ public class LeafQueue extends AbstractCSQueue {
   private CSAssignment assignNodeLocalContainers(Resource clusterResource,
       ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
       FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer, MutableObject allocatedContainer) {
+      RMContainer reservedContainer, MutableObject allocatedContainer,
+      SchedulingMode schedulingMode) {
     if (canAssign(application, priority, node, NodeType.NODE_LOCAL, 
         reservedContainer)) {
       return assignContainer(clusterResource, node, application, priority,
           nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
-          allocatedContainer);
+          allocatedContainer, schedulingMode);
     }
 
     return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
@@ -1359,12 +1412,13 @@ public class LeafQueue extends AbstractCSQueue {
   private CSAssignment assignRackLocalContainers(Resource clusterResource,
       ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node,
       FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer, MutableObject allocatedContainer) {
+      RMContainer reservedContainer, MutableObject allocatedContainer,
+      SchedulingMode schedulingMode) {
     if (canAssign(application, priority, node, NodeType.RACK_LOCAL,
         reservedContainer)) {
       return assignContainer(clusterResource, node, application, priority,
           rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
-          allocatedContainer);
+          allocatedContainer, schedulingMode);
     }
 
     return new CSAssignment(Resources.none(), NodeType.RACK_LOCAL);
@@ -1373,16 +1427,21 @@ public class LeafQueue extends AbstractCSQueue {
   private CSAssignment assignOffSwitchContainers(Resource clusterResource,
       ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node,
       FiCaSchedulerApp application, Priority priority,
-      RMContainer reservedContainer, MutableObject allocatedContainer) {
+      RMContainer reservedContainer, MutableObject allocatedContainer,
+      SchedulingMode schedulingMode) {
     if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
         reservedContainer)) {
       return assignContainer(clusterResource, node, application, priority,
           offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
-          allocatedContainer);
+          allocatedContainer, schedulingMode);
     }
     
     return new CSAssignment(Resources.none(), NodeType.OFF_SWITCH);
   }
+  
+  private int getActualNodeLocalityDelay() {
+    return Math.min(scheduler.getNumClusterNodes(), getNodeLocalityDelay());
+  }
 
   boolean canAssign(FiCaSchedulerApp application, Priority priority, 
       FiCaSchedulerNode node, NodeType type, RMContainer reservedContainer) {
@@ -1417,10 +1476,7 @@ public class LeafQueue extends AbstractCSQueue {
     if (type == NodeType.RACK_LOCAL) {
       // 'Delay' rack-local just a little bit...
       long missedOpportunities = application.getSchedulingOpportunities(priority);
-      return (
-          Math.min(scheduler.getNumClusterNodes(), getNodeLocalityDelay()) < 
-          missedOpportunities
-          );
+      return getActualNodeLocalityDelay() < missedOpportunities;
     }
 
     // Check if we need containers on this host
@@ -1460,7 +1516,7 @@ public class LeafQueue extends AbstractCSQueue {
   private CSAssignment assignContainer(Resource clusterResource, FiCaSchedulerNode node,
       FiCaSchedulerApp application, Priority priority, 
       ResourceRequest request, NodeType type, RMContainer rmContainer,
-      MutableObject createdContainer) {
+      MutableObject createdContainer, SchedulingMode schedulingMode) {
     if (LOG.isDebugEnabled()) {
       LOG.debug("assignContainers: node=" + node.getNodeName()
         + " application=" + application.getApplicationId()
@@ -1469,9 +1525,8 @@ public class LeafQueue extends AbstractCSQueue {
     }
     
     // check if the resource request can access the label
-    if (!SchedulerUtils.checkNodeLabelExpression(
-        node.getLabels(),
-        request.getNodeLabelExpression())) {
+    if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(request,
+        node.getPartition(), schedulingMode)) {
       // this is a reserved container, but we cannot allocate it now according
       // to label not match. This can be caused by node label changed
       // We should un-reserve this container.
@@ -1576,8 +1631,8 @@ public class LeafQueue extends AbstractCSQueue {
           // If we're trying to reserve a container here, not container will be
           // unreserved for reserving the new one. Check limits again before
           // reserve the new container
-          if (!checkLimitsToReserve(clusterResource, 
-              application, capability)) {
+          if (!checkLimitsToReserve(clusterResource,
+              application, capability, node.getPartition(), schedulingMode)) {
             return new CSAssignment(Resources.none(), type);
           }
         }
@@ -1666,7 +1721,7 @@ public class LeafQueue extends AbstractCSQueue {
         // Book-keeping
         if (removed) {
           releaseResource(clusterResource, application,
-              container.getResource(), node.getLabels());
+              container.getResource(), node.getPartition());
           LOG.info("completedContainer" +
               " container=" + container +
               " queue=" + this +
@@ -1684,13 +1739,13 @@ public class LeafQueue extends AbstractCSQueue {
 
   synchronized void allocateResource(Resource clusterResource,
       SchedulerApplicationAttempt application, Resource resource,
-      Set<String> nodeLabels) {
-    super.allocateResource(clusterResource, resource, nodeLabels);
+      String nodePartition) {
+    super.allocateResource(clusterResource, resource, nodePartition);
     
     // Update user metrics
     String userName = application.getUser();
     User user = getUser(userName);
-    user.assignContainer(resource, nodeLabels);
+    user.assignContainer(resource, nodePartition);
     // Note this is a bit unconventional since it gets the object and modifies
     // it here, rather then using set routine
     Resources.subtractFrom(application.getHeadroom(), resource); // headroom
@@ -1707,13 +1762,13 @@ public class LeafQueue extends AbstractCSQueue {
   }
 
   synchronized void releaseResource(Resource clusterResource, 
-      FiCaSchedulerApp application, Resource resource, Set<String> nodeLabels) {
-    super.releaseResource(clusterResource, resource, nodeLabels);
+      FiCaSchedulerApp application, Resource resource, String nodePartition) {
+    super.releaseResource(clusterResource, resource, nodePartition);
     
     // Update user metrics
     String userName = application.getUser();
     User user = getUser(userName);
-    user.releaseContainer(resource, nodeLabels);
+    user.releaseContainer(resource, nodePartition);
     metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
       
     LOG.info(getQueueName() + 
@@ -1723,7 +1778,8 @@ public class LeafQueue extends AbstractCSQueue {
   
   private void updateAbsoluteCapacityResource(Resource clusterResource) {
     absoluteCapacityResource =
-        Resources.multiplyAndNormalizeUp(resourceCalculator, clusterResource,
+        Resources.multiplyAndNormalizeUp(resourceCalculator, labelManager
+            .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource),
             queueCapacities.getAbsoluteCapacity(), minimumAllocation);
   }
   
@@ -1769,8 +1825,9 @@ public class LeafQueue extends AbstractCSQueue {
     // Update application properties
     for (FiCaSchedulerApp application : activeApplications) {
       synchronized (application) {
-        computeUserLimitAndSetHeadroom(application, clusterResource, 
-            Resources.none(), null);
+        computeUserLimitAndSetHeadroom(application, clusterResource,
+            Resources.none(), RMNodeLabelsManager.NO_LABEL,
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
       }
     }
   }
@@ -1828,25 +1885,12 @@ public class LeafQueue extends AbstractCSQueue {
       }
     }
 
-    public void assignContainer(Resource resource,
-        Set<String> nodeLabels) {
-      if (nodeLabels == null || nodeLabels.isEmpty()) {
-        userResourceUsage.incUsed(resource);
-      } else {
-        for (String label : nodeLabels) {
-          userResourceUsage.incUsed(label, resource);
-        }
-      }
+    public void assignContainer(Resource resource, String nodePartition) {
+      userResourceUsage.incUsed(nodePartition, resource);
     }
 
-    public void releaseContainer(Resource resource, Set<String> nodeLabels) {
-      if (nodeLabels == null || nodeLabels.isEmpty()) {
-        userResourceUsage.decUsed(resource);
-      } else {
-        for (String label : nodeLabels) {
-          userResourceUsage.decUsed(label, resource);
-        }
-      }
+    public void releaseContainer(Resource resource, String nodePartition) {
+      userResourceUsage.decUsed(nodePartition, resource);
     }
 
     public Resource getUserResourceLimit() {
@@ -1869,7 +1913,7 @@ public class LeafQueue extends AbstractCSQueue {
       FiCaSchedulerNode node =
           scheduler.getNode(rmContainer.getContainer().getNodeId());
       allocateResource(clusterResource, attempt, rmContainer.getContainer()
-          .getResource(), node.getLabels());
+          .getResource(), node.getPartition());
     }
     getParent().recoverContainer(clusterResource, attempt, rmContainer);
   }
@@ -1909,7 +1953,7 @@ public class LeafQueue extends AbstractCSQueue {
       FiCaSchedulerNode node =
           scheduler.getNode(rmContainer.getContainer().getNodeId());
       allocateResource(clusterResource, application, rmContainer.getContainer()
-          .getResource(), node.getLabels());
+          .getResource(), node.getPartition());
       LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
           + " resource=" + rmContainer.getContainer().getResource()
           + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity()
@@ -1927,7 +1971,7 @@ public class LeafQueue extends AbstractCSQueue {
       FiCaSchedulerNode node =
           scheduler.getNode(rmContainer.getContainer().getNodeId());
       releaseResource(clusterResource, application, rmContainer.getContainer()
-          .getResource(), node.getLabels());
+          .getResource(), node.getPartition());
       LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
           + " resource=" + rmContainer.getContainer().getResource()
           + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()


[2/3] hadoop git commit: YARN-3361. CapacityScheduler side changes to support non-exclusive node labels. Contributed by Wangda Tan

Posted by ji...@apache.org.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 882498a..eb64d43 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -56,8 +56,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -377,16 +375,29 @@ public class ParentQueue extends AbstractCSQueue {
 
   @Override
   public synchronized CSAssignment assignContainers(Resource clusterResource,
-      FiCaSchedulerNode node, ResourceLimits resourceLimits) {
-    CSAssignment assignment = 
-        new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
-    Set<String> nodeLabels = node.getLabels();
-    
+      FiCaSchedulerNode node, ResourceLimits resourceLimits,
+      SchedulingMode schedulingMode) {
     // if our queue cannot access this node, just return
-    if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels, nodeLabels)) {
-      return assignment;
+    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
+        && !accessibleToPartition(node.getPartition())) {
+      return NULL_ASSIGNMENT;
+    }
+    
+    // Check if this queue need more resource, simply skip allocation if this
+    // queue doesn't need more resources.
+    if (!super.hasPendingResourceRequest(node.getPartition(),
+        clusterResource, schedulingMode)) {
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("Skip this queue=" + getQueuePath()
+            + ", because it doesn't need more resource, schedulingMode="
+            + schedulingMode.name() + " node-partition=" + node.getPartition());
+      }
+      return NULL_ASSIGNMENT;
     }
     
+    CSAssignment assignment = 
+        new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
+    
     while (canAssign(clusterResource, node)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Trying to assign containers to child-queue of "
@@ -396,15 +407,17 @@ public class ParentQueue extends AbstractCSQueue {
       // Are we over maximum-capacity for this queue?
       // This will also consider parent's limits and also continuous reservation
       // looking
-      if (!super.canAssignToThisQueue(clusterResource, nodeLabels, resourceLimits,
-          minimumAllocation, Resources.createResource(getMetrics()
-              .getReservedMB(), getMetrics().getReservedVirtualCores()))) {
+      if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
+          resourceLimits, minimumAllocation, Resources.createResource(
+              getMetrics().getReservedMB(), getMetrics()
+                  .getReservedVirtualCores()), schedulingMode)) {
         break;
       }
       
       // Schedule
-      CSAssignment assignedToChild = 
-          assignContainersToChildQueues(clusterResource, node, resourceLimits);
+      CSAssignment assignedToChild =
+          assignContainersToChildQueues(clusterResource, node, resourceLimits,
+              schedulingMode);
       assignment.setType(assignedToChild.getType());
       
       // Done if no child-queue assigned anything
@@ -413,7 +426,7 @@ public class ParentQueue extends AbstractCSQueue {
               assignedToChild.getResource(), Resources.none())) {
         // Track resource utilization for the parent-queue
         super.allocateResource(clusterResource, assignedToChild.getResource(),
-            nodeLabels);
+            node.getPartition());
         
         // Track resource utilization in this pass of the scheduler
         Resources
@@ -510,7 +523,8 @@ public class ParentQueue extends AbstractCSQueue {
   }
   
   private synchronized CSAssignment assignContainersToChildQueues(
-      Resource cluster, FiCaSchedulerNode node, ResourceLimits limits) {
+      Resource cluster, FiCaSchedulerNode node, ResourceLimits limits,
+      SchedulingMode schedulingMode) {
     CSAssignment assignment = 
         new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
     
@@ -523,12 +537,13 @@ public class ParentQueue extends AbstractCSQueue {
         LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath()
           + " stats: " + childQueue);
       }
-      
+
       // Get ResourceLimits of child queue before assign containers
       ResourceLimits childLimits =
           getResourceLimitsOfChild(childQueue, cluster, limits);
       
-      assignment = childQueue.assignContainers(cluster, node, childLimits);
+      assignment = childQueue.assignContainers(cluster, node, 
+          childLimits, schedulingMode);
       if(LOG.isDebugEnabled()) {
         LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
           " stats: " + childQueue + " --> " + 
@@ -584,7 +599,7 @@ public class ParentQueue extends AbstractCSQueue {
       // Book keeping
       synchronized (this) {
         super.releaseResource(clusterResource, rmContainer.getContainer()
-            .getResource(), node.getLabels());
+            .getResource(), node.getPartition());
 
         LOG.info("completedContainer" +
             " queue=" + getQueueName() + 
@@ -653,7 +668,7 @@ public class ParentQueue extends AbstractCSQueue {
       FiCaSchedulerNode node =
           scheduler.getNode(rmContainer.getContainer().getNodeId());
       super.allocateResource(clusterResource, rmContainer.getContainer()
-          .getResource(), node.getLabels());
+          .getResource(), node.getPartition());
     }
     if (parent != null) {
       parent.recoverContainer(clusterResource, attempt, rmContainer);
@@ -681,7 +696,7 @@ public class ParentQueue extends AbstractCSQueue {
       FiCaSchedulerNode node =
           scheduler.getNode(rmContainer.getContainer().getNodeId());
       super.allocateResource(clusterResource, rmContainer.getContainer()
-          .getResource(), node.getLabels());
+          .getResource(), node.getPartition());
       LOG.info("movedContainer" + " queueMoveIn=" + getQueueName()
           + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
           + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster="
@@ -701,7 +716,7 @@ public class ParentQueue extends AbstractCSQueue {
           scheduler.getNode(rmContainer.getContainer().getNodeId());
       super.releaseResource(clusterResource,
           rmContainer.getContainer().getResource(),
-          node.getLabels());
+          node.getPartition());
       LOG.info("movedContainer" + " queueMoveOut=" + getQueueName()
           + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
           + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster="

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/SchedulingMode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/SchedulingMode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/SchedulingMode.java
new file mode 100644
index 0000000..7e7dc37
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/SchedulingMode.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+/**
+ * Scheduling modes, see below for detailed explanations 
+ */
+public enum SchedulingMode {
+  /**
+   * <p>
+   * When a node has partition (say partition=x), only application in the queue
+   * can access to partition=x AND requires for partition=x resource can get
+   * chance to allocate on the node.
+   * </p>
+   * 
+   * <p>
+   * When a node has no partition, only application requires non-partitioned
+   * resource can get chance to allocate on the node.
+   * </p>
+   */
+  RESPECT_PARTITION_EXCLUSIVITY,
+  
+  /**
+   * Only used when a node has partition AND the partition isn't an exclusive
+   * partition AND application requires non-partitioned resource.
+   */
+  IGNORE_PARTITION_EXCLUSIVITY
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
index 76ede39..9b7eb84 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.Task.State;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -277,6 +278,9 @@ public class Application {
     } else {
       request.setNumContainers(request.getNumContainers() + 1);
     }
+    if (request.getNodeLabelExpression() == null) {
+      request.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
+    }
     
     // Note this down for next interaction with ResourceManager
     ask.remove(request);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
index f62fdb3..5c107aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
@@ -150,8 +150,14 @@ public class MockAM {
   public AllocateResponse allocate(
       String host, int memory, int numContainers,
       List<ContainerId> releases, String labelExpression) throws Exception {
+    return allocate(host, memory, numContainers, 1, releases, labelExpression);
+  }
+  
+  public AllocateResponse allocate(
+      String host, int memory, int numContainers, int priority,
+      List<ContainerId> releases, String labelExpression) throws Exception {
     List<ResourceRequest> reqs =
-        createReq(new String[] { host }, memory, 1, numContainers,
+        createReq(new String[] { host }, memory, priority, numContainers,
             labelExpression);
     return allocate(reqs, releases);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 06c6b32..f2b1d86 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 
@@ -200,10 +202,18 @@ public class MockRM extends ResourceManager {
   
   public boolean waitForState(MockNM nm, ContainerId containerId,
       RMContainerState containerState, int timeoutMillisecs) throws Exception {
+    return waitForState(Arrays.asList(nm), containerId, containerState,
+        timeoutMillisecs);
+  }
+  
+  public boolean waitForState(Collection<MockNM> nms, ContainerId containerId,
+      RMContainerState containerState, int timeoutMillisecs) throws Exception {
     RMContainer container = getResourceScheduler().getRMContainer(containerId);
     int timeoutSecs = 0;
     while(container == null && timeoutSecs++ < timeoutMillisecs / 100) {
-      nm.nodeHeartbeat(true);
+      for (MockNM nm : nms) {
+        nm.nodeHeartbeat(true);
+      }
       container = getResourceScheduler().getRMContainer(containerId);
       System.out.println("Waiting for container " + containerId + " to be allocated.");
       Thread.sleep(100);
@@ -217,9 +227,11 @@ public class MockRM extends ResourceManager {
         && timeoutSecs++ < timeoutMillisecs / 100) {
       System.out.println("Container : " + containerId + " State is : "
           + container.getState() + " Waiting for state : " + containerState);
-      nm.nodeHeartbeat(true);
+      for (MockNM nm : nms) {
+        nm.nodeHeartbeat(true);
+      }
       Thread.sleep(100);
-      
+
       if (timeoutMillisecs <= timeoutSecs * 100) {
         return false;
       }
@@ -650,11 +662,28 @@ public class MockRM extends ResourceManager {
     am.waitForState(RMAppAttemptState.FINISHED);
     rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED);
   }
+  
+  @SuppressWarnings("rawtypes")
+  private static void waitForSchedulerAppAttemptAdded(
+      ApplicationAttemptId attemptId, MockRM rm) throws InterruptedException {
+    int tick = 0;
+    // Wait for at most 5 sec
+    while (null == ((AbstractYarnScheduler) rm.getResourceScheduler())
+        .getApplicationAttempt(attemptId) && tick < 50) {
+      Thread.sleep(100);
+      if (tick % 10 == 0) {
+        System.out.println("waiting for SchedulerApplicationAttempt="
+            + attemptId + " added.");
+      }
+      tick++;
+    }
+  }
 
   public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
       throws Exception {
     rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
     RMAppAttempt attempt = app.getCurrentAppAttempt();
+    waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm);
     System.out.println("Launch AM " + attempt.getAppAttemptId());
     nm.nodeHeartbeat(true);
     MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index 1ca5c97..46167ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -612,7 +612,7 @@ public class TestApplicationLimits {
 
     // Schedule to compute 
     queue.assignContainers(clusterResource, node_0, new ResourceLimits(
-        clusterResource));
+        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     Resource expectedHeadroom = Resources.createResource(10*16*GB, 1);
     assertEquals(expectedHeadroom, app_0_0.getHeadroom());
 
@@ -632,7 +632,7 @@ public class TestApplicationLimits {
 
     // Schedule to compute 
     queue.assignContainers(clusterResource, node_0, new ResourceLimits(
-        clusterResource)); // Schedule to compute
+        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute
     assertEquals(expectedHeadroom, app_0_0.getHeadroom());
     assertEquals(expectedHeadroom, app_0_1.getHeadroom());// no change
     
@@ -652,7 +652,7 @@ public class TestApplicationLimits {
     
     // Schedule to compute 
     queue.assignContainers(clusterResource, node_0, new ResourceLimits(
-        clusterResource)); // Schedule to compute
+        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute
     expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes
     assertEquals(expectedHeadroom, app_0_0.getHeadroom());
     assertEquals(expectedHeadroom, app_0_1.getHeadroom());
@@ -661,7 +661,7 @@ public class TestApplicationLimits {
     // Now reduce cluster size and check for the smaller headroom
     clusterResource = Resources.createResource(90*16*GB);
     queue.assignContainers(clusterResource, node_0, new ResourceLimits(
-        clusterResource)); // Schedule to compute
+        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute
     expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes
     assertEquals(expectedHeadroom, app_0_0.getHeadroom());
     assertEquals(expectedHeadroom, app_0_1.getHeadroom());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index 23b31fa..970a98a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@@ -133,7 +134,7 @@ public class TestChildQueueOrder {
         final Resource allocatedResource = Resources.createResource(allocation);
         if (queue instanceof ParentQueue) {
           ((ParentQueue)queue).allocateResource(clusterResource, 
-              allocatedResource, null);
+              allocatedResource, RMNodeLabelsManager.NO_LABEL);
         } else {
           FiCaSchedulerApp app1 = getMockApplication(0, "");
           ((LeafQueue)queue).allocateResource(clusterResource, app1, 
@@ -145,7 +146,7 @@ public class TestChildQueueOrder {
           doReturn(new CSAssignment(Resources.none(), type)).
           when(queue)
               .assignContainers(eq(clusterResource), eq(node),
-                  any(ResourceLimits.class));
+                  any(ResourceLimits.class), any(SchedulingMode.class));
 
           // Mock the node's resource availability
           Resource available = node.getAvailableResource();
@@ -157,7 +158,7 @@ public class TestChildQueueOrder {
       }
     }).
     when(queue).assignContainers(eq(clusterResource), eq(node), 
-        any(ResourceLimits.class));
+        any(ResourceLimits.class), any(SchedulingMode.class));
     doNothing().when(node).releaseContainer(any(Container.class));
   }
 
@@ -241,6 +242,14 @@ public class TestChildQueueOrder {
     CSQueue b = queues.get(B);
     CSQueue c = queues.get(C);
     CSQueue d = queues.get(D);
+    
+    // Make a/b/c/d has >0 pending resource, so that allocation will continue.
+    queues.get(CapacitySchedulerConfiguration.ROOT).getQueueResourceUsage()
+        .incPending(Resources.createResource(1 * GB));
+    a.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+    b.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+    c.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+    d.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
 
     final String user_0 = "user_0";
 
@@ -275,7 +284,7 @@ public class TestChildQueueOrder {
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 0*GB);
     root.assignContainers(clusterResource, node_0, new ResourceLimits(
-        clusterResource));
+        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     for(int i=0; i < 2; i++)
     {
       stubQueueAllocation(a, clusterResource, node_0, 0*GB);
@@ -283,7 +292,7 @@ public class TestChildQueueOrder {
       stubQueueAllocation(c, clusterResource, node_0, 0*GB);
       stubQueueAllocation(d, clusterResource, node_0, 0*GB);
       root.assignContainers(clusterResource, node_0, new ResourceLimits(
-          clusterResource));
+          clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     } 
     for(int i=0; i < 3; i++)
     {
@@ -292,7 +301,7 @@ public class TestChildQueueOrder {
       stubQueueAllocation(c, clusterResource, node_0, 1*GB);
       stubQueueAllocation(d, clusterResource, node_0, 0*GB);
       root.assignContainers(clusterResource, node_0, new ResourceLimits(
-          clusterResource));
+          clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     }  
     for(int i=0; i < 4; i++)
     {
@@ -301,7 +310,7 @@ public class TestChildQueueOrder {
       stubQueueAllocation(c, clusterResource, node_0, 0*GB);
       stubQueueAllocation(d, clusterResource, node_0, 1*GB);
       root.assignContainers(clusterResource, node_0, new ResourceLimits(
-          clusterResource));
+          clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     }    
     verifyQueueMetrics(a, 1*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
@@ -335,7 +344,7 @@ public class TestChildQueueOrder {
       stubQueueAllocation(c, clusterResource, node_0, 0*GB);
       stubQueueAllocation(d, clusterResource, node_0, 0*GB);
       root.assignContainers(clusterResource, node_0, new ResourceLimits(
-          clusterResource));
+          clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     }
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
@@ -363,7 +372,7 @@ public class TestChildQueueOrder {
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 0*GB);
     root.assignContainers(clusterResource, node_0, new ResourceLimits(
-        clusterResource));
+        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 3*GB, clusterResource);
     verifyQueueMetrics(c, 3*GB, clusterResource);
@@ -390,7 +399,7 @@ public class TestChildQueueOrder {
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 0*GB);
     root.assignContainers(clusterResource, node_0, new ResourceLimits(
-        clusterResource));
+        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
     verifyQueueMetrics(c, 3*GB, clusterResource);
@@ -405,12 +414,14 @@ public class TestChildQueueOrder {
     stubQueueAllocation(c, clusterResource, node_0, 0*GB);
     stubQueueAllocation(d, clusterResource, node_0, 1*GB);
     root.assignContainers(clusterResource, node_0, new ResourceLimits(
-        clusterResource));
+        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     InOrder allocationOrder = inOrder(d,b);
-    allocationOrder.verify(d).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), any(ResourceLimits.class));
-    allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), any(ResourceLimits.class));
+    allocationOrder.verify(d).assignContainers(eq(clusterResource),
+        any(FiCaSchedulerNode.class), any(ResourceLimits.class),
+        any(SchedulingMode.class));
+    allocationOrder.verify(b).assignContainers(eq(clusterResource),
+        any(FiCaSchedulerNode.class), any(ResourceLimits.class),
+        any(SchedulingMode.class));
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
     verifyQueueMetrics(c, 3*GB, clusterResource);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index 03b8f5c..54ba617 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
 
@@ -32,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -51,9 +54,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.junit.Assert;
@@ -327,387 +334,4 @@ public class TestContainerAllocation {
     rm1.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.ALLOCATED);
     MockRM.launchAndRegisterAM(app1, rm1, nm1);
   }
-  
-  private Configuration getConfigurationWithQueueLabels(Configuration config) {
-    CapacitySchedulerConfiguration conf =
-        new CapacitySchedulerConfiguration(config);
-    
-    // Define top-level queues
-    conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
-    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
-    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
-
-    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
-    conf.setCapacity(A, 10);
-    conf.setMaximumCapacity(A, 15);
-    conf.setAccessibleNodeLabels(A, toSet("x"));
-    conf.setCapacityByLabel(A, "x", 100);
-    
-    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
-    conf.setCapacity(B, 20);
-    conf.setAccessibleNodeLabels(B, toSet("y"));
-    conf.setCapacityByLabel(B, "y", 100);
-    
-    final String C = CapacitySchedulerConfiguration.ROOT + ".c";
-    conf.setCapacity(C, 70);
-    conf.setMaximumCapacity(C, 70);
-    conf.setAccessibleNodeLabels(C, RMNodeLabelsManager.EMPTY_STRING_SET);
-    
-    // Define 2nd-level queues
-    final String A1 = A + ".a1";
-    conf.setQueues(A, new String[] {"a1"});
-    conf.setCapacity(A1, 100);
-    conf.setMaximumCapacity(A1, 100);
-    conf.setCapacityByLabel(A1, "x", 100);
-    
-    final String B1 = B + ".b1";
-    conf.setQueues(B, new String[] {"b1"});
-    conf.setCapacity(B1, 100);
-    conf.setMaximumCapacity(B1, 100);
-    conf.setCapacityByLabel(B1, "y", 100);
-
-    final String C1 = C + ".c1";
-    conf.setQueues(C, new String[] {"c1"});
-    conf.setCapacity(C1, 100);
-    conf.setMaximumCapacity(C1, 100);
-    
-    return conf;
-  }
-  
-  private void checkTaskContainersHost(ApplicationAttemptId attemptId,
-      ContainerId containerId, ResourceManager rm, String host) {
-    YarnScheduler scheduler = rm.getRMContext().getScheduler();
-    SchedulerAppReport appReport = scheduler.getSchedulerAppInfo(attemptId);
-
-    Assert.assertTrue(appReport.getLiveContainers().size() > 0);
-    for (RMContainer c : appReport.getLiveContainers()) {
-      if (c.getContainerId().equals(containerId)) {
-        Assert.assertEquals(host, c.getAllocatedNode().getHost());
-      }
-    }
-  }
-  
-  @SuppressWarnings("unchecked")
-  private <E> Set<E> toSet(E... elements) {
-    Set<E> set = Sets.newHashSet(elements);
-    return set;
-  }
-  
-  @Test (timeout = 300000)
-  public void testContainerAllocationWithSingleUserLimits() throws Exception {
-    final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
-    mgr.init(conf);
-
-    // set node -> label
-    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
-    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
-        NodeId.newInstance("h2", 0), toSet("y")));
-
-    // inject node label manager
-    MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
-      @Override
-      public RMNodeLabelsManager createNodeLabelManager() {
-        return mgr;
-      }
-    };
-
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
-    rm1.registerNode("h2:1234", 8000); // label = y
-    MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
-
-    // launch an app to queue a1 (label = x), and check all container will
-    // be allocated in h1
-    RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-    
-    // A has only 10% of x, so it can only allocate one container in label=empty
-    ContainerId containerId =
-        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
-    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "");
-    Assert.assertTrue(rm1.waitForState(nm3, containerId,
-          RMContainerState.ALLOCATED, 10 * 1000));
-    // Cannot allocate 2nd label=empty container
-    containerId =
-        ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
-    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "");
-    Assert.assertFalse(rm1.waitForState(nm3, containerId,
-          RMContainerState.ALLOCATED, 10 * 1000));
-
-    // A has default user limit = 100, so it can use all resource in label = x
-    // We can allocate floor(8000 / 1024) = 7 containers
-    for (int id = 3; id <= 8; id++) {
-      containerId =
-          ContainerId.newContainerId(am1.getApplicationAttemptId(), id);
-      am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "x");
-      Assert.assertTrue(rm1.waitForState(nm1, containerId,
-          RMContainerState.ALLOCATED, 10 * 1000));
-    }
-    rm1.close();
-  }
-  
-  @Test(timeout = 300000)
-  public void testContainerAllocateWithComplexLabels() throws Exception {
-    /*
-     * Queue structure:
-     *                      root (*)
-     *                  ________________
-     *                 /                \
-     *               a x(100%), y(50%)   b y(50%), z(100%)
-     *               ________________    ______________
-     *              /                   /              \
-     *             a1 (x,y)         b1(no)              b2(y,z)
-     *               100%                          y = 100%, z = 100%
-     *                           
-     * Node structure:
-     * h1 : x
-     * h2 : y
-     * h3 : y
-     * h4 : z
-     * h5 : NO
-     * 
-     * Total resource:
-     * x: 4G
-     * y: 6G
-     * z: 2G
-     * *: 2G
-     * 
-     * Resource of
-     * a1: x=4G, y=3G, NO=0.2G
-     * b1: NO=0.9G (max=1G)
-     * b2: y=3, z=2G, NO=0.9G (max=1G)
-     * 
-     * Each node can only allocate two containers
-     */
-
-    // set node -> label
-    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y", "z"));
-    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
-        toSet("x"), NodeId.newInstance("h2", 0), toSet("y"),
-        NodeId.newInstance("h3", 0), toSet("y"), NodeId.newInstance("h4", 0),
-        toSet("z"), NodeId.newInstance("h5", 0),
-        RMNodeLabelsManager.EMPTY_STRING_SET));
-
-    // inject node label manager
-    MockRM rm1 = new MockRM(TestUtils.getComplexConfigurationWithQueueLabels(conf)) {
-      @Override
-      public RMNodeLabelsManager createNodeLabelManager() {
-        return mgr;
-      }
-    };
-
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 2048);
-    MockNM nm2 = rm1.registerNode("h2:1234", 2048);
-    MockNM nm3 = rm1.registerNode("h3:1234", 2048);
-    MockNM nm4 = rm1.registerNode("h4:1234", 2048);
-    MockNM nm5 = rm1.registerNode("h5:1234", 2048);
-    
-    ContainerId containerId;
-
-    // launch an app to queue a1 (label = x), and check all container will
-    // be allocated in h1
-    RMApp app1 = rm1.submitApp(1024, "app", "user", null, "a1");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-    // request a container (label = y). can be allocated on nm2 
-    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
-    containerId =
-        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2L);
-    Assert.assertTrue(rm1.waitForState(nm2, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
-        "h2");
-
-    // launch an app to queue b1 (label = y), and check all container will
-    // be allocated in h5
-    RMApp app2 = rm1.submitApp(1024, "app", "user", null, "b1");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm5);
-
-    // request a container for AM, will succeed
-    // and now b1's queue capacity will be used, cannot allocate more containers
-    // (Maximum capacity reached)
-    am2.allocate("*", 1024, 1, new ArrayList<ContainerId>());
-    containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
-    Assert.assertFalse(rm1.waitForState(nm4, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    Assert.assertFalse(rm1.waitForState(nm5, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    
-    // launch an app to queue b2
-    RMApp app3 = rm1.submitApp(1024, "app", "user", null, "b2");
-    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm5);
-
-    // request a container. try to allocate on nm1 (label = x) and nm3 (label =
-    // y,z). Will successfully allocate on nm3
-    am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
-    containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
-    Assert.assertFalse(rm1.waitForState(nm1, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    Assert.assertTrue(rm1.waitForState(nm3, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
-        "h3");
-    
-    // try to allocate container (request label = z) on nm4 (label = y,z). 
-    // Will successfully allocate on nm4 only.
-    am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "z");
-    containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 3L);
-    Assert.assertTrue(rm1.waitForState(nm4, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
-        "h4");
-
-    rm1.close();
-  }
-
-  @Test (timeout = 120000)
-  public void testContainerAllocateWithLabels() throws Exception {
-    // set node -> label
-    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
-    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
-        NodeId.newInstance("h2", 0), toSet("y")));
-
-    // inject node label manager
-    MockRM rm1 = new MockRM(getConfigurationWithQueueLabels(conf)) {
-      @Override
-      public RMNodeLabelsManager createNodeLabelManager() {
-        return mgr;
-      }
-    };
-
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
-    MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y
-    MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
-    
-    ContainerId containerId;
-
-    // launch an app to queue a1 (label = x), and check all container will
-    // be allocated in h1
-    RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm3);
-
-    // request a container.
-    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "x");
-    containerId =
-        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
-    Assert.assertFalse(rm1.waitForState(nm2, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    Assert.assertTrue(rm1.waitForState(nm1, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
-        "h1");
-
-    // launch an app to queue b1 (label = y), and check all container will
-    // be allocated in h2
-    RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm3);
-
-    // request a container.
-    am2.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
-    containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
-    Assert.assertFalse(rm1.waitForState(nm1, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    Assert.assertTrue(rm1.waitForState(nm2, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1,
-        "h2");
-    
-    // launch an app to queue c1 (label = ""), and check all container will
-    // be allocated in h3
-    RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1");
-    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3);
-
-    // request a container.
-    am3.allocate("*", 1024, 1, new ArrayList<ContainerId>());
-    containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
-    Assert.assertFalse(rm1.waitForState(nm2, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    Assert.assertTrue(rm1.waitForState(nm3, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
-        "h3");
-
-    rm1.close();
-  }
-  
-  @Test (timeout = 120000)
-  public void testContainerAllocateWithDefaultQueueLabels() throws Exception {
-    // This test is pretty much similar to testContainerAllocateWithLabel.
-    // Difference is, this test doesn't specify label expression in ResourceRequest,
-    // instead, it uses default queue label expression
-
-    // set node -> label
-    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
-    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
-        NodeId.newInstance("h2", 0), toSet("y")));
-
-    // inject node label manager
-    MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
-      @Override
-      public RMNodeLabelsManager createNodeLabelManager() {
-        return mgr;
-      }
-    };
-
-    rm1.getRMContext().setNodeLabelManager(mgr);
-    rm1.start();
-    MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
-    MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y
-    MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
-    
-    ContainerId containerId;
-
-    // launch an app to queue a1 (label = x), and check all container will
-    // be allocated in h1
-    RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
-    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-    // request a container.
-    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>());
-    containerId =
-        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
-    Assert.assertFalse(rm1.waitForState(nm3, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    Assert.assertTrue(rm1.waitForState(nm1, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
-        "h1");
-
-    // launch an app to queue b1 (label = y), and check all container will
-    // be allocated in h2
-    RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1");
-    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
-    // request a container.
-    am2.allocate("*", 1024, 1, new ArrayList<ContainerId>());
-    containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
-    Assert.assertFalse(rm1.waitForState(nm3, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    Assert.assertTrue(rm1.waitForState(nm2, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1,
-        "h2");
-    
-    // launch an app to queue c1 (label = ""), and check all container will
-    // be allocated in h3
-    RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1");
-    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3);
-
-    // request a container.
-    am3.allocate("*", 1024, 1, new ArrayList<ContainerId>());
-    containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
-    Assert.assertFalse(rm1.waitForState(nm2, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    Assert.assertTrue(rm1.waitForState(nm3, containerId,
-        RMContainerState.ALLOCATED, 10 * 1000));
-    checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
-        "h3");
-
-    rm1.close();
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 972cabb..0b5250b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -351,7 +351,7 @@ public class TestLeafQueue {
     
     // Only 1 container
     a.assignContainers(clusterResource, node_0, new ResourceLimits(
-        clusterResource));
+        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(
         (int)(node_0.getTotalResource().getMemory() * a.getCapacity()) - (1*GB),
         a.getMetrics().getAvailableMB());
@@ -487,7 +487,7 @@ public class TestLeafQueue {
     
     // Only 1 container
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(1*GB, a.getUsedResources().getMemory());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -498,7 +498,7 @@ public class TestLeafQueue {
     // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
     // you can get one container more than user-limit
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -507,7 +507,7 @@ public class TestLeafQueue {
     
     // Can't allocate 3rd due to user-limit
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -517,7 +517,7 @@ public class TestLeafQueue {
     // Bump up user-limit-factor, now allocate should work
     a.setUserLimitFactor(10);
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(3*GB, a.getUsedResources().getMemory());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -526,7 +526,7 @@ public class TestLeafQueue {
 
     // One more should work, for app_1, due to user-limit-factor
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(4*GB, a.getUsedResources().getMemory());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@@ -537,7 +537,7 @@ public class TestLeafQueue {
     // Now - no more allocs since we are at max-cap
     a.setMaxCapacity(0.5f);
     a.assignContainers(clusterResource, node_0, new ResourceLimits(
-        clusterResource));
+        clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(4*GB, a.getUsedResources().getMemory());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@@ -653,21 +653,21 @@ public class TestLeafQueue {
 
     // 1 container to user_0
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
 
     // Again one to user_0 since he hasn't exceeded user limit yet
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(3*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
 
     // One more to user_0 since he is the only active user
     a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(4*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
@@ -719,10 +719,10 @@ public class TestLeafQueue {
         1, qb.getActiveUsersManager().getNumActiveUsers());
     //get headroom
     qb.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, app_0
         .getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(),
-        null);
+        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
 
     //maxqueue 16G, userlimit 13G, - 4G used = 9G
     assertEquals(9*GB,app_0.getHeadroom().getMemory());
@@ -739,10 +739,10 @@ public class TestLeafQueue {
             u1Priority, recordFactory)));
     qb.submitApplicationAttempt(app_2, user_1);
     qb.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, app_0
         .getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(),
-        null);
+        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
 
     assertEquals(8*GB, qb.getUsedResources().getMemory());
     assertEquals(4*GB, app_0.getCurrentConsumption().getMemory());
@@ -782,12 +782,12 @@ public class TestLeafQueue {
     qb.submitApplicationAttempt(app_1, user_0);
     qb.submitApplicationAttempt(app_3, user_1);
     qb.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     qb.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     qb.computeUserLimitAndSetHeadroom(app_3, clusterResource, app_3
         .getResourceRequest(u1Priority, ResourceRequest.ANY).getCapability(),
-        null);
+        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(4*GB, qb.getUsedResources().getMemory());
     //maxqueue 16G, userlimit 7G, used (by each user) 2G, headroom 5G (both)
     assertEquals(5*GB, app_3.getHeadroom().getMemory());
@@ -803,13 +803,13 @@ public class TestLeafQueue {
               TestUtils.createResourceRequest(ResourceRequest.ANY, 6*GB, 1, true,
                       u0Priority, recordFactory)));
     qb.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     qb.computeUserLimitAndSetHeadroom(app_4, clusterResource, app_4
         .getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(),
-        null);
+        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     qb.computeUserLimitAndSetHeadroom(app_3, clusterResource, app_3
         .getResourceRequest(u1Priority, ResourceRequest.ANY).getCapability(),
-        null);
+        "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     
     
     //app3 is user1, active from last test case
@@ -876,7 +876,7 @@ public class TestLeafQueue {
                 priority, recordFactory)));
 
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(1*GB, a.getUsedResources().getMemory());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -893,7 +893,7 @@ public class TestLeafQueue {
             priority, recordFactory)));
 
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@@ -982,7 +982,7 @@ public class TestLeafQueue {
 
     // 1 container to user_0
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -993,7 +993,7 @@ public class TestLeafQueue {
 
     // Again one to user_0 since he hasn't exceeded user limit yet
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(3*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@@ -1010,7 +1010,7 @@ public class TestLeafQueue {
     // No more to user_0 since he is already over user-limit
     // and no more containers to queue since it's already at max-cap
     a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(3*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@@ -1024,7 +1024,7 @@ public class TestLeafQueue {
             priority, recordFactory)));
     assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
     a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(0*GB, app_2.getHeadroom().getMemory());   // hit queue max-cap 
   }
 
@@ -1095,7 +1095,7 @@ public class TestLeafQueue {
     
     // Only 1 container
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(1*GB, a.getUsedResources().getMemory());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1103,7 +1103,7 @@ public class TestLeafQueue {
     // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
     // you can get one container more than user-limit
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1111,7 +1111,7 @@ public class TestLeafQueue {
     // Can't allocate 3rd due to user-limit
     a.setUserLimit(25);
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1130,7 +1130,7 @@ public class TestLeafQueue {
     // user_0 is at limit inspite of high user-limit-factor
     a.setUserLimitFactor(10);
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(5*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1140,7 +1140,7 @@ public class TestLeafQueue {
     // Now allocations should goto app_0 since 
     // user_0 is at user-limit not above it
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(6*GB, a.getUsedResources().getMemory());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1151,7 +1151,7 @@ public class TestLeafQueue {
     // Now - no more allocs since we are at max-cap
     a.setMaxCapacity(0.5f);
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(6*GB, a.getUsedResources().getMemory());
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1163,7 +1163,7 @@ public class TestLeafQueue {
     a.setMaxCapacity(1.0f);
     a.setUserLimitFactor(1);
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(7*GB, a.getUsedResources().getMemory()); 
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1172,7 +1172,7 @@ public class TestLeafQueue {
 
     // Now we should assign to app_3 again since user_2 is under user-limit
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(8*GB, a.getUsedResources().getMemory()); 
     assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1272,7 +1272,7 @@ public class TestLeafQueue {
     
     // Only 1 container
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(1*GB, a.getUsedResources().getMemory());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1283,7 +1283,7 @@ public class TestLeafQueue {
     // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
     // you can get one container more than user-limit
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1292,7 +1292,7 @@ public class TestLeafQueue {
     
     // Now, reservation should kick in for app_1
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(6*GB, a.getUsedResources().getMemory()); 
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1309,7 +1309,7 @@ public class TestLeafQueue {
             ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
         RMContainerEventType.KILL, null, true);
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(5*GB, a.getUsedResources().getMemory()); 
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1326,7 +1326,7 @@ public class TestLeafQueue {
             ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
         RMContainerEventType.KILL, null, true);
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(4*GB, a.getUsedResources().getMemory());
     assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
@@ -1394,7 +1394,7 @@ public class TestLeafQueue {
     // Start testing...
 
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1404,7 +1404,7 @@ public class TestLeafQueue {
 
     // Now, reservation should kick in for app_1
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(6*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1418,7 +1418,7 @@ public class TestLeafQueue {
     doReturn(-1).when(a).getNodeLocalityDelay();
     
     a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(10*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
@@ -1435,7 +1435,7 @@ public class TestLeafQueue {
             ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
         RMContainerEventType.KILL, null, true);
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(8*GB, a.getUsedResources().getMemory());
     assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(8*GB, app_1.getCurrentConsumption().getMemory());
@@ -1504,7 +1504,7 @@ public class TestLeafQueue {
     
     // Only 1 container
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(1*GB, a.getUsedResources().getMemory());
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1512,14 +1512,14 @@ public class TestLeafQueue {
     // Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
     // you can get one container more than user-limit
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(2*GB, a.getUsedResources().getMemory());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
     
     // Now, reservation should kick in for app_1
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(6*GB, a.getUsedResources().getMemory()); 
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1534,7 +1534,7 @@ public class TestLeafQueue {
             ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
         RMContainerEventType.KILL, null, true);
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(5*GB, a.getUsedResources().getMemory()); 
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1544,7 +1544,7 @@ public class TestLeafQueue {
 
     // Re-reserve
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(5*GB, a.getUsedResources().getMemory()); 
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1554,7 +1554,7 @@ public class TestLeafQueue {
     
     // Try to schedule on node_1 now, should *move* the reservation
     a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(9*GB, a.getUsedResources().getMemory()); 
     assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
@@ -1572,7 +1572,7 @@ public class TestLeafQueue {
             ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
         RMContainerEventType.KILL, null, true);
     CSAssignment assignment = a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(8*GB, a.getUsedResources().getMemory());
     assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
@@ -1644,7 +1644,7 @@ public class TestLeafQueue {
     
     // Start with off switch, shouldn't allocate due to delay scheduling
     assignment = a.assignContainers(clusterResource, node_2,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(1, app_0.getSchedulingOpportunities(priority));
@@ -1653,7 +1653,7 @@ public class TestLeafQueue {
 
     // Another off switch, shouldn't allocate due to delay scheduling
     assignment = a.assignContainers(clusterResource, node_2,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(2, app_0.getSchedulingOpportunities(priority));
@@ -1662,7 +1662,7 @@ public class TestLeafQueue {
     
     // Another off switch, shouldn't allocate due to delay scheduling
     assignment = a.assignContainers(clusterResource, node_2,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(3, app_0.getSchedulingOpportunities(priority));
@@ -1672,7 +1672,7 @@ public class TestLeafQueue {
     // Another off switch, now we should allocate 
     // since missedOpportunities=3 and reqdContainers=3
     assignment = a.assignContainers(clusterResource, node_2,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(4, app_0.getSchedulingOpportunities(priority)); // should NOT reset
@@ -1681,7 +1681,7 @@ public class TestLeafQueue {
     
     // NODE_LOCAL - node_0
     assignment = a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@@ -1690,7 +1690,7 @@ public class TestLeafQueue {
     
     // NODE_LOCAL - node_1
     assignment = a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@@ -1719,14 +1719,14 @@ public class TestLeafQueue {
     
     // Shouldn't assign RACK_LOCAL yet
     assignment = a.assignContainers(clusterResource, node_3,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(1, app_0.getSchedulingOpportunities(priority));
     assertEquals(2, app_0.getTotalRequiredResources(priority));
     assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
 
     // Should assign RACK_LOCAL now
     assignment = a.assignContainers(clusterResource, node_3,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@@ -1808,7 +1808,7 @@ public class TestLeafQueue {
     // Start with off switch, shouldn't allocate P1 due to delay scheduling
     // thus, no P2 either!
     a.assignContainers(clusterResource, node_2,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
         eq(priority_1), any(ResourceRequest.class), any(Container.class));
     assertEquals(1, app_0.getSchedulingOpportunities(priority_1));
@@ -1821,7 +1821,7 @@ public class TestLeafQueue {
     // Another off-switch, shouldn't allocate P1 due to delay scheduling
     // thus, no P2 either!
     a.assignContainers(clusterResource, node_2,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_2), 
         eq(priority_1), any(ResourceRequest.class), any(Container.class));
     assertEquals(2, app_0.getSchedulingOpportunities(priority_1));
@@ -1833,7 +1833,7 @@ public class TestLeafQueue {
 
     // Another off-switch, shouldn't allocate OFF_SWITCH P1
     a.assignContainers(clusterResource, node_2,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2), 
         eq(priority_1), any(ResourceRequest.class), any(Container.class));
     assertEquals(3, app_0.getSchedulingOpportunities(priority_1));
@@ -1845,7 +1845,7 @@ public class TestLeafQueue {
 
     // Now, DATA_LOCAL for P1
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0), 
         eq(priority_1), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority_1));
@@ -1857,7 +1857,7 @@ public class TestLeafQueue {
 
     // Now, OFF_SWITCH for P2
     a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_1), 
         eq(priority_1), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority_1));
@@ -1934,7 +1934,7 @@ public class TestLeafQueue {
     
     // NODE_LOCAL - node_0_1
     a.assignContainers(clusterResource, node_0_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0_0), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@@ -1943,7 +1943,7 @@ public class TestLeafQueue {
     // No allocation on node_1_0 even though it's node/rack local since
     // required(ANY) == 0
     a.assignContainers(clusterResource, node_1_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // Still zero
@@ -1960,7 +1960,7 @@ public class TestLeafQueue {
     // No allocation on node_0_1 even though it's node/rack local since
     // required(rack_1) == 0
     a.assignContainers(clusterResource, node_0_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(1, app_0.getSchedulingOpportunities(priority)); 
@@ -1968,7 +1968,7 @@ public class TestLeafQueue {
     
     // NODE_LOCAL - node_1
     a.assignContainers(clusterResource, node_1_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@@ -2221,7 +2221,7 @@ public class TestLeafQueue {
     // node_0_1  
     // Shouldn't allocate since RR(rack_0) = null && RR(ANY) = relax: false
     a.assignContainers(clusterResource, node_0_1, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
@@ -2244,7 +2244,7 @@ public class TestLeafQueue {
     // node_1_1  
     // Shouldn't allocate since RR(rack_1) = relax: false
     a.assignContainers(clusterResource, node_1_1, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
@@ -2275,7 +2275,7 @@ public class TestLeafQueue {
     // node_1_1  
     // Shouldn't allocate since node_1_1 is blacklisted
     a.assignContainers(clusterResource, node_1_1, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
@@ -2304,7 +2304,7 @@ public class TestLeafQueue {
     // node_1_1  
     // Shouldn't allocate since rack_1 is blacklisted
     a.assignContainers(clusterResource, node_1_1, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
@@ -2331,7 +2331,7 @@ public class TestLeafQueue {
 
     // Now, should allocate since RR(rack_1) = relax: true
     a.assignContainers(clusterResource, node_1_1, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0,never()).allocate(eq(NodeType.RACK_LOCAL), eq(node_1_1), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); 
@@ -2362,7 +2362,7 @@ public class TestLeafQueue {
     // host_1_1: 7G
 
     a.assignContainers(clusterResource, node_1_0, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0), 
         any(Priority.class), any(ResourceRequest.class), any(Container.class));
     assertEquals(0, app_0.getSchedulingOpportunities(priority)); 
@@ -2445,7 +2445,7 @@ public class TestLeafQueue {
 
     try {
       a.assignContainers(clusterResource, node_0, 
-          new ResourceLimits(clusterResource));
+          new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     } catch (NullPointerException e) {
       Assert.fail("NPE when allocating container on node but "
           + "forget to set off-switch request should be handled");