Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/04/18 00:35:54 UTC

[26/50] [abbrv] hadoop git commit: YARN-3361. CapacityScheduler side changes to support non-exclusive node labels. Contributed by Wangda Tan

http://git-wip-us.apache.org/repos/asf/hadoop/blob/025787b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
new file mode 100644
index 0000000..cf1b26f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
@@ -0,0 +1,1027 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+public class TestNodeLabelContainerAllocation {
+  private final int GB = 1024;
+
+  private YarnConfiguration conf;
+  
+  RMNodeLabelsManager mgr;
+
+  @Before
+  public void setUp() throws Exception {
+    conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+      ResourceScheduler.class);
+    mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+  }
+  
+  private Configuration getConfigurationWithQueueLabels(Configuration config) {
+    CapacitySchedulerConfiguration conf =
+        new CapacitySchedulerConfiguration(config);
+    
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
+
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    conf.setCapacity(A, 10);
+    conf.setMaximumCapacity(A, 15);
+    conf.setAccessibleNodeLabels(A, toSet("x"));
+    conf.setCapacityByLabel(A, "x", 100);
+    
+    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+    conf.setCapacity(B, 20);
+    conf.setAccessibleNodeLabels(B, toSet("y"));
+    conf.setCapacityByLabel(B, "y", 100);
+    
+    final String C = CapacitySchedulerConfiguration.ROOT + ".c";
+    conf.setCapacity(C, 70);
+    conf.setMaximumCapacity(C, 70);
+    conf.setAccessibleNodeLabels(C, RMNodeLabelsManager.EMPTY_STRING_SET);
+    
+    // Define 2nd-level queues
+    final String A1 = A + ".a1";
+    conf.setQueues(A, new String[] {"a1"});
+    conf.setCapacity(A1, 100);
+    conf.setMaximumCapacity(A1, 100);
+    conf.setCapacityByLabel(A1, "x", 100);
+    
+    final String B1 = B + ".b1";
+    conf.setQueues(B, new String[] {"b1"});
+    conf.setCapacity(B1, 100);
+    conf.setMaximumCapacity(B1, 100);
+    conf.setCapacityByLabel(B1, "y", 100);
+
+    final String C1 = C + ".c1";
+    conf.setQueues(C, new String[] {"c1"});
+    conf.setCapacity(C1, 100);
+    conf.setMaximumCapacity(C1, 100);
+    
+    return conf;
+  }
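+
+  // For reference (illustration only, not part of the test): the
+  // programmatic setup above corresponds roughly to capacity-scheduler.xml
+  // entries such as:
+  //   yarn.scheduler.capacity.root.queues = a,b,c
+  //   yarn.scheduler.capacity.root.a.capacity = 10
+  //   yarn.scheduler.capacity.root.a.maximum-capacity = 15
+  //   yarn.scheduler.capacity.root.a.accessible-node-labels = x
+  //   yarn.scheduler.capacity.root.a.accessible-node-labels.x.capacity = 100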
+  
+  private void checkTaskContainersHost(ApplicationAttemptId attemptId,
+      ContainerId containerId, ResourceManager rm, String host) {
+    YarnScheduler scheduler = rm.getRMContext().getScheduler();
+    SchedulerAppReport appReport = scheduler.getSchedulerAppInfo(attemptId);
+
+    Assert.assertTrue(appReport.getLiveContainers().size() > 0);
+    for (RMContainer c : appReport.getLiveContainers()) {
+      if (c.getContainerId().equals(containerId)) {
+        Assert.assertEquals(host, c.getAllocatedNode().getHost());
+      }
+    }
+  }
+  
+  @SuppressWarnings("unchecked")
+  private <E> Set<E> toSet(E... elements) {
+    Set<E> set = Sets.newHashSet(elements);
+    return set;
+  }
+  
+  
+  @Test (timeout = 300000)
+  public void testContainerAllocationWithSingleUserLimits() throws Exception {
+    final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
+        NodeId.newInstance("h2", 0), toSet("y")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
+    rm1.registerNode("h2:1234", 8000); // label = y
+    MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
+
+    // launch an app to queue a1 (label = x)
+    RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    
+    // A has only 10% of the default partition, so it can allocate only one
+    // container for label=empty requests
+    ContainerId containerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "");
+    Assert.assertTrue(rm1.waitForState(nm3, containerId,
+          RMContainerState.ALLOCATED, 10 * 1000));
+    // Cannot allocate 2nd label=empty container
+    containerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
+    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "");
+    Assert.assertFalse(rm1.waitForState(nm3, containerId,
+          RMContainerState.ALLOCATED, 10 * 1000));
+
+    // A has the default user-limit = 100, so it can use all resources on
+    // label = x; we can allocate floor(8000 / 1024) = 7 containers
+    // (including the AM container already running on h1)
+    for (int id = 3; id <= 8; id++) {
+      containerId =
+          ContainerId.newContainerId(am1.getApplicationAttemptId(), id);
+      am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "x");
+      Assert.assertTrue(rm1.waitForState(nm1, containerId,
+          RMContainerState.ALLOCATED, 10 * 1000));
+    }
+    rm1.close();
+  }
+  
+  @Test(timeout = 300000)
+  public void testContainerAllocateWithComplexLabels() throws Exception {
+    /*
+     * Queue structure:
+     *                      root (*)
+     *                  ________________
+     *                 /                \
+     *               a x(100%), y(50%)   b y(50%), z(100%)
+     *               ________________    ______________
+     *              /                   /              \
+     *             a1 (x,y)         b1(no)              b2(y,z)
+     *               100%                          y = 100%, z = 100%
+     *                           
+     * Node structure:
+     * h1 : x
+     * h2 : y
+     * h3 : y
+     * h4 : z
+     * h5 : NO
+     * 
+     * Total resource:
+     * x: 4G
+     * y: 6G
+     * z: 2G
+     * *: 2G
+     * 
+     * Resources of
+     * a1: x=4G, y=3G, NO=0.2G
+     * b1: NO=0.9G (max=1G)
+     * b2: y=3G, z=2G, NO=0.9G (max=1G)
+     * 
+     * Each node can only allocate two containers
+     */
+
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y", "z"));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
+        toSet("x"), NodeId.newInstance("h2", 0), toSet("y"),
+        NodeId.newInstance("h3", 0), toSet("y"), NodeId.newInstance("h4", 0),
+        toSet("z"), NodeId.newInstance("h5", 0),
+        RMNodeLabelsManager.EMPTY_STRING_SET));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(TestUtils.getComplexConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 2048);
+    MockNM nm2 = rm1.registerNode("h2:1234", 2048);
+    MockNM nm3 = rm1.registerNode("h3:1234", 2048);
+    MockNM nm4 = rm1.registerNode("h4:1234", 2048);
+    MockNM nm5 = rm1.registerNode("h5:1234", 2048);
+    
+    ContainerId containerId;
+
+    // launch an app to queue a1 (label = x); its AM container should be
+    // launched on h1
+    RMApp app1 = rm1.submitApp(1024, "app", "user", null, "a1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // request a container (label = y); it can be allocated on nm2
+    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
+    containerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2L);
+    Assert.assertTrue(rm1.waitForState(nm2, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+        "h2");
+
+    // launch an app to queue b1 (no accessible labels); its AM container
+    // should be launched on h5
+    RMApp app2 = rm1.submitApp(1024, "app", "user", null, "b1");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm5);
+
+    // the AM container already uses up b1's queue capacity (max = 1G), so a
+    // further container request cannot be allocated anywhere
+    // (maximum capacity reached)
+    am2.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+    containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
+    Assert.assertFalse(rm1.waitForState(nm4, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    Assert.assertFalse(rm1.waitForState(nm5, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    
+    // launch an app to queue b2
+    RMApp app3 = rm1.submitApp(1024, "app", "user", null, "b2");
+    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm5);
+
+    // request a container (label = y); it cannot go to nm1 (label = x) but
+    // will successfully allocate on nm3 (label = y)
+    am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
+    containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
+    Assert.assertFalse(rm1.waitForState(nm1, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    Assert.assertTrue(rm1.waitForState(nm3, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
+        "h3");
+    
+    // request a container (label = z); it will successfully allocate on nm4
+    // (label = z) only
+    am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "z");
+    containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 3L);
+    Assert.assertTrue(rm1.waitForState(nm4, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
+        "h4");
+
+    rm1.close();
+  }
+
+  @Test (timeout = 120000)
+  public void testContainerAllocateWithLabels() throws Exception {
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
+        NodeId.newInstance("h2", 0), toSet("y")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(getConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
+    MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y
+    MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
+    
+    ContainerId containerId;
+
+    // launch an app to queue a1 (label = x), and check that its containers
+    // are allocated on h1
+    RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm3);
+
+    // request a container.
+    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "x");
+    containerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    Assert.assertFalse(rm1.waitForState(nm2, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    Assert.assertTrue(rm1.waitForState(nm1, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+        "h1");
+
+    // launch an app to queue b1 (label = y), and check that its containers
+    // are allocated on h2
+    RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm3);
+
+    // request a container.
+    am2.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
+    containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
+    Assert.assertFalse(rm1.waitForState(nm1, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    Assert.assertTrue(rm1.waitForState(nm2, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1,
+        "h2");
+    
+    // launch an app to queue c1 (label = ""), and check all container will
+    // be allocated in h3
+    RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1");
+    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3);
+
+    // request a container.
+    am3.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+    containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
+    Assert.assertFalse(rm1.waitForState(nm2, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    Assert.assertTrue(rm1.waitForState(nm3, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
+        "h3");
+
+    rm1.close();
+  }
+  
+  @Test (timeout = 120000)
+  public void testContainerAllocateWithDefaultQueueLabels() throws Exception {
+    // This test is similar to testContainerAllocateWithLabels. The difference
+    // is that it doesn't specify a label expression in the ResourceRequest;
+    // instead, it relies on the queue's default node-label expression
+
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
+        NodeId.newInstance("h2", 0), toSet("y")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
+    MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y
+    MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
+    
+    ContainerId containerId;
+
+    // launch an app to queue a1 (label = x), and check that its containers
+    // are allocated on h1
+    RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    // request a container.
+    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+    containerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    Assert.assertFalse(rm1.waitForState(nm3, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    Assert.assertTrue(rm1.waitForState(nm1, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+        "h1");
+
+    // launch an app to queue b1 (label = y), and check that its containers
+    // are allocated on h2
+    RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // request a container.
+    am2.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+    containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
+    Assert.assertFalse(rm1.waitForState(nm3, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    Assert.assertTrue(rm1.waitForState(nm2, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1,
+        "h2");
+    
+    // launch an app to queue c1 (label = ""), and check all container will
+    // be allocated in h3
+    RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1");
+    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3);
+
+    // request a container.
+    am3.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+    containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
+    Assert.assertFalse(rm1.waitForState(nm2, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    Assert.assertTrue(rm1.waitForState(nm3, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
+        "h3");
+
+    rm1.close();
+  }
+  
+  private void checkPendingResource(MockRM rm, int priority,
+      ApplicationAttemptId attemptId, int memory) {
+    CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler();
+    FiCaSchedulerApp app = cs.getApplicationAttempt(attemptId);
+    ResourceRequest rr =
+        app.getAppSchedulingInfo().getResourceRequest(
+            Priority.newInstance(priority), "*");
+    Assert.assertEquals(memory,
+        rr.getCapability().getMemory() * rr.getNumContainers());
+  }
+  
+  private void checkLaunchedContainerNumOnNode(MockRM rm, NodeId nodeId,
+      int numContainers) {
+    CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler();
+    SchedulerNode node = cs.getSchedulerNode(nodeId);
+    Assert.assertEquals(numContainers, node.getNumContainers());
+  }
+  
+  @Test
+  public void testPreferenceOfNeedyAppsTowardsNodePartitions() throws Exception {
+    /**
+     * Test case: Submit two applications to a queue (app1 first, then app2);
+     * app1 asks for unlabeled containers and app2 asks for label=y. When
+     * node1 (label=y) heartbeats, app2 gets its allocation first, even
+     * though it was submitted later than app1.
+     */
+    
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+    // Make y a non-exclusive node label
+    mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("y", false)));
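+    // (a non-exclusive partition can be shared: when its nodes are idle,
+    // requests for the default partition may also be placed on them)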
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("y")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(TestUtils.getConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = y
+    MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty>
+
+    // launch an app to queue b1 (label = y), AM container should be launched in nm2
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
+    
+    // launch another app to queue b1 (label = y), AM container should be launched in nm2
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+    // request containers and let nm1 heartbeat (nm1 has label=y); app1
+    // requests non-labeled containers and app2 requests labeled containers,
+    // so app2 gets allocated first even though app1 was submitted first
+    am1.allocate("*", 1 * GB, 8, new ArrayList<ContainerId>());
+    am2.allocate("*", 1 * GB, 8, new ArrayList<ContainerId>(), "y");
+    
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+    
+    // Do node heartbeats many times
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+    
+    // App2 will get preference to be allocated on node1, and node1 will be all
+    // used by App2.
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1.getApplicationAttemptId());
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(am2.getApplicationAttemptId());
+    // app1 gets nothing on nm1 (partition=y)
+    checkNumOfContainersInAnAppOnGivenNode(0, nm1.getNodeId(), schedulerApp1);
+    checkNumOfContainersInAnAppOnGivenNode(9, nm2.getNodeId(), schedulerApp1);
+    // app2 gets all resources on nm1 (partition=y)
+    checkNumOfContainersInAnAppOnGivenNode(8, nm1.getNodeId(), schedulerApp2);
+    checkNumOfContainersInAnAppOnGivenNode(1, nm2.getNodeId(), schedulerApp2);
+    
+    rm1.close();
+  }
+  
+  private void checkNumOfContainersInAnAppOnGivenNode(int expectedNum,
+      NodeId nodeId, FiCaSchedulerApp app) {
+    int num = 0;
+    for (RMContainer container : app.getLiveContainers()) {
+      if (container.getAllocatedNode().equals(nodeId)) {
+        num++;
+      }
+    }
+    Assert.assertEquals(expectedNum, num);
+  }
+  
+  @Test
+  public void
+      testPreferenceOfNeedyPrioritiesUnderSameAppTowardsNodePartitions()
+          throws Exception {
+    /**
+     * Test case: Submit one application, it asks label="" in priority=1 and
+     * label="x" in priority=2, when a node with label=x heartbeat, priority=2
+     * will get allocation first even if there're pending resource in priority=1
+     */
+    
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+    // Make y a non-exclusive node label
+    mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("y", false)));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("y")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(TestUtils.getConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = y
+    MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty>
+    
+    ContainerId nextContainerId;
+
+    // launch an app to queue b1 (label = y); AM container should be launched
+    // on nm2
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
+    
+    // request containers from am2, priority=1 asks for "" and priority=2 asks
+    // for "y", "y" container should be allocated first
+    nextContainerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    am1.allocate("*", 1 * GB, 1, 1, new ArrayList<ContainerId>(), "");
+    am1.allocate("*", 1 * GB, 1, 2, new ArrayList<ContainerId>(), "y");
+    Assert.assertTrue(rm1.waitForState(nm1, nextContainerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    
+    // Check pending resource for am1; priority=1 doesn't get allocated
+    // before priority=2 is allocated
+    checkPendingResource(rm1, 1, am1.getApplicationAttemptId(), 1 * GB);
+    checkPendingResource(rm1, 2, am1.getApplicationAttemptId(), 0 * GB);
+    
+    rm1.close();
+  }
+  
+  @Test
+  public void testNonLabeledResourceRequestGetPreferrenceToNonLabeledNode()
+      throws Exception {
+    /**
+     * Test case: Submit one application, it asks 6 label="" containers, NM1
+     * with label=y and NM2 has no label, NM1/NM2 doing heartbeat together. Even
+     * if NM1 has idle resource, containers are all allocated to NM2 since
+     * non-labeled request should get allocation on non-labeled nodes first.
+     */
+    
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+    // Make x a non-exclusive node label
+    mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false)));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(TestUtils.getConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = y
+    MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty>
+    
+    ContainerId nextContainerId;
+
+    // launch an app to queue b1; AM container should be launched on nm2
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
+    
+    // request containers from am2, priority=1 asks for "" * 6 (id from 4 to 9),
+    // nm2/nm3 do
+    // heartbeat at the same time, check containers are always allocated to nm3.
+    // This is to verify when there's resource available in non-labeled
+    // partition, non-labeled resource should allocate to non-labeled partition
+    // first.
+    am1.allocate("*", 1 * GB, 6, 1, new ArrayList<ContainerId>(), "");
+    for (int i = 2; i < 2 + 6; i++) {
+      nextContainerId =
+          ContainerId.newContainerId(am1.getApplicationAttemptId(), i);
+      Assert.assertTrue(rm1.waitForState(Arrays.asList(nm1, nm2),
+          nextContainerId, RMContainerState.ALLOCATED, 10 * 1000));
+    }
+    // no containers allocated on nm1
+    checkLaunchedContainerNumOnNode(rm1, nm1.getNodeId(), 0);
+    // all 7 containers (1 AM container + 6 task containers) allocated on nm2
+    checkLaunchedContainerNumOnNode(rm1, nm2.getNodeId(), 7);   
+    
+    rm1.close();
+  }
+
+  @Test
+  public void testPreferenceOfQueuesTowardsNodePartitions()
+      throws Exception {
+    /**
+     * Test case: given the following queue structure:
+     * 
+     * <pre>
+     *            root
+     *         /   |   \
+     *        a     b    c
+     *       / \   / \  /  \
+     *      a1 a2 b1 b2 c1 c2
+     *     (x)    (x)   (x)
+     * </pre>
+     * 
+     * Only a1, b1 and c1 can access label=x, and their default node-label
+     * expression is x. Each of them runs one application asking for 5
+     * containers. NM1 has label=x.
+     * 
+     * After NM1 heartbeats 15 times, all 15 label=x containers should be
+     * allocated on it.
+     */
+    
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration(this.conf);
+    
+    // Define top-level queues
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
+    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    csConf.setCapacity(A, 33);
+    csConf.setAccessibleNodeLabels(A, toSet("x"));
+    csConf.setCapacityByLabel(A, "x", 33);
+    csConf.setQueues(A, new String[] {"a1", "a2"});
+    
+    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+    csConf.setCapacity(B, 33);
+    csConf.setAccessibleNodeLabels(B, toSet("x"));
+    csConf.setCapacityByLabel(B, "x", 33);
+    csConf.setQueues(B, new String[] {"b1", "b2"});
+    
+    final String C = CapacitySchedulerConfiguration.ROOT + ".c";
+    csConf.setCapacity(C, 34);
+    csConf.setAccessibleNodeLabels(C, toSet("x"));
+    csConf.setCapacityByLabel(C, "x", 34);
+    csConf.setQueues(C, new String[] {"c1", "c2"});
+    
+    // Define 2nd-level queues
+    final String A1 = A + ".a1";
+    csConf.setCapacity(A1, 50);
+    csConf.setCapacityByLabel(A1, "x", 100);
+    csConf.setDefaultNodeLabelExpression(A1, "x");
+    
+    final String A2 = A + ".a2";
+    csConf.setCapacity(A2, 50);
+    csConf.setCapacityByLabel(A2, "x", 0);
+    
+    final String B1 = B + ".b1";
+    csConf.setCapacity(B1, 50);
+    csConf.setCapacityByLabel(B1, "x", 100);
+    csConf.setDefaultNodeLabelExpression(B1, "x");
+    
+    final String B2 = B + ".b2";
+    csConf.setCapacity(B2, 50);
+    csConf.setCapacityByLabel(B2, "x", 0);
+    
+    final String C1 = C + ".c1";
+    csConf.setCapacity(C1, 50);
+    csConf.setCapacityByLabel(C1, "x", 100);
+    csConf.setDefaultNodeLabelExpression(C1, "x");
+    
+    final String C2 = C + ".c2";
+    csConf.setCapacity(C2, 50);
+    csConf.setCapacityByLabel(C2, "x", 0);
+    
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+    // Make x a non-exclusive node label
+    mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false)));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(csConf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB); // label = x
+    MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty>
+
+    // app1 -> a1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+    
+    // app2 -> a2
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "a2");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+    
+    // app3 -> b1
+    RMApp app3 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
+    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1);
+    
+    // app4 -> b2
+    RMApp app4 = rm1.submitApp(1 * GB, "app", "user", null, "b2");
+    MockAM am4 = MockRM.launchAndRegisterAM(app4, rm1, nm2);
+    
+    // app5 -> c1
+    RMApp app5 = rm1.submitApp(1 * GB, "app", "user", null, "c1");
+    MockAM am5 = MockRM.launchAndRegisterAM(app5, rm1, nm1);
+    
+    // app6 -> c2
+    RMApp app6 = rm1.submitApp(1 * GB, "app", "user", null, "c2");
+    MockAM am6 = MockRM.launchAndRegisterAM(app6, rm1, nm2);
+    
+    // Each application requests 5 * 1GB containers
+    am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
+    am2.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
+    am3.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
+    am4.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
+    am5.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
+    am6.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
+    
+    // NM1 does 15 heartbeats
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    for (int i = 0; i < 15; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+    
+    // NM1 gets 15 new containers (18 in total: 15 task containers and 3 AM
+    // containers)
+    checkLaunchedContainerNumOnNode(rm1, nm1.getNodeId(), 18);
+
+    // Check pending resource for each application:
+    // app1/app3/app5 are satisfied, and app2/app4/app6 get nothing
+    checkPendingResource(rm1, 1, am1.getApplicationAttemptId(), 0 * GB);
+    checkPendingResource(rm1, 1, am2.getApplicationAttemptId(), 5 * GB);
+    checkPendingResource(rm1, 1, am3.getApplicationAttemptId(), 0 * GB);
+    checkPendingResource(rm1, 1, am4.getApplicationAttemptId(), 5 * GB);
+    checkPendingResource(rm1, 1, am5.getApplicationAttemptId(), 0 * GB);
+    checkPendingResource(rm1, 1, am6.getApplicationAttemptId(), 5 * GB);
+
+    rm1.close();
+  }
+  
+  @Test
+  public void testQueuesWithoutAccessUsingPartitionedNodes() throws Exception {
+    /**
+     * Test case: given the following queue structure:
+     * 
+     * <pre>
+     *            root
+     *         /      \
+     *        a        b
+     *        (x)
+     * </pre>
+     * 
+     * Only a can access label=x. Two nodes in the cluster: n1 has x and n2
+     * has no label.
+     * 
+     * With user-limit-factor=5, one application submitted to queue b that
+     * requests more containers than can fit should be able to use up all
+     * cluster resources.
+     */
+    
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration(this.conf);
+    
+    // Define top-level queues
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    csConf.setCapacity(A, 50);
+    csConf.setAccessibleNodeLabels(A, toSet("x"));
+    csConf.setCapacityByLabel(A, "x", 100);
+    
+    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+    csConf.setCapacity(B, 50);
+    csConf.setAccessibleNodeLabels(B, new HashSet<String>());
+    csConf.setUserLimitFactor(B, 5);
+    
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x"));
+    // Make x a non-exclusive node label
+    mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false)));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(csConf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
+    MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = <empty>
+
+    // app1 -> b
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
+    
+    // The application requests 50 * 1GB containers
+    am1.allocate("*", 1 * GB, 50, new ArrayList<ContainerId>());
+    
+    // NM1/NM2 do 50 heartbeats
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+    
+    SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
+    
+    // Count how many cycles pass before anything is allocated on the
+    // partitioned node
+    int cycleWaited = 0;
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+      if (schedulerNode1.getNumContainers() == 0) {
+        cycleWaited++;
+      }
+    }
+    // We wait 10 cycles before getting allocated on the partitioned node:
+    // NM2 can hold 10 containers in total; excluding the already-running AM
+    // container, 9 cycles fill up the non-partitioned node, and one more
+    // cycle passes before allocation moves to the partitioned node
+    Assert.assertEquals(10, cycleWaited);
+    
+    // Both NM1/NM2 launched 10 containers, cluster resource is exhausted
+    checkLaunchedContainerNumOnNode(rm1, nm1.getNodeId(), 10);
+    checkLaunchedContainerNumOnNode(rm1, nm2.getNodeId(), 10);
+
+    rm1.close();
+  }
+  
+  @Test
+  public void testAMContainerAllocationWillAlwaysBeExclusive()
+      throws Exception {
+    /**
+     * Test case: Submit one application without a partition; when only a
+     * node with partition=x is available, allocation should fail, since AM
+     * containers always respect partition exclusivity.
+     */
+    
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+    // Make x a non-exclusive node label
+    mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false)));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(TestUtils.getConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x
+
+    // submit an app to queue b1; its AM container should never be allocated,
+    // since only the partition=x node is available
+    rm1.submitApp(1 * GB, "app", "user", null, "b1");
+   
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    
+    // Heartbeat many times; app1 should get nothing
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+    
+    Assert.assertEquals(0, cs.getSchedulerNode(nm1.getNodeId())
+        .getNumContainers());
+    
+    rm1.close();
+  }
+  
+  @Test
+  public void
+      testQueueMaxCapacitiesWillNotBeHonoredWhenNotRespectingExclusivity()
+          throws Exception {
+    /**
+     * Test case: given the following queue structure:
+     * 
+     * <pre>
+     *            root
+     *         /      \
+     *        a        b
+     *        (x)     (x)
+     * </pre>
+     * 
+     * Both a and b can access x, and both have max-capacity on x = 50.
+     * 
+     * During non-exclusive allocation, an app in a (or b) can use 100% of
+     * x's resource.
+     */
+
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration(this.conf);
+
+    // Define top-level queues
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a",
+        "b" });
+    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    csConf.setCapacity(A, 50);
+    csConf.setAccessibleNodeLabels(A, toSet("x"));
+    csConf.setCapacityByLabel(A, "x", 50);
+    csConf.setMaximumCapacityByLabel(A, "x", 50);
+
+    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+    csConf.setCapacity(B, 50);
+    csConf.setAccessibleNodeLabels(B, toSet("x"));
+    csConf.setCapacityByLabel(B, "x", 50);
+    csConf.setMaximumCapacityByLabel(B, "x", 50);
+
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x"));
+    // Make x a non-exclusive node label
+    mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false)));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(csConf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
+    MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = <empty>
+
+    // app1 -> a
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
+
+    // app1 asks for 10 containers in the default (empty) partition
+    am1.allocate("*", 1 * GB, 10, new ArrayList<ContainerId>());
+
+    // NM1 does 50 heartbeats
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
+
+    for (int i = 0; i < 50; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    }
+    
+    // app1 gets all resource in partition=x
+    Assert.assertEquals(10, schedulerNode1.getNumContainers());
+
+    rm1.close();
+  }
+}
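
For readers skimming the diff: the behavior exercised above hinges on the
node-label expression attached to each ResourceRequest. A minimal sketch of
how a client would request a labeled container (assuming a cluster where the
label "y" is already registered; the values below are illustrative, and the
classes come from org.apache.hadoop.yarn.api.records):

    // ask for one 1 GB / 1 vcore container anywhere in partition "y"
    ResourceRequest rr = ResourceRequest.newInstance(
        Priority.newInstance(1),       // request priority
        ResourceRequest.ANY,           // "*", i.e. any host
        Resource.newInstance(1024, 1), // memory (MB), vcores
        1);                            // number of containers
    rr.setNodeLabelExpression("y");    // empty "" means the default partition

If "y" is additionally marked non-exclusive (NodeLabel.newInstance("y",
false), as in the tests above), idle capacity on its nodes can also serve
requests with an empty label expression.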

http://git-wip-us.apache.org/repos/asf/hadoop/blob/025787b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
index 7da1c97..52d0bc1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Mockito.doAnswer;
 import static org.mockito.Mockito.doReturn;
@@ -45,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -146,7 +146,7 @@ public class TestParentQueue {
         final Resource allocatedResource = Resources.createResource(allocation);
         if (queue instanceof ParentQueue) {
           ((ParentQueue)queue).allocateResource(clusterResource, 
-              allocatedResource, null);
+              allocatedResource, RMNodeLabelsManager.NO_LABEL);
         } else {
           FiCaSchedulerApp app1 = getMockApplication(0, "");
           ((LeafQueue)queue).allocateResource(clusterResource, app1, 
@@ -157,7 +157,7 @@ public class TestParentQueue {
         if (allocation > 0) {
           doReturn(new CSAssignment(Resources.none(), type)).when(queue)
               .assignContainers(eq(clusterResource), eq(node),
-                  any(ResourceLimits.class));
+                  any(ResourceLimits.class), any(SchedulingMode.class));
 
           // Mock the node's resource availability
           Resource available = node.getAvailableResource();
@@ -168,7 +168,7 @@ public class TestParentQueue {
         return new CSAssignment(allocatedResource, type);
       }
     }).when(queue).assignContainers(eq(clusterResource), eq(node),
-        any(ResourceLimits.class));
+        any(ResourceLimits.class), any(SchedulingMode.class));
   }
   
   private float computeQueueAbsoluteUsedCapacity(CSQueue queue, 
@@ -228,11 +228,16 @@ public class TestParentQueue {
     LeafQueue a = (LeafQueue)queues.get(A);
     LeafQueue b = (LeafQueue)queues.get(B);
     
+    a.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+    b.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+    queues.get(CapacitySchedulerConfiguration.ROOT).getQueueResourceUsage()
+    .incPending(Resources.createResource(1 * GB));
+    
     // Simulate B returning a container on node_0
     stubQueueAllocation(a, clusterResource, node_0, 0*GB);
     stubQueueAllocation(b, clusterResource, node_0, 1*GB);
     root.assignContainers(clusterResource, node_0, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verifyQueueMetrics(a, 0*GB, clusterResource);
     verifyQueueMetrics(b, 1*GB, clusterResource);
     
@@ -240,12 +245,12 @@ public class TestParentQueue {
     stubQueueAllocation(a, clusterResource, node_1, 2*GB);
     stubQueueAllocation(b, clusterResource, node_1, 1*GB);
     root.assignContainers(clusterResource, node_1, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     InOrder allocationOrder = inOrder(a, b);
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
 
@@ -254,12 +259,12 @@ public class TestParentQueue {
     stubQueueAllocation(a, clusterResource, node_0, 1*GB);
     stubQueueAllocation(b, clusterResource, node_0, 2*GB);
     root.assignContainers(clusterResource, node_0, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(b, a);
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 4*GB, clusterResource);
 
@@ -268,12 +273,12 @@ public class TestParentQueue {
     stubQueueAllocation(a, clusterResource, node_0, 0*GB);
     stubQueueAllocation(b, clusterResource, node_0, 4*GB);
     root.assignContainers(clusterResource, node_0, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(b, a);
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 8*GB, clusterResource);
 
@@ -282,12 +287,12 @@ public class TestParentQueue {
     stubQueueAllocation(a, clusterResource, node_1, 1*GB);
     stubQueueAllocation(b, clusterResource, node_1, 1*GB);
     root.assignContainers(clusterResource, node_1, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(a, b);
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     verifyQueueMetrics(a, 4*GB, clusterResource);
     verifyQueueMetrics(b, 9*GB, clusterResource);
   }
@@ -448,16 +453,27 @@ public class TestParentQueue {
 
     // Start testing
     CSQueue a = queues.get(A);
+    a.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
     CSQueue b = queues.get(B);
+    b.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
     CSQueue c = queues.get(C);
+    c.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
     CSQueue d = queues.get(D);
+    d.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
 
     CSQueue a1 = queues.get(A1);
+    a1.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
     CSQueue a2 = queues.get(A2);
+    a2.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
 
     CSQueue b1 = queues.get(B1);
+    b1.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
     CSQueue b2 = queues.get(B2);
+    b2.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
     CSQueue b3 = queues.get(B3);
+    b3.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+    queues.get(CapacitySchedulerConfiguration.ROOT).getQueueResourceUsage()
+    .incPending(Resources.createResource(1 * GB));
 
     // Simulate C returning a container on node_0
     stubQueueAllocation(a, clusterResource, node_0, 0*GB);
@@ -465,7 +481,7 @@ public class TestParentQueue {
     stubQueueAllocation(c, clusterResource, node_0, 1*GB);
     stubQueueAllocation(d, clusterResource, node_0, 0*GB);
     root.assignContainers(clusterResource, node_0, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verifyQueueMetrics(a, 0*GB, clusterResource);
     verifyQueueMetrics(b, 0*GB, clusterResource);
     verifyQueueMetrics(c, 1*GB, clusterResource);
@@ -478,7 +494,7 @@ public class TestParentQueue {
     stubQueueAllocation(b2, clusterResource, node_1, 4*GB);
     stubQueueAllocation(c, clusterResource, node_1, 0*GB);
     root.assignContainers(clusterResource, node_1, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verifyQueueMetrics(a, 0*GB, clusterResource);
     verifyQueueMetrics(b, 4*GB, clusterResource);
     verifyQueueMetrics(c, 1*GB, clusterResource);
@@ -490,14 +506,14 @@ public class TestParentQueue {
     stubQueueAllocation(b3, clusterResource, node_0, 2*GB);
     stubQueueAllocation(c, clusterResource, node_0, 2*GB);
     root.assignContainers(clusterResource, node_0, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     InOrder allocationOrder = inOrder(a, c, b);
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     allocationOrder.verify(c).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     verifyQueueMetrics(a, 1*GB, clusterResource);
     verifyQueueMetrics(b, 6*GB, clusterResource);
     verifyQueueMetrics(c, 3*GB, clusterResource);
@@ -517,16 +533,16 @@ public class TestParentQueue {
     stubQueueAllocation(b1, clusterResource, node_2, 1*GB);
     stubQueueAllocation(c, clusterResource, node_2, 1*GB);
     root.assignContainers(clusterResource, node_2, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(a, a2, a1, b, c);
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     allocationOrder.verify(a2).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     allocationOrder.verify(c).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     verifyQueueMetrics(a, 3*GB, clusterResource);
     verifyQueueMetrics(b, 8*GB, clusterResource);
     verifyQueueMetrics(c, 4*GB, clusterResource);
@@ -622,12 +638,16 @@ public class TestParentQueue {
     // Start testing
     LeafQueue a = (LeafQueue)queues.get(A);
     LeafQueue b = (LeafQueue)queues.get(B);
+    a.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+    b.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+    queues.get(CapacitySchedulerConfiguration.ROOT).getQueueResourceUsage()
+    .incPending(Resources.createResource(1 * GB));
     
     // Simulate B returning a container on node_0
     stubQueueAllocation(a, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
     stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
     root.assignContainers(clusterResource, node_0, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verifyQueueMetrics(a, 0*GB, clusterResource);
     verifyQueueMetrics(b, 1*GB, clusterResource);
     
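The incPending(...) calls seeded at the top of this test are new with this
patch: a parent queue now skips any child that has nothing pending for the
partition being scheduled, so tests that drive allocation through stubs must
register demand first. A hedged sketch of the kind of guard the seeding
satisfies (identifier names assumed; the committed check lives in the queue
allocation path):

    // Hedged sketch, not the committed code.
    Resource pending = queue.getQueueResourceUsage().getPending();
    if (Resources.none().equals(pending)) {
      return CSAssignment.NULL_ASSIGNMENT;  // nothing to schedule here
    }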
@@ -636,12 +656,12 @@ public class TestParentQueue {
     stubQueueAllocation(a, clusterResource, node_1, 2*GB, NodeType.RACK_LOCAL);
     stubQueueAllocation(b, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
     root.assignContainers(clusterResource, node_1, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     InOrder allocationOrder = inOrder(a, b);
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 2*GB, clusterResource);
     
@@ -651,12 +671,12 @@ public class TestParentQueue {
     stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
     stubQueueAllocation(b, clusterResource, node_0, 2*GB, NodeType.OFF_SWITCH);
     root.assignContainers(clusterResource, node_0, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(b, a);
     allocationOrder.verify(b).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     allocationOrder.verify(a).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     verifyQueueMetrics(a, 2*GB, clusterResource);
     verifyQueueMetrics(b, 4*GB, clusterResource);
 
@@ -691,12 +711,19 @@ public class TestParentQueue {
     // Start testing
     LeafQueue b3 = (LeafQueue)queues.get(B3);
     LeafQueue b2 = (LeafQueue)queues.get(B2);
+    b2.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+    b3.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+    queues.get(CapacitySchedulerConfiguration.ROOT).getQueueResourceUsage()
+        .incPending(Resources.createResource(1 * GB));
+    
+    CSQueue b = queues.get(B);
+    b.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
     
     // Simulate B3 returning a container on node_0
     stubQueueAllocation(b2, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
     stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
     root.assignContainers(clusterResource, node_0, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     verifyQueueMetrics(b2, 0*GB, clusterResource);
     verifyQueueMetrics(b3, 1*GB, clusterResource);
     
@@ -705,12 +732,12 @@ public class TestParentQueue {
     stubQueueAllocation(b2, clusterResource, node_1, 1*GB, NodeType.RACK_LOCAL);
     stubQueueAllocation(b3, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
     root.assignContainers(clusterResource, node_1, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     InOrder allocationOrder = inOrder(b2, b3);
     allocationOrder.verify(b2).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     allocationOrder.verify(b3).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     verifyQueueMetrics(b2, 1*GB, clusterResource);
     verifyQueueMetrics(b3, 2*GB, clusterResource);
     
@@ -720,12 +747,12 @@ public class TestParentQueue {
     stubQueueAllocation(b2, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
     stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
     root.assignContainers(clusterResource, node_0, 
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     allocationOrder = inOrder(b3, b2);
     allocationOrder.verify(b3).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     allocationOrder.verify(b2).assignContainers(eq(clusterResource), 
-        any(FiCaSchedulerNode.class), anyResourceLimits());
+        any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
     verifyQueueMetrics(b2, 1*GB, clusterResource);
     verifyQueueMetrics(b3, 3*GB, clusterResource);
 
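Every call site in this file passes RESPECT_PARTITION_EXCLUSIVITY because
the stubbed nodes carry no labels. The other mode drives the second,
borrowing pass on non-exclusive partitions; the call shape is identical:

    // Hedged sketch; same signature, opposite mode (lets unlabeled
    // requests use idle capacity on a non-exclusive partition).
    root.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY);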

http://git-wip-us.apache.org/repos/asf/hadoop/blob/025787b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index e8a8243..47be618 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -48,10 +48,10 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.DrainDispatcher;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -266,7 +266,7 @@ public class TestReservations {
     // Start testing...
     // Only AM
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(2 * GB, a.getUsedResources().getMemory());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -278,7 +278,7 @@ public class TestReservations {
 
     // Only 1 map - simulating reduce
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(5 * GB, a.getUsedResources().getMemory());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -290,7 +290,7 @@ public class TestReservations {
 
     // Only 1 map to other node - simulating reduce
     a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -305,7 +305,7 @@ public class TestReservations {
 
     // try to assign reducer (5G on node 0 and should reserve)
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -321,7 +321,7 @@ public class TestReservations {
 
     // assign reducer to node 2
     a.assignContainers(clusterResource, node_2,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(18 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -338,7 +338,7 @@ public class TestReservations {
     // node_1 heartbeat and unreserves from node_0 in order to allocate
     // on node_1
     a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(18 * GB, a.getUsedResources().getMemory());
     assertEquals(18 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -422,7 +422,7 @@ public class TestReservations {
     // Start testing...
     // Only AM
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(2 * GB, a.getUsedResources().getMemory());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -434,7 +434,7 @@ public class TestReservations {
 
     // Only 1 map - simulating reduce
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(5 * GB, a.getUsedResources().getMemory());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -446,7 +446,7 @@ public class TestReservations {
 
     // Only 1 map to other node - simulating reduce
     a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -461,7 +461,7 @@ public class TestReservations {
 
     // try to assign reducer (5G on node 0 and should reserve)
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -477,7 +477,7 @@ public class TestReservations {
 
     // assign reducer to node 2
     a.assignContainers(clusterResource, node_2,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(18 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -494,7 +494,7 @@ public class TestReservations {
     // node_1 heartbeat and won't unreserve from node_0, potentially getting
     // stuck if the AM doesn't handle it
     a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(18 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -570,7 +570,7 @@ public class TestReservations {
     // Start testing...
     // Only AM
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(2 * GB, a.getUsedResources().getMemory());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -581,7 +581,7 @@ public class TestReservations {
 
     // Only 1 map - simulating reduce
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(5 * GB, a.getUsedResources().getMemory());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -592,7 +592,7 @@ public class TestReservations {
 
     // Only 1 map to other node - simulating reduce
     a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -606,7 +606,7 @@ public class TestReservations {
 
     // try to assign reducer (5G on node 0 and should reserve)
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -621,7 +621,7 @@ public class TestReservations {
 
     // could allocate, but is told it needs to unreserve first
     a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -823,7 +823,7 @@ public class TestReservations {
     // Start testing...
     // Only AM
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(2 * GB, a.getUsedResources().getMemory());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -834,7 +834,7 @@ public class TestReservations {
 
     // Only 1 map - simulating reduce
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(5 * GB, a.getUsedResources().getMemory());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -845,7 +845,7 @@ public class TestReservations {
 
     // Only 1 map to other node - simulating reduce
     a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -860,15 +860,16 @@ public class TestReservations {
     Resource capability = Resources.createResource(32 * GB, 0);
     boolean res =
         a.canAssignToThisQueue(clusterResource,
-            CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
-                clusterResource), capability, Resources.none());
+            RMNodeLabelsManager.NO_LABEL, new ResourceLimits(
+                clusterResource), capability, Resources.none(),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertFalse(res);
 
     // now add in reservations and make sure it continues if config set
     // allocate to queue so that the potential new capacity is greater than
     // absoluteMaxCapacity
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -881,16 +882,17 @@ public class TestReservations {
     capability = Resources.createResource(5 * GB, 0);
     res =
         a.canAssignToThisQueue(clusterResource,
-            CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
-                clusterResource), capability, Resources
-                .createResource(5 * GB));
+            RMNodeLabelsManager.NO_LABEL, new ResourceLimits(
+                clusterResource), capability, Resources.createResource(5 * GB),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertTrue(res);
 
     // tell to not check reservations
     res =
         a.canAssignToThisQueue(clusterResource,
-            CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
-                clusterResource), capability, Resources.none());
+            RMNodeLabelsManager.NO_LABEL, new ResourceLimits(
+                clusterResource), capability, Resources.none(),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertFalse(res);
 
     refreshQueuesTurnOffReservationsContLook(a, csConf);
@@ -899,15 +901,16 @@ public class TestReservations {
     // in since feature is off
     res =
         a.canAssignToThisQueue(clusterResource,
-            CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
-                clusterResource), capability, Resources.none());
+            RMNodeLabelsManager.NO_LABEL, new ResourceLimits(
+                clusterResource), capability, Resources.none(),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertFalse(res);
 
     res =
         a.canAssignToThisQueue(clusterResource,
-            CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
-                clusterResource), capability, Resources
-                .createResource(5 * GB));
+            RMNodeLabelsManager.NO_LABEL, new ResourceLimits(
+                clusterResource), capability, Resources.createResource(5 * GB),
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertFalse(res);
   }
 
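Two things change in canAssignToThisQueue(): the Set<String> of labels
becomes a single partition string (RMNodeLabelsManager.NO_LABEL, the ""
default partition), and the SchedulingMode rides along. The call shape used
throughout this test, with the parameter roles as the surrounding
assertions imply:

    boolean res = a.canAssignToThisQueue(clusterResource,
        RMNodeLabelsManager.NO_LABEL,       // partition, "" = default
        new ResourceLimits(clusterResource),
        capability,                         // the resource being asked for
        Resources.none(),                   // reservations to discount
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);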
@@ -1008,7 +1011,7 @@ public class TestReservations {
     // Start testing...
     // Only AM
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(2 * GB, a.getUsedResources().getMemory());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1019,7 +1022,7 @@ public class TestReservations {
 
     // Only 1 map - simulating reduce
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(5 * GB, a.getUsedResources().getMemory());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1030,7 +1033,7 @@ public class TestReservations {
 
     // Only 1 map to other node - simulating reduce
     a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1044,7 +1047,7 @@ public class TestReservations {
     // allocate to queue so that the potential new capacity is greater than
     // absoluteMaxCapacity
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(5 * GB, app_0.getCurrentReservation().getMemory());
@@ -1059,18 +1062,18 @@ public class TestReservations {
     // set limit so that, after subtracting reservations, it can continue
     Resource limit = Resources.createResource(12 * GB, 0);
     boolean res = a.canAssignToUser(clusterResource, user_0, limit, app_0,
-        true, null);
+        true, "");
     assertTrue(res);
 
     // tell it not to check for reservations and should fail as already over
     // limit
-    res = a.canAssignToUser(clusterResource, user_0, limit, app_0, false, null);
+    res = a.canAssignToUser(clusterResource, user_0, limit, app_0, false, "");
     assertFalse(res);
 
     refreshQueuesTurnOffReservationsContLook(a, csConf);
 
     // should now return false since feature off
-    res = a.canAssignToUser(clusterResource, user_0, limit, app_0, true, null);
+    res = a.canAssignToUser(clusterResource, user_0, limit, app_0, true, "");
     assertFalse(res);
   }
 
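canAssignToUser() makes the matching switch from a null label to the
partition string. The "" literal above is the same value as
RMNodeLabelsManager.NO_LABEL, so an equivalent, more self-documenting
spelling would be:

    boolean res = a.canAssignToUser(clusterResource, user_0, limit, app_0,
        true /* account for reservations */, RMNodeLabelsManager.NO_LABEL);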
@@ -1143,7 +1146,7 @@ public class TestReservations {
     // Start testing...
     // Only AM
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(2 * GB, a.getUsedResources().getMemory());
     assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1155,7 +1158,7 @@ public class TestReservations {
 
     // Only 1 map - simulating reduce
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(5 * GB, a.getUsedResources().getMemory());
     assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1167,7 +1170,7 @@ public class TestReservations {
 
     // Only 1 map to other node - simulating reduce
     a.assignContainers(clusterResource, node_1,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1183,7 +1186,7 @@ public class TestReservations {
     // some resource. Even with continuous reservation looking, we don't allow
     // unreserving resources in order to make a new reservation.
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(Resources.createResource(10 * GB)));
+        new ResourceLimits(Resources.createResource(10 * GB)), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1199,7 +1202,7 @@ public class TestReservations {
     // used (8G) + required (5G). It will not reserve since it has to unreserve
     // some resource. Unfortunately, there's nothing to unreserve.
     a.assignContainers(clusterResource, node_2,
-        new ResourceLimits(Resources.createResource(10 * GB)));
+        new ResourceLimits(Resources.createResource(10 * GB)), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(8 * GB, a.getUsedResources().getMemory());
     assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1213,7 +1216,7 @@ public class TestReservations {
 
     // let it assign 5G to node_2
     a.assignContainers(clusterResource, node_2,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(13 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1226,7 +1229,7 @@ public class TestReservations {
 
     // reserve 8G node_0
     a.assignContainers(clusterResource, node_0,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(21 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(8 * GB, a.getMetrics().getReservedMB());
@@ -1241,7 +1244,7 @@ public class TestReservations {
     // continued to try due to having reservation above,
     // but hits queue limits so can't reserve anymore.
     a.assignContainers(clusterResource, node_2,
-        new ResourceLimits(clusterResource));
+        new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
     assertEquals(21 * GB, a.getUsedResources().getMemory());
     assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(8 * GB, a.getMetrics().getReservedMB());

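One note on the ResourceLimits argument these tests keep constructing: it
carries the cap applied to the current allocation pass, which is why the
hunks above can squeeze the queue by handing it a 10 GB limit instead of
the full cluster. The two shapes side by side:

    // Full headroom: the queue may use whatever the cluster has.
    a.assignContainers(clusterResource, node_0,
        new ResourceLimits(clusterResource),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);

    // Pinched headroom: cap this pass at 10 GB, forcing the
    // reserve/unreserve decisions the assertions above exercise.
    a.assignContainers(clusterResource, node_0,
        new ResourceLimits(Resources.createResource(10 * GB)),
        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);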
http://git-wip-us.apache.org/repos/asf/hadoop/blob/025787b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index 62135b9..84abf4e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -160,6 +160,7 @@ public class TestUtils {
     request.setCapability(capability);
     request.setRelaxLocality(relaxLocality);
     request.setPriority(priority);
+    request.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
     return request;
   }
   
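Defaulting every test ResourceRequest to NO_LABEL keeps unlabeled requests
comparable with the partition string now threaded through the scheduler. A
request aimed at a labeled partition sets the expression explicitly, e.g.:

    // Hedged example; "y" stands in for a partition the test configures.
    request.setNodeLabelExpression("y");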
@@ -273,6 +274,7 @@ public class TestUtils {
     conf.setCapacity(B1, 100);
     conf.setMaximumCapacity(B1, 100);
     conf.setCapacityByLabel(B1, "y", 100);
+    conf.setMaximumApplicationMasterResourcePerQueuePercent(B1, 1f);
 
     final String C1 = C + ".c1";
     conf.setQueues(C, new String[] {"c1"});