You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by zj...@apache.org on 2015/04/18 00:35:54 UTC
[26/50] [abbrv] hadoop git commit: YARN-3361. CapacityScheduler side
changes to support non-exclusive node labels. Contributed by Wangda Tan
http://git-wip-us.apache.org/repos/asf/hadoop/blob/025787b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
new file mode 100644
index 0000000..cf1b26f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
@@ -0,0 +1,1027 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+public class TestNodeLabelContainerAllocation {
+ /** Multiplier used by the tests to write memory sizes such as {@code 8 * GB}. */
+ private static final int GB = 1024;
+
+ /** Base configuration; re-created by {@link #setUp()} before every test. */
+ private YarnConfiguration conf;
+
+ /** Node-label manager handed to each MockRM; re-created by {@link #setUp()}. */
+ RMNodeLabelsManager mgr;
+
+ @Before
+ public void setUp() throws Exception {
+ conf = new YarnConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ mgr = new NullRMNodeLabelsManager();
+ mgr.init(conf);
+ }
+
+ /**
+  * Builds a CapacitySchedulerConfiguration on top of {@code config} with three
+  * top-level queues, each having a single child queue:
+  * a/a1 (accessible label x), b/b1 (accessible label y) and c/c1 (no labels).
+  *
+  * @param config base configuration passed to the CapacitySchedulerConfiguration
+  *          constructor
+  * @return the configuration with the queue layout applied
+  */
+ private Configuration getConfigurationWithQueueLabels(Configuration config) {
+   final String root = CapacitySchedulerConfiguration.ROOT;
+   final String queueA = root + ".a";
+   final String queueB = root + ".b";
+   final String queueC = root + ".c";
+   final String leafA1 = queueA + ".a1";
+   final String leafB1 = queueB + ".b1";
+   final String leafC1 = queueC + ".c1";
+
+   CapacitySchedulerConfiguration csConf =
+       new CapacitySchedulerConfiguration(config);
+
+   // Top level: root has children a, b, c and 100% of labels x and y
+   csConf.setQueues(root, new String[] {"a", "b", "c"});
+   csConf.setCapacityByLabel(root, "x", 100);
+   csConf.setCapacityByLabel(root, "y", 100);
+
+   // a: capacity 10 (max 15), may access x and gets all of x
+   csConf.setCapacity(queueA, 10);
+   csConf.setMaximumCapacity(queueA, 15);
+   csConf.setAccessibleNodeLabels(queueA, toSet("x"));
+   csConf.setCapacityByLabel(queueA, "x", 100);
+
+   // b: capacity 20, may access y and gets all of y
+   csConf.setCapacity(queueB, 20);
+   csConf.setAccessibleNodeLabels(queueB, toSet("y"));
+   csConf.setCapacityByLabel(queueB, "y", 100);
+
+   // c: capacity 70 (max 70), no accessible labels
+   csConf.setCapacity(queueC, 70);
+   csConf.setMaximumCapacity(queueC, 70);
+   csConf.setAccessibleNodeLabels(queueC, RMNodeLabelsManager.EMPTY_STRING_SET);
+
+   // Second level: one leaf per parent, each taking 100% of its parent
+   csConf.setQueues(queueA, new String[] {"a1"});
+   csConf.setCapacity(leafA1, 100);
+   csConf.setMaximumCapacity(leafA1, 100);
+   csConf.setCapacityByLabel(leafA1, "x", 100);
+
+   csConf.setQueues(queueB, new String[] {"b1"});
+   csConf.setCapacity(leafB1, 100);
+   csConf.setMaximumCapacity(leafB1, 100);
+   csConf.setCapacityByLabel(leafB1, "y", 100);
+
+   csConf.setQueues(queueC, new String[] {"c1"});
+   csConf.setCapacity(leafC1, 100);
+   csConf.setMaximumCapacity(leafC1, 100);
+
+   return csConf;
+ }
+
+ /**
+  * Asserts that {@code containerId} is one of the live containers of
+  * {@code attemptId} and that it was allocated on {@code host}.
+  *
+  * @param attemptId application attempt whose live containers are inspected
+  * @param containerId container expected to be live
+  * @param rm resource manager whose scheduler is queried
+  * @param host expected host name of the node the container was allocated on
+  */
+ private void checkTaskContainersHost(ApplicationAttemptId attemptId,
+     ContainerId containerId, ResourceManager rm, String host) {
+   YarnScheduler scheduler = rm.getRMContext().getScheduler();
+   SchedulerAppReport appReport = scheduler.getSchedulerAppInfo(attemptId);
+
+   Assert.assertTrue(appReport.getLiveContainers().size() > 0);
+   boolean found = false;
+   for (RMContainer c : appReport.getLiveContainers()) {
+     if (c.getContainerId().equals(containerId)) {
+       Assert.assertEquals(host, c.getAllocatedNode().getHost());
+       found = true;
+     }
+   }
+   // Without this, a container missing from the report would let the host
+   // check above pass vacuously.
+   Assert.assertTrue("Container " + containerId
+       + " is not a live container of " + attemptId, found);
+ }
+
+ /**
+  * Collects the given elements into a new set via {@code Sets.newHashSet}.
+  *
+  * @param elements elements to put in the set
+  * @return a new set holding {@code elements}
+  */
+ @SuppressWarnings("unchecked")
+ private <E> Set<E> toSet(E... elements) {
+   return Sets.newHashSet(elements);
+ }
+
+
+ @Test (timeout = 300000)
+ public void testContainerAllocationWithSingleUserLimits() throws Exception {
+ final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); // NOTE: shadows the mgr field created in setUp()
+ mgr.init(conf);
+
+ // set node -> label (h1 -> x, h2 -> y; h3 stays unlabeled)
+ mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+ mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
+ NodeId.newInstance("h2", 0), toSet("y")));
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
+ rm1.registerNode("h2:1234", 8000); // label = y
+ MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
+
+ // launch an app to queue a1 (label = x); its AM is launched and
+ // registered through nm1 (h1)
+ RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ // A presumably has only 10% of the label=empty partition (the original comment said "of x" - TODO confirm against TestUtils), so it can only allocate one container in label=empty
+ ContainerId containerId =
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+ am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "");
+ Assert.assertTrue(rm1.waitForState(nm3, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ // Cannot allocate 2nd label=empty container (id 3 is never handed out here)
+ containerId =
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
+ am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "");
+ Assert.assertFalse(rm1.waitForState(nm3, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+
+ // A has default user limit = 100, so it can use all resource in label = x
+ // We can allocate floor(8000 / 1024) = 7 containers on h1; the loop requests 6 (ids 3..8), presumably the 7th is the AM - TODO confirm
+ for (int id = 3; id <= 8; id++) {
+ containerId =
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), id);
+ am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "x");
+ Assert.assertTrue(rm1.waitForState(nm1, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ }
+ rm1.close();
+ }
+
+ @Test(timeout = 300000)
+ public void testContainerAllocateWithComplexLabels() throws Exception {
+ /*
+ * Queue structure:
+ *                      root (*)
+ *                  ________________
+ *                 /                \
+ *               a x(100%), y(50%)   b y(50%), z(100%)
+ *               ________________    ______________
+ *              /                   /              \
+ *             a1 (x,y)         b1(no)              b2(y,z)
+ *               100%                          y = 100%, z = 100%
+ *
+ * Node structure:
+ * h1 : x
+ * h2 : y
+ * h3 : y
+ * h4 : z
+ * h5 : NO
+ *
+ * Total resource:
+ * x: 4G
+ * y: 6G
+ * z: 2G
+ * *: 2G
+ * NOTE(review): every node below is registered with 2048 (x=2G, y=4G), so the x/y figures around this note look stale - confirm
+ * Resource of
+ * a1: x=4G, y=3G, NO=0.2G
+ * b1: NO=0.9G (max=1G)
+ * b2: y=3, z=2G, NO=0.9G (max=1G)
+ *
+ * Each node can only allocate two containers
+ */
+
+ // set node -> label
+ mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y", "z"));
+ mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
+ toSet("x"), NodeId.newInstance("h2", 0), toSet("y"),
+ NodeId.newInstance("h3", 0), toSet("y"), NodeId.newInstance("h4", 0),
+ toSet("z"), NodeId.newInstance("h5", 0),
+ RMNodeLabelsManager.EMPTY_STRING_SET));
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(TestUtils.getComplexConfigurationWithQueueLabels(conf)) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 2048);
+ MockNM nm2 = rm1.registerNode("h2:1234", 2048);
+ MockNM nm3 = rm1.registerNode("h3:1234", 2048);
+ MockNM nm4 = rm1.registerNode("h4:1234", 2048);
+ MockNM nm5 = rm1.registerNode("h5:1234", 2048);
+
+ ContainerId containerId;
+
+ // launch an app to queue a1 (label = x); its AM is launched and
+ // registered through nm1 (h1)
+ RMApp app1 = rm1.submitApp(1024, "app", "user", null, "a1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ // request a container (label = y). can be allocated on nm2
+ am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
+ containerId =
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), 2L);
+ Assert.assertTrue(rm1.waitForState(nm2, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+ "h2");
+
+ // launch an app to queue b1 (no label, see diagram above); its AM is
+ // launched through nm5 (h5, no label)
+ RMApp app2 = rm1.submitApp(1024, "app", "user", null, "b1");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm5);
+
+ // the AM container was allocated successfully and now uses up b1's queue
+ // capacity, so the request below cannot be allocated on nm4 or nm5
+ // (Maximum capacity reached)
+ am2.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+ containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm4, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertFalse(rm1.waitForState(nm5, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+
+ // launch an app to queue b2
+ RMApp app3 = rm1.submitApp(1024, "app", "user", null, "b2");
+ MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm5);
+
+ // request a container (label = y). try to allocate on nm1 (label = x) and
+ // nm3 (label = y). Will successfully allocate on nm3
+ am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
+ containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm1, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertTrue(rm1.waitForState(nm3, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
+ "h3");
+
+ // try to allocate container (request label = z) on nm4 (label = z).
+ // Will successfully allocate on nm4 only.
+ am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "z");
+ containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 3L);
+ Assert.assertTrue(rm1.waitForState(nm4, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
+ "h4");
+
+ rm1.close();
+ }
+
+ @Test (timeout = 120000)
+ public void testContainerAllocateWithLabels() throws Exception {
+ // set node -> label (h1 -> x, h2 -> y; h3 stays unlabeled)
+ mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+ mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
+ NodeId.newInstance("h2", 0), toSet("y")));
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(getConfigurationWithQueueLabels(conf)) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
+ MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y
+ MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
+
+ ContainerId containerId;
+
+ // launch an app to queue a1 (label = x); the AM is launched through nm3,
+ // and the container requested with label x must be allocated in h1
+ RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm3);
+
+ // request a container with label expression "x".
+ am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "x");
+ containerId =
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm2, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertTrue(rm1.waitForState(nm1, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+ "h1");
+
+ // launch an app to queue b1 (label = y); the AM is launched through nm3,
+ // and the container requested with label y must be allocated in h2
+ RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm3);
+
+ // request a container with label expression "y".
+ am2.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
+ containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm1, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertTrue(rm1.waitForState(nm2, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1,
+ "h2");
+
+ // launch an app to queue c1 (label = ""), and check the requested
+ // container is allocated in h3
+ RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1");
+ MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3);
+
+ // request a container without a label expression.
+ am3.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+ containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm2, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertTrue(rm1.waitForState(nm3, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
+ "h3");
+
+ rm1.close();
+ }
+
+ @Test (timeout = 120000)
+ public void testContainerAllocateWithDefaultQueueLabels() throws Exception {
+ // This test is very similar to testContainerAllocateWithLabels.
+ // The difference is that this test doesn't specify a label expression in the
+ // ResourceRequest; instead, it uses the default queue label expression
+
+ // set node -> label (h1 -> x, h2 -> y; h3 stays unlabeled)
+ mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+ mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
+ NodeId.newInstance("h2", 0), toSet("y")));
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
+ MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y
+ MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
+
+ ContainerId containerId;
+
+ // launch an app to queue a1 (label = x); the AM is launched through nm1,
+ // and the requested container must be allocated in h1
+ RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ // request a container (no label expression in the request).
+ am1.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+ containerId =
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm3, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertTrue(rm1.waitForState(nm1, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+ "h1");
+
+ // launch an app to queue b1 (label = y); the AM is launched through nm2,
+ // and the requested container must be allocated in h2
+ RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+ // request a container (no label expression in the request).
+ am2.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+ containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm3, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertTrue(rm1.waitForState(nm2, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1,
+ "h2");
+
+ // launch an app to queue c1 (label = ""), and check the requested
+ // container is allocated in h3
+ RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1");
+ MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3);
+
+ // request a container (no label expression in the request).
+ am3.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+ containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm2, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertTrue(rm1.waitForState(nm3, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
+ "h3");
+
+ rm1.close();
+ }
+
+ /**
+  * Asserts the pending memory of the "*" resource request that the given
+  * attempt holds at the given priority, computed as the request's capability
+  * memory multiplied by its number of containers.
+  *
+  * @param rm resource manager whose CapacityScheduler is queried
+  * @param priority priority of the resource request to look up
+  * @param attemptId application attempt owning the request
+  * @param memory expected pending memory
+  */
+ private void checkPendingResource(MockRM rm, int priority,
+     ApplicationAttemptId attemptId, int memory) {
+   CapacityScheduler scheduler =
+       (CapacityScheduler) rm.getRMContext().getScheduler();
+   FiCaSchedulerApp attempt = scheduler.getApplicationAttempt(attemptId);
+   ResourceRequest anyRequest = attempt.getAppSchedulingInfo()
+       .getResourceRequest(Priority.newInstance(priority), "*");
+
+   int pendingMemory =
+       anyRequest.getCapability().getMemory() * anyRequest.getNumContainers();
+   Assert.assertEquals(memory, pendingMemory);
+ }
+
+ /**
+  * Asserts the number of containers the CapacityScheduler reports for a node.
+  *
+  * @param rm resource manager whose CapacityScheduler is queried
+  * @param nodeId node to inspect
+  * @param numContainers expected value of the node's container count
+  */
+ private void checkLaunchedContainerNumOnNode(MockRM rm, NodeId nodeId,
+     int numContainers) {
+   CapacityScheduler scheduler =
+       (CapacityScheduler) rm.getRMContext().getScheduler();
+   SchedulerNode schedulerNode = scheduler.getSchedulerNode(nodeId);
+   int actualContainers = schedulerNode.getNumContainers();
+   Assert.assertEquals(numContainers, actualContainers);
+ }
+
+ @Test
+ public void testPreferenceOfNeedyAppsTowardsNodePartitions() throws Exception {
+ /**
+ * Test case: Submit two applications to a queue (app1 first then app2), app1
+ * asks for no-label, app2 asks for label=y. When node1, which has label=y,
+ * heartbeats, app2 will get allocation first, even if app2 was submitted later
+ * than app1
+ */
+
+ // set node -> label
+ mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+ // Make y a non-exclusive node label
+ mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("y", false)));
+ mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("y")));
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(TestUtils.getConfigurationWithQueueLabels(conf)) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = y
+ MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty>
+
+ // launch an app to queue b1 (label = y), AM container should be launched in nm2
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
+
+ // launch another app to queue b1 (label = y), AM container should be launched in nm2
+ RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+ // request containers and let the nodes heartbeat (nm1 has label=y). Note that
+ // app1 requests non-labeled containers and app2 requests labeled containers;
+ // app2 will get allocated on nm1 first even if app1 was submitted first.
+ am1.allocate("*", 1 * GB, 8, new ArrayList<ContainerId>());
+ am2.allocate("*", 1 * GB, 8, new ArrayList<ContainerId>(), "y");
+
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+ RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+ // Do node heartbeats many times (both nodes, 50 rounds)
+ for (int i = 0; i < 50; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+ }
+
+ // App2 will get preference to be allocated on node1, and node1 will be all
+ // used by App2.
+ FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1.getApplicationAttemptId());
+ FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(am2.getApplicationAttemptId());
+ // app1 gets nothing in nm1 (partition=y)
+ checkNumOfContainersInAnAppOnGivenNode(0, nm1.getNodeId(), schedulerApp1);
+ checkNumOfContainersInAnAppOnGivenNode(9, nm2.getNodeId(), schedulerApp1); // 1 AM + 8 requested containers
+ // app2 gets all resource in nm1 (partition=y)
+ checkNumOfContainersInAnAppOnGivenNode(8, nm1.getNodeId(), schedulerApp2);
+ checkNumOfContainersInAnAppOnGivenNode(1, nm2.getNodeId(), schedulerApp2); // the AM container only
+
+ rm1.close();
+ }
+
+ /**
+  * Asserts how many of the application's live containers were allocated on
+  * the given node.
+  *
+  * @param expectedNum expected number of live containers on {@code nodeId}
+  * @param nodeId node to count containers for
+  * @param app scheduler application whose live containers are counted
+  */
+ private void checkNumOfContainersInAnAppOnGivenNode(int expectedNum,
+     NodeId nodeId, FiCaSchedulerApp app) {
+   int matching = 0;
+   for (RMContainer live : app.getLiveContainers()) {
+     NodeId allocatedOn = live.getAllocatedNode();
+     if (!allocatedOn.equals(nodeId)) {
+       continue;
+     }
+     matching++;
+   }
+   Assert.assertEquals(expectedNum, matching);
+ }
+
+ @Test
+ public void
+ testPreferenceOfNeedyPrioritiesUnderSameAppTowardsNodePartitions()
+ throws Exception {
+ /**
+ * Test case: Submit one application, it asks label="" in priority=1 and
+ * label="y" in priority=2, when a node with label=y heartbeats, priority=2
+ * will get allocation first even if there is pending resource in priority=1
+ */
+
+ // set node -> label
+ mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+ // Make y a non-exclusive node label
+ mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("y", false)));
+ mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("y")));
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(TestUtils.getConfigurationWithQueueLabels(conf)) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = y
+ MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty>
+
+ ContainerId nextContainerId;
+
+ // launch an app to queue b1 (label = y), AM container should be launched in nm2
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
+
+ // request containers from am1, priority=1 asks for "" and priority=2 asks
+ // for "y", "y" container should be allocated first (on nm1)
+ nextContainerId =
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+ am1.allocate("*", 1 * GB, 1, 1, new ArrayList<ContainerId>(), "");
+ am1.allocate("*", 1 * GB, 1, 2, new ArrayList<ContainerId>(), "y");
+ Assert.assertTrue(rm1.waitForState(nm1, nextContainerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+
+ // Check pending resource for am1, priority=1 doesn't get allocated before
+ // priority=2 allocated
+ checkPendingResource(rm1, 1, am1.getApplicationAttemptId(), 1 * GB);
+ checkPendingResource(rm1, 2, am1.getApplicationAttemptId(), 0 * GB);
+
+ rm1.close();
+ }
+
+ @Test
+ public void testNonLabeledResourceRequestGetPreferrenceToNonLabeledNode()
+ throws Exception {
+ /**
+ * Test case: Submit one application, it asks 6 label="" containers, NM1
+ * with label=x and NM2 has no label, NM1/NM2 doing heartbeat together. Even
+ * if NM1 has idle resource, containers are all allocated to NM2 since
+ * non-labeled request should get allocation on non-labeled nodes first.
+ */
+
+ // set node -> label
+ mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+ // Make x a non-exclusive node label
+ mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false)));
+ mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(TestUtils.getConfigurationWithQueueLabels(conf)) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x
+ MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty>
+
+ ContainerId nextContainerId;
+
+ // launch an app to queue b1 (label = y), AM container should be launched in nm2
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
+
+ // request containers from am1, priority=1 asks for "" * 6 (id from 2 to 7),
+ // nm1/nm2 do
+ // heartbeat at the same time, check containers are always allocated to nm2.
+ // This is to verify when there's resource available in non-labeled
+ // partition, non-labeled resource should allocate to non-labeled partition
+ // first.
+ am1.allocate("*", 1 * GB, 6, 1, new ArrayList<ContainerId>(), "");
+ for (int i = 2; i < 2 + 6; i++) {
+ nextContainerId =
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), i);
+ Assert.assertTrue(rm1.waitForState(Arrays.asList(nm1, nm2),
+ nextContainerId, RMContainerState.ALLOCATED, 10 * 1000));
+ }
+ // no container allocated on nm1
+ checkLaunchedContainerNumOnNode(rm1, nm1.getNodeId(), 0);
+ // all 7 (1 AM container + 6 task container) containers allocated on nm2
+ checkLaunchedContainerNumOnNode(rm1, nm2.getNodeId(), 7);
+
+ rm1.close();
+ }
+
+ @Test
+ public void testPreferenceOfQueuesTowardsNodePartitions()
+ throws Exception {
+ /**
+ * Test case: have a following queue structure:
+ *
+ * <pre>
+ *                  root
+ *               /   |   \
+ *              a     b    c
+ *             / \   / \  /  \
+ *           a1 a2 b1 b2 c1 c2
+ *          (x)    (x)   (x)
+ * </pre>
+ *
+ * Only a1, b1, c1 can access label=x, and their default label=x. Each queue
+ * has one application, which asks for 5 containers. NM1 has label=x.
+ *
+ * NM1 heartbeats 15 times; all 15 containers allocated by those heartbeats
+ * should go to the applications whose default label is x.
+ */
+
+ CapacitySchedulerConfiguration csConf =
+ new CapacitySchedulerConfiguration(this.conf);
+
+ // Define top-level queues
+ csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
+ csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+
+ final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+ csConf.setCapacity(A, 33);
+ csConf.setAccessibleNodeLabels(A, toSet("x"));
+ csConf.setCapacityByLabel(A, "x", 33);
+ csConf.setQueues(A, new String[] {"a1", "a2"});
+
+ final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+ csConf.setCapacity(B, 33);
+ csConf.setAccessibleNodeLabels(B, toSet("x"));
+ csConf.setCapacityByLabel(B, "x", 33);
+ csConf.setQueues(B, new String[] {"b1", "b2"});
+
+ final String C = CapacitySchedulerConfiguration.ROOT + ".c";
+ csConf.setCapacity(C, 34);
+ csConf.setAccessibleNodeLabels(C, toSet("x"));
+ csConf.setCapacityByLabel(C, "x", 34);
+ csConf.setQueues(C, new String[] {"c1", "c2"});
+
+ // Define 2nd-level queues: x1 queues get 100% of their parent's x capacity
+ // and default to label x, x2 queues get 0% of x
+ final String A1 = A + ".a1";
+ csConf.setCapacity(A1, 50);
+ csConf.setCapacityByLabel(A1, "x", 100);
+ csConf.setDefaultNodeLabelExpression(A1, "x");
+
+ final String A2 = A + ".a2";
+ csConf.setCapacity(A2, 50);
+ csConf.setCapacityByLabel(A2, "x", 0);
+
+ final String B1 = B + ".b1";
+ csConf.setCapacity(B1, 50);
+ csConf.setCapacityByLabel(B1, "x", 100);
+ csConf.setDefaultNodeLabelExpression(B1, "x");
+
+ final String B2 = B + ".b2";
+ csConf.setCapacity(B2, 50);
+ csConf.setCapacityByLabel(B2, "x", 0);
+
+ final String C1 = C + ".c1";
+ csConf.setCapacity(C1, 50);
+ csConf.setCapacityByLabel(C1, "x", 100);
+ csConf.setDefaultNodeLabelExpression(C1, "x");
+
+ final String C2 = C + ".c2";
+ csConf.setCapacity(C2, 50);
+ csConf.setCapacityByLabel(C2, "x", 0);
+
+ // set node -> label
+ mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+ // Make x a non-exclusive node label
+ mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false)));
+ mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(csConf) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB); // label = x
+ MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty>
+
+ // app1 -> a1
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ // app2 -> a2
+ RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "a2");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+ // app3 -> b1
+ RMApp app3 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
+ MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1);
+
+ // app4 -> b2
+ RMApp app4 = rm1.submitApp(1 * GB, "app", "user", null, "b2");
+ MockAM am4 = MockRM.launchAndRegisterAM(app4, rm1, nm2);
+
+ // app5 -> c1
+ RMApp app5 = rm1.submitApp(1 * GB, "app", "user", null, "c1");
+ MockAM am5 = MockRM.launchAndRegisterAM(app5, rm1, nm1);
+
+ // app6 -> c2
+ RMApp app6 = rm1.submitApp(1 * GB, "app", "user", null, "c2");
+ MockAM am6 = MockRM.launchAndRegisterAM(app6, rm1, nm2);
+
+ // Each application requests 5 * 1GB containers
+ am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
+ am2.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
+ am3.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
+ am4.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
+ am5.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
+ am6.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
+
+ // NM1 does 15 heartbeats (NM2 does not heartbeat here)
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+ for (int i = 0; i < 15; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ }
+
+ // NM1 get 15 new containers (total is 18, 15 task containers and 3 AM
+ // containers)
+ checkLaunchedContainerNumOnNode(rm1, nm1.getNodeId(), 18);
+
+ // Check pending resource of each application
+ // APP1/APP3/APP5 get satisfied, and APP2/APP4/APP6 get nothing.
+ checkPendingResource(rm1, 1, am1.getApplicationAttemptId(), 0 * GB);
+ checkPendingResource(rm1, 1, am2.getApplicationAttemptId(), 5 * GB);
+ checkPendingResource(rm1, 1, am3.getApplicationAttemptId(), 0 * GB);
+ checkPendingResource(rm1, 1, am4.getApplicationAttemptId(), 5 * GB);
+ checkPendingResource(rm1, 1, am5.getApplicationAttemptId(), 0 * GB);
+ checkPendingResource(rm1, 1, am6.getApplicationAttemptId(), 5 * GB);
+
+ rm1.close();
+ }
+
+ @Test
+ public void testQueuesWithoutAccessUsingPartitionedNodes() throws Exception {
+ /**
+ * Test case: have a following queue structure:
+ *
+ * <pre>
+ * root
+ * / \
+ * a b
+ * (x)
+ * </pre>
+ *
+ * Only a can access label=x. There are two nodes in the cluster: n1 has
+ * label x and n2 has no label.
+ *
+ * With user-limit-factor=5, one application submitted to queue b that asks
+ * for many containers should be able to use up all cluster resources.
+ */
+
+ CapacitySchedulerConfiguration csConf =
+ new CapacitySchedulerConfiguration(this.conf);
+
+ // Define top-level queues
+ csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+ csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+
+ final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+ csConf.setCapacity(A, 50);
+ csConf.setAccessibleNodeLabels(A, toSet("x"));
+ csConf.setCapacityByLabel(A, "x", 100);
+
+ final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+ csConf.setCapacity(B, 50);
+ csConf.setAccessibleNodeLabels(B, new HashSet<String>());
+ csConf.setUserLimitFactor(B, 5);
+
+ // set node -> label
+ mgr.addToCluserNodeLabels(ImmutableSet.of("x"));
+ // Make x a non-exclusive node label
+ mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false)));
+ mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(csConf) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
+ MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = <empty>
+
+ // app1 -> b
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
+
+ // app1 requests 50 * 1GB containers
+ am1.allocate("*", 1 * GB, 50, new ArrayList<ContainerId>());
+
+ // NM1 and NM2 each do 50 heartbeats
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+ RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+ SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
+
+ // Number of heartbeat cycles during which the partitioned node (NM1) still
+ // had no container allocated
+ int cycleWaited = 0;
+ for (int i = 0; i < 50; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+ if (schedulerNode1.getNumContainers() == 0) {
+ cycleWaited++;
+ }
+ }
+ // We wait 10 cycles before getting allocated on the partitioned node:
+ // NM2 can hold 10 containers in total; excluding the already allocated AM
+ // container, 9 cycles fill up the non-partitioned node, and one more cycle
+ // passes before allocation starts on the partitioned node
+ Assert.assertEquals(10, cycleWaited);
+
+ // Both NM1/NM2 launched 10 containers, cluster resource is exhausted
+ checkLaunchedContainerNumOnNode(rm1, nm1.getNodeId(), 10);
+ checkLaunchedContainerNumOnNode(rm1, nm2.getNodeId(), 10);
+
+ rm1.close();
+ }
+
+ @Test
+ public void testAMContainerAllocationWillAlwaysBeExclusive()
+ throws Exception {
+ /**
+ * Test case: submit one application to a queue without access to partition
+ * x while the only node has partition=x. The AM container must not be
+ * allocated, since AM containers always respect partition exclusivity.
+ */
+
+ // set node -> label
+ mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+ // Make x a non-exclusive node label
+ mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false)));
+ mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(TestUtils.getConfigurationWithQueueLabels(conf)) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x
+
+ // submit an app to queue b1 (label = y); its AM container must not land on nm1
+ rm1.submitApp(1 * GB, "app", "user", null, "b1");
+
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+ // Heartbeat many times; the app should get nothing
+ for (int i = 0; i < 50; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ }
+
+ Assert.assertEquals(0, cs.getSchedulerNode(nm1.getNodeId())
+ .getNumContainers());
+
+ rm1.close();
+ }
+
+ @Test
+ public void
+ testQueueMaxCapacitiesWillNotBeHonoredWhenNotRespectingExclusivity()
+ throws Exception {
+ /**
+ * Test case: have a following queue structure:
+ *
+ * <pre>
+ * root
+ * / \
+ * a b
+ * (x) (x)
+ * </pre>
+ *
+ * Both a and b can access x, and each has max-capacity-on-x = 50.
+ *
+ * When doing non-exclusive allocation, an app in a (or b) can use 100% of
+ * the resource in x.
+ */
+
+ CapacitySchedulerConfiguration csConf =
+ new CapacitySchedulerConfiguration(this.conf);
+
+ // Define top-level queues
+ csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a",
+ "b" });
+ csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+
+ final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+ csConf.setCapacity(A, 50);
+ csConf.setAccessibleNodeLabels(A, toSet("x"));
+ csConf.setCapacityByLabel(A, "x", 50);
+ csConf.setMaximumCapacityByLabel(A, "x", 50);
+
+ final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+ csConf.setCapacity(B, 50);
+ csConf.setAccessibleNodeLabels(B, toSet("x"));
+ csConf.setCapacityByLabel(B, "x", 50);
+ csConf.setMaximumCapacityByLabel(B, "x", 50);
+
+ // set node -> label
+ mgr.addToCluserNodeLabels(ImmutableSet.of("x"));
+ // Make x a non-exclusive node label
+ mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false)));
+ mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(csConf) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
+ MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = <empty>
+
+ // app1 -> a
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
+
+ // app1 asks for 10 containers without specifying a partition
+ am1.allocate("*", 1 * GB, 10, new ArrayList<ContainerId>());
+
+ // NM1 does 50 heartbeats
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+ SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
+
+ for (int i = 0; i < 50; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ }
+
+ // app1 gets all resource in partition=x
+ Assert.assertEquals(10, schedulerNode1.getNumContainers());
+
+ rm1.close();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/025787b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
index 7da1c97..52d0bc1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
@@ -45,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -146,7 +146,7 @@ public class TestParentQueue {
final Resource allocatedResource = Resources.createResource(allocation);
if (queue instanceof ParentQueue) {
((ParentQueue)queue).allocateResource(clusterResource,
- allocatedResource, null);
+ allocatedResource, RMNodeLabelsManager.NO_LABEL);
} else {
FiCaSchedulerApp app1 = getMockApplication(0, "");
((LeafQueue)queue).allocateResource(clusterResource, app1,
@@ -157,7 +157,7 @@ public class TestParentQueue {
if (allocation > 0) {
doReturn(new CSAssignment(Resources.none(), type)).when(queue)
.assignContainers(eq(clusterResource), eq(node),
- any(ResourceLimits.class));
+ any(ResourceLimits.class), any(SchedulingMode.class));
// Mock the node's resource availability
Resource available = node.getAvailableResource();
@@ -168,7 +168,7 @@ public class TestParentQueue {
return new CSAssignment(allocatedResource, type);
}
}).when(queue).assignContainers(eq(clusterResource), eq(node),
- any(ResourceLimits.class));
+ any(ResourceLimits.class), any(SchedulingMode.class));
}
private float computeQueueAbsoluteUsedCapacity(CSQueue queue,
@@ -228,11 +228,16 @@ public class TestParentQueue {
LeafQueue a = (LeafQueue)queues.get(A);
LeafQueue b = (LeafQueue)queues.get(B);
+ a.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+ b.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+ queues.get(CapacitySchedulerConfiguration.ROOT).getQueueResourceUsage()
+ .incPending(Resources.createResource(1 * GB));
+
// Simulate B returning a container on node_0
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
root.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyQueueMetrics(a, 0*GB, clusterResource);
verifyQueueMetrics(b, 1*GB, clusterResource);
@@ -240,12 +245,12 @@ public class TestParentQueue {
stubQueueAllocation(a, clusterResource, node_1, 2*GB);
stubQueueAllocation(b, clusterResource, node_1, 1*GB);
root.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
InOrder allocationOrder = inOrder(a, b);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@@ -254,12 +259,12 @@ public class TestParentQueue {
stubQueueAllocation(a, clusterResource, node_0, 1*GB);
stubQueueAllocation(b, clusterResource, node_0, 2*GB);
root.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
allocationOrder.verify(a).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 4*GB, clusterResource);
@@ -268,12 +273,12 @@ public class TestParentQueue {
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
stubQueueAllocation(b, clusterResource, node_0, 4*GB);
root.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
allocationOrder.verify(a).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 8*GB, clusterResource);
@@ -282,12 +287,12 @@ public class TestParentQueue {
stubQueueAllocation(a, clusterResource, node_1, 1*GB);
stubQueueAllocation(b, clusterResource, node_1, 1*GB);
root.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
allocationOrder = inOrder(a, b);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
allocationOrder.verify(a).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
verifyQueueMetrics(a, 4*GB, clusterResource);
verifyQueueMetrics(b, 9*GB, clusterResource);
}
@@ -448,16 +453,27 @@ public class TestParentQueue {
// Start testing
CSQueue a = queues.get(A);
+ a.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
CSQueue b = queues.get(B);
+ b.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
CSQueue c = queues.get(C);
+ c.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
CSQueue d = queues.get(D);
+ d.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
CSQueue a1 = queues.get(A1);
+ a1.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
CSQueue a2 = queues.get(A2);
+ a2.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
CSQueue b1 = queues.get(B1);
+ b1.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
CSQueue b2 = queues.get(B2);
+ b2.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
CSQueue b3 = queues.get(B3);
+ b3.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+ queues.get(CapacitySchedulerConfiguration.ROOT).getQueueResourceUsage()
+ .incPending(Resources.createResource(1 * GB));
// Simulate C returning a container on node_0
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
@@ -465,7 +481,7 @@ public class TestParentQueue {
stubQueueAllocation(c, clusterResource, node_0, 1*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyQueueMetrics(a, 0*GB, clusterResource);
verifyQueueMetrics(b, 0*GB, clusterResource);
verifyQueueMetrics(c, 1*GB, clusterResource);
@@ -478,7 +494,7 @@ public class TestParentQueue {
stubQueueAllocation(b2, clusterResource, node_1, 4*GB);
stubQueueAllocation(c, clusterResource, node_1, 0*GB);
root.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyQueueMetrics(a, 0*GB, clusterResource);
verifyQueueMetrics(b, 4*GB, clusterResource);
verifyQueueMetrics(c, 1*GB, clusterResource);
@@ -490,14 +506,14 @@ public class TestParentQueue {
stubQueueAllocation(b3, clusterResource, node_0, 2*GB);
stubQueueAllocation(c, clusterResource, node_0, 2*GB);
root.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
InOrder allocationOrder = inOrder(a, c, b);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
allocationOrder.verify(c).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
verifyQueueMetrics(a, 1*GB, clusterResource);
verifyQueueMetrics(b, 6*GB, clusterResource);
verifyQueueMetrics(c, 3*GB, clusterResource);
@@ -517,16 +533,16 @@ public class TestParentQueue {
stubQueueAllocation(b1, clusterResource, node_2, 1*GB);
stubQueueAllocation(c, clusterResource, node_2, 1*GB);
root.assignContainers(clusterResource, node_2,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
allocationOrder = inOrder(a, a2, a1, b, c);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
allocationOrder.verify(a2).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
allocationOrder.verify(c).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 8*GB, clusterResource);
verifyQueueMetrics(c, 4*GB, clusterResource);
@@ -622,12 +638,16 @@ public class TestParentQueue {
// Start testing
LeafQueue a = (LeafQueue)queues.get(A);
LeafQueue b = (LeafQueue)queues.get(B);
+ a.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+ b.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+ queues.get(CapacitySchedulerConfiguration.ROOT).getQueueResourceUsage()
+ .incPending(Resources.createResource(1 * GB));
// Simulate B returning a container on node_0
stubQueueAllocation(a, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyQueueMetrics(a, 0*GB, clusterResource);
verifyQueueMetrics(b, 1*GB, clusterResource);
@@ -636,12 +656,12 @@ public class TestParentQueue {
stubQueueAllocation(a, clusterResource, node_1, 2*GB, NodeType.RACK_LOCAL);
stubQueueAllocation(b, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
InOrder allocationOrder = inOrder(a, b);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@@ -651,12 +671,12 @@ public class TestParentQueue {
stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
stubQueueAllocation(b, clusterResource, node_0, 2*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
allocationOrder.verify(a).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 4*GB, clusterResource);
@@ -691,12 +711,19 @@ public class TestParentQueue {
// Start testing
LeafQueue b3 = (LeafQueue)queues.get(B3);
LeafQueue b2 = (LeafQueue)queues.get(B2);
+ b2.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+ b3.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+ queues.get(CapacitySchedulerConfiguration.ROOT).getQueueResourceUsage()
+ .incPending(Resources.createResource(1 * GB));
+
+ CSQueue b = queues.get(B);
+ b.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
// Simulate B3 returning a container on node_0
stubQueueAllocation(b2, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyQueueMetrics(b2, 0*GB, clusterResource);
verifyQueueMetrics(b3, 1*GB, clusterResource);
@@ -705,12 +732,12 @@ public class TestParentQueue {
stubQueueAllocation(b2, clusterResource, node_1, 1*GB, NodeType.RACK_LOCAL);
stubQueueAllocation(b3, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
InOrder allocationOrder = inOrder(b2, b3);
allocationOrder.verify(b2).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
allocationOrder.verify(b3).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
verifyQueueMetrics(b2, 1*GB, clusterResource);
verifyQueueMetrics(b3, 2*GB, clusterResource);
@@ -720,12 +747,12 @@ public class TestParentQueue {
stubQueueAllocation(b2, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
allocationOrder = inOrder(b3, b2);
allocationOrder.verify(b3).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
allocationOrder.verify(b2).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
verifyQueueMetrics(b2, 1*GB, clusterResource);
verifyQueueMetrics(b3, 3*GB, clusterResource);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/025787b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index e8a8243..47be618 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -48,10 +48,10 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -266,7 +266,7 @@ public class TestReservations {
// Start testing...
// Only AM
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(2 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -278,7 +278,7 @@ public class TestReservations {
// Only 1 map - simulating reduce
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(5 * GB, a.getUsedResources().getMemory());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -290,7 +290,7 @@ public class TestReservations {
// Only 1 map to other node - simulating reduce
a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -305,7 +305,7 @@ public class TestReservations {
// try to assign reducer (5G on node 0 and should reserve)
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -321,7 +321,7 @@ public class TestReservations {
// assign reducer to node 2
a.assignContainers(clusterResource, node_2,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(18 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -338,7 +338,7 @@ public class TestReservations {
// node_1 heartbeat and unreserves from node_0 in order to allocate
// on node_1
a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(18 * GB, a.getUsedResources().getMemory());
assertEquals(18 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -422,7 +422,7 @@ public class TestReservations {
// Start testing...
// Only AM
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(2 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -434,7 +434,7 @@ public class TestReservations {
// Only 1 map - simulating reduce
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(5 * GB, a.getUsedResources().getMemory());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -446,7 +446,7 @@ public class TestReservations {
// Only 1 map to other node - simulating reduce
a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -461,7 +461,7 @@ public class TestReservations {
// try to assign reducer (5G on node 0 and should reserve)
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -477,7 +477,7 @@ public class TestReservations {
// assign reducer to node 2
a.assignContainers(clusterResource, node_2,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(18 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -494,7 +494,7 @@ public class TestReservations {
// node_1 heartbeat and won't unreserve from node_0, potentially stuck
// if AM doesn't handle
a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(18 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -570,7 +570,7 @@ public class TestReservations {
// Start testing...
// Only AM
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(2 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -581,7 +581,7 @@ public class TestReservations {
// Only 1 map - simulating reduce
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(5 * GB, a.getUsedResources().getMemory());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -592,7 +592,7 @@ public class TestReservations {
// Only 1 map to other node - simulating reduce
a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -606,7 +606,7 @@ public class TestReservations {
// try to assign reducer (5G on node 0 and should reserve)
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -621,7 +621,7 @@ public class TestReservations {
// could allocate but told need to unreserve first
a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -823,7 +823,7 @@ public class TestReservations {
// Start testing...
// Only AM
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(2 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -834,7 +834,7 @@ public class TestReservations {
// Only 1 map - simulating reduce
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(5 * GB, a.getUsedResources().getMemory());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -845,7 +845,7 @@ public class TestReservations {
// Only 1 map to other node - simulating reduce
a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -860,15 +860,16 @@ public class TestReservations {
Resource capability = Resources.createResource(32 * GB, 0);
boolean res =
a.canAssignToThisQueue(clusterResource,
- CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
- clusterResource), capability, Resources.none());
+ RMNodeLabelsManager.NO_LABEL, new ResourceLimits(
+ clusterResource), capability, Resources.none(),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertFalse(res);
// now add in reservations and make sure it continues if config set
// allocate to queue so that the potential new capacity is greater then
// absoluteMaxCapacity
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -881,16 +882,17 @@ public class TestReservations {
capability = Resources.createResource(5 * GB, 0);
res =
a.canAssignToThisQueue(clusterResource,
- CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
- clusterResource), capability, Resources
- .createResource(5 * GB));
+ RMNodeLabelsManager.NO_LABEL, new ResourceLimits(
+ clusterResource), capability, Resources.createResource(5 * GB),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertTrue(res);
// tell to not check reservations
res =
a.canAssignToThisQueue(clusterResource,
- CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
- clusterResource), capability, Resources.none());
+ RMNodeLabelsManager.NO_LABEL, new ResourceLimits(
+ clusterResource), capability, Resources.none(),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertFalse(res);
refreshQueuesTurnOffReservationsContLook(a, csConf);
@@ -899,15 +901,16 @@ public class TestReservations {
// in since feature is off
res =
a.canAssignToThisQueue(clusterResource,
- CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
- clusterResource), capability, Resources.none());
+ RMNodeLabelsManager.NO_LABEL, new ResourceLimits(
+ clusterResource), capability, Resources.none(),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertFalse(res);
res =
a.canAssignToThisQueue(clusterResource,
- CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
- clusterResource), capability, Resources
- .createResource(5 * GB));
+ RMNodeLabelsManager.NO_LABEL, new ResourceLimits(
+ clusterResource), capability, Resources.createResource(5 * GB),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertFalse(res);
}
@@ -1008,7 +1011,7 @@ public class TestReservations {
// Start testing...
// Only AM
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(2 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1019,7 +1022,7 @@ public class TestReservations {
// Only 1 map - simulating reduce
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(5 * GB, a.getUsedResources().getMemory());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1030,7 +1033,7 @@ public class TestReservations {
// Only 1 map to other node - simulating reduce
a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1044,7 +1047,7 @@ public class TestReservations {
// allocate to queue so that the potential new capacity is greater then
// absoluteMaxCapacity
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(5 * GB, app_0.getCurrentReservation().getMemory());
@@ -1059,18 +1062,18 @@ public class TestReservations {
// set limit so subtrace reservations it can continue
Resource limit = Resources.createResource(12 * GB, 0);
boolean res = a.canAssignToUser(clusterResource, user_0, limit, app_0,
- true, null);
+ true, "");
assertTrue(res);
// tell it not to check for reservations and should fail as already over
// limit
- res = a.canAssignToUser(clusterResource, user_0, limit, app_0, false, null);
+ res = a.canAssignToUser(clusterResource, user_0, limit, app_0, false, "");
assertFalse(res);
refreshQueuesTurnOffReservationsContLook(a, csConf);
// should now return false since feature off
- res = a.canAssignToUser(clusterResource, user_0, limit, app_0, true, null);
+ res = a.canAssignToUser(clusterResource, user_0, limit, app_0, true, "");
assertFalse(res);
}
@@ -1143,7 +1146,7 @@ public class TestReservations {
// Start testing...
// Only AM
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(2 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1155,7 +1158,7 @@ public class TestReservations {
// Only 1 map - simulating reduce
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(5 * GB, a.getUsedResources().getMemory());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1167,7 +1170,7 @@ public class TestReservations {
// Only 1 map to other node - simulating reduce
a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1183,7 +1186,7 @@ public class TestReservations {
// some resource. Even with continous reservation looking, we don't allow
// unreserve resource to reserve container.
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(Resources.createResource(10 * GB)));
+ new ResourceLimits(Resources.createResource(10 * GB)), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1199,7 +1202,7 @@ public class TestReservations {
// used (8G) + required (5G). It will not reserved since it has to unreserve
// some resource. Unfortunately, there's nothing to unreserve.
a.assignContainers(clusterResource, node_2,
- new ResourceLimits(Resources.createResource(10 * GB)));
+ new ResourceLimits(Resources.createResource(10 * GB)), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1213,7 +1216,7 @@ public class TestReservations {
// let it assign 5G to node_2
a.assignContainers(clusterResource, node_2,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1226,7 +1229,7 @@ public class TestReservations {
// reserve 8G node_0
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(21 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(8 * GB, a.getMetrics().getReservedMB());
@@ -1241,7 +1244,7 @@ public class TestReservations {
// continued to try due to having reservation above,
// but hits queue limits so can't reserve anymore.
a.assignContainers(clusterResource, node_2,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(21 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(8 * GB, a.getMetrics().getReservedMB());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/025787b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index 62135b9..84abf4e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -160,6 +160,7 @@ public class TestUtils {
request.setCapability(capability);
request.setRelaxLocality(relaxLocality);
request.setPriority(priority);
+ request.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
return request;
}
@@ -273,6 +274,7 @@ public class TestUtils {
conf.setCapacity(B1, 100);
conf.setMaximumCapacity(B1, 100);
conf.setCapacityByLabel(B1, "y", 100);
+ conf.setMaximumApplicationMasterResourcePerQueuePercent(B1, 1f);
final String C1 = C + ".c1";
conf.setQueues(C, new String[] {"c1"});