You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/04/14 20:46:16 UTC
[1/3] hadoop git commit: YARN-3361. CapacityScheduler side changes to
support non-exclusive node labels. Contributed by Wangda Tan
Repository: hadoop
Updated Branches:
refs/heads/trunk b46ee1e7a -> 0fefda645
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
new file mode 100644
index 0000000..cf1b26f
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
@@ -0,0 +1,1027 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
+public class TestNodeLabelContainerAllocation {
+ /** 1GB expressed in the MB units passed to registerNode/submitApp/allocate. */
+ private static final int GB = 1024;
+
+ /** Base configuration selecting the CapacityScheduler; rebuilt in setUp(). */
+ private YarnConfiguration conf;
+
+ /** Label manager injected into each MockRM; rebuilt in setUp(). */
+ RMNodeLabelsManager mgr;
+
+ @Before
+ public void setUp() throws Exception {
+ conf = new YarnConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ mgr = new NullRMNodeLabelsManager();
+ mgr.init(conf);
+ }
+
+ /**
+ * Builds a CapacityScheduler configuration with three top-level queues:
+ * a (accessible label x, leaf a1), b (accessible label y, leaf b1) and
+ * c (no accessible labels, leaf c1). Root owns 100% of labels x and y.
+ *
+ * @param config base configuration to copy
+ * @return the derived CapacitySchedulerConfiguration
+ */
+ private Configuration getConfigurationWithQueueLabels(Configuration config) {
+ // Named csConf so it does not shadow the test's conf field
+ CapacitySchedulerConfiguration csConf =
+ new CapacitySchedulerConfiguration(config);
+ final String root = CapacitySchedulerConfiguration.ROOT;
+
+ // Top-level queues; root owns all of labels x and y
+ csConf.setQueues(root, new String[] {"a", "b", "c"});
+ csConf.setCapacityByLabel(root, "x", 100);
+ csConf.setCapacityByLabel(root, "y", 100);
+
+ // a: capacity 10 (max 15), can access x and owns all of it
+ final String A = root + ".a";
+ csConf.setCapacity(A, 10);
+ csConf.setMaximumCapacity(A, 15);
+ csConf.setAccessibleNodeLabels(A, toSet("x"));
+ csConf.setCapacityByLabel(A, "x", 100);
+
+ // b: capacity 20, can access y and owns all of it
+ final String B = root + ".b";
+ csConf.setCapacity(B, 20);
+ csConf.setAccessibleNodeLabels(B, toSet("y"));
+ csConf.setCapacityByLabel(B, "y", 100);
+
+ // c: capacity 70 (max 70), no label access
+ final String C = root + ".c";
+ csConf.setCapacity(C, 70);
+ csConf.setMaximumCapacity(C, 70);
+ csConf.setAccessibleNodeLabels(C, RMNodeLabelsManager.EMPTY_STRING_SET);
+
+ // Single leaf under each parent, each taking its parent's full share
+ final String A1 = A + ".a1";
+ csConf.setQueues(A, new String[] {"a1"});
+ csConf.setCapacity(A1, 100);
+ csConf.setMaximumCapacity(A1, 100);
+ csConf.setCapacityByLabel(A1, "x", 100);
+
+ final String B1 = B + ".b1";
+ csConf.setQueues(B, new String[] {"b1"});
+ csConf.setCapacity(B1, 100);
+ csConf.setMaximumCapacity(B1, 100);
+ csConf.setCapacityByLabel(B1, "y", 100);
+
+ final String C1 = C + ".c1";
+ csConf.setQueues(C, new String[] {"c1"});
+ csConf.setCapacity(C1, 100);
+ csConf.setMaximumCapacity(C1, 100);
+
+ return csConf;
+ }
+
+ /**
+ * Asserts that {@code containerId} is a live container of {@code attemptId}
+ * and that it was allocated on {@code host}.
+ *
+ * @param attemptId application attempt owning the container
+ * @param containerId container to look up among the attempt's live containers
+ * @param rm resource manager whose scheduler is queried
+ * @param host expected host name of the container's node
+ */
+ private void checkTaskContainersHost(ApplicationAttemptId attemptId,
+ ContainerId containerId, ResourceManager rm, String host) {
+ YarnScheduler scheduler = rm.getRMContext().getScheduler();
+ SchedulerAppReport appReport = scheduler.getSchedulerAppInfo(attemptId);
+ Assert.assertNotNull("No scheduler report for " + attemptId, appReport);
+
+ boolean found = false;
+ for (RMContainer c : appReport.getLiveContainers()) {
+ if (c.getContainerId().equals(containerId)) {
+ Assert.assertEquals(host, c.getAllocatedNode().getHost());
+ found = true;
+ }
+ }
+ // Fail explicitly instead of passing vacuously when the container is absent
+ Assert.assertTrue("Container " + containerId
+ + " is not a live container of " + attemptId, found);
+ }
+
+ /**
+ * Collects the given elements into a new mutable set (via Guava
+ * {@code Sets.newHashSet}).
+ */
+ @SuppressWarnings("unchecked")
+ private <E> Set<E> toSet(E... elements) {
+ return Sets.newHashSet(elements);
+ }
+
+
+ /**
+ * A single user in queue a1 gets only one unlabeled container (on h3), but
+ * can fill the label=x node h1 because of the default user limit.
+ */
+ @Test (timeout = 300000)
+ public void testContainerAllocationWithSingleUserLimits() throws Exception {
+ // set node -> label (uses the manager created in setUp())
+ mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+ mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
+ NodeId.newInstance("h2", 0), toSet("y")));
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ try {
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
+ rm1.registerNode("h2:1234", 8000); // label = y
+ MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
+
+ // launch an app to queue a1; its AM is launched on h1 (label = x)
+ RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ // a1 can get only one container without a label, on h3
+ ContainerId containerId =
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+ am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "");
+ Assert.assertTrue(rm1.waitForState(nm3, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ // Cannot allocate 2nd label=empty container
+ containerId =
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
+ am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "");
+ Assert.assertFalse(rm1.waitForState(nm3, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+
+ // a1 has the default user limit (100), so one user can use all of label x.
+ // h1 has floor(8000 / 1024) = 7 1GB slots; one is presumably held by the
+ // AM, leaving room for task containers 3..8.
+ for (int id = 3; id <= 8; id++) {
+ containerId =
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), id);
+ am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "x");
+ Assert.assertTrue(rm1.waitForState(nm1, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ }
+ } finally {
+ // Stop the RM even when an assertion fails so it does not leak into
+ // later tests
+ rm1.close();
+ }
+ }
+
+ /**
+ * Allocation across queues whose accessible labels overlap (x, y, z), using
+ * TestUtils.getComplexConfigurationWithQueueLabels.
+ */
+ @Test(timeout = 300000)
+ public void testContainerAllocateWithComplexLabels() throws Exception {
+ /*
+ * Queue structure:
+ *                 root (*)
+ *            ________________
+ *           /                \
+ *  a x(100%), y(50%)     b y(50%), z(100%)
+ *  ________________       ______________
+ *  /                     /              \
+ * a1 (x,y)             b1(no)          b2(y,z)
+ * 100%                                 y = 100%, z = 100%
+ *
+ * Node structure:
+ * h1 : x
+ * h2 : y
+ * h3 : y
+ * h4 : z
+ * h5 : NO
+ *
+ * Total resource:
+ * x: 4G
+ * y: 6G
+ * z: 2G
+ * *: 2G
+ *
+ * Resource of
+ * a1: x=4G, y=3G, NO=0.2G
+ * b1: NO=0.9G (max=1G)
+ * b2: y=3G, z=2G, NO=0.9G (max=1G)
+ *
+ * Each node can only allocate two containers
+ */
+
+ // set node -> label
+ mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y", "z"));
+ mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
+ toSet("x"), NodeId.newInstance("h2", 0), toSet("y"),
+ NodeId.newInstance("h3", 0), toSet("y"), NodeId.newInstance("h4", 0),
+ toSet("z"), NodeId.newInstance("h5", 0),
+ RMNodeLabelsManager.EMPTY_STRING_SET));
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(TestUtils.getComplexConfigurationWithQueueLabels(conf)) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ try {
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 2048);
+ MockNM nm2 = rm1.registerNode("h2:1234", 2048);
+ MockNM nm3 = rm1.registerNode("h3:1234", 2048);
+ MockNM nm4 = rm1.registerNode("h4:1234", 2048);
+ MockNM nm5 = rm1.registerNode("h5:1234", 2048);
+
+ ContainerId containerId;
+
+ // launch an app to queue a1; its AM is launched on h1
+ RMApp app1 = rm1.submitApp(1024, "app", "user", null, "a1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ // request a container (label = y). can be allocated on nm2
+ am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
+ containerId =
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), 2L);
+ Assert.assertTrue(rm1.waitForState(nm2, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+ "h2");
+
+ // launch an app to queue b1 (no label access); its AM is launched on h5
+ RMApp app2 = rm1.submitApp(1024, "app", "user", null, "b1");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm5);
+
+ // The AM container already uses up b1's maximum capacity, so a further
+ // container can be allocated on neither h4 nor h5
+ am2.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+ containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm4, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertFalse(rm1.waitForState(nm5, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+
+ // launch an app to queue b2
+ RMApp app3 = rm1.submitApp(1024, "app", "user", null, "b2");
+ MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm5);
+
+ // request a container (label = y). try to allocate on nm1 (label = x) and
+ // nm3 (label = y). Will successfully allocate on nm3 only
+ am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
+ containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm1, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertTrue(rm1.waitForState(nm3, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
+ "h3");
+
+ // try to allocate container (request label = z) on nm4 (label = z).
+ // Will successfully allocate on nm4 only.
+ am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "z");
+ containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 3L);
+ Assert.assertTrue(rm1.waitForState(nm4, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
+ "h4");
+ } finally {
+ // Stop the RM even when an assertion fails
+ rm1.close();
+ }
+ }
+
+ /**
+ * Containers requested with an explicit label expression land only on nodes
+ * carrying that label; unlabeled requests from c1 land on the unlabeled node.
+ */
+ @Test (timeout = 120000)
+ public void testContainerAllocateWithLabels() throws Exception {
+ // set node -> label
+ mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+ mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
+ NodeId.newInstance("h2", 0), toSet("y")));
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(getConfigurationWithQueueLabels(conf)) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ try {
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
+ MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y
+ MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
+
+ ContainerId containerId;
+
+ // launch an app to queue a1 (label = x) with its AM on h3, and check its
+ // label=x task container is allocated on h1
+ RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm3);
+
+ // request a container.
+ am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "x");
+ containerId =
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm2, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertTrue(rm1.waitForState(nm1, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+ "h1");
+
+ // launch an app to queue b1 (label = y) with its AM on h3, and check its
+ // label=y task container is allocated on h2
+ RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm3);
+
+ // request a container.
+ am2.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
+ containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm1, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertTrue(rm1.waitForState(nm2, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1,
+ "h2");
+
+ // launch an app to queue c1 (label = ""), and check its task container
+ // is allocated on h3
+ RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1");
+ MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3);
+
+ // request a container.
+ am3.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+ containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm2, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertTrue(rm1.waitForState(nm3, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
+ "h3");
+ } finally {
+ // Stop the RM even when an assertion fails
+ rm1.close();
+ }
+ }
+
+ @Test (timeout = 120000)
+ public void testContainerAllocateWithDefaultQueueLabels() throws Exception {
+ // This test is very similar to testContainerAllocateWithLabels. The
+ // difference is that it doesn't specify a label expression in the
+ // ResourceRequest; it relies on the queue's default label expression.
+
+ // set node -> label
+ mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+ mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
+ NodeId.newInstance("h2", 0), toSet("y")));
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ try {
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
+ MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y
+ MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
+
+ ContainerId containerId;
+
+ // launch an app to queue a1 (label = x), and check its task container
+ // is allocated on h1
+ RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ // request a container.
+ am1.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+ containerId =
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm3, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertTrue(rm1.waitForState(nm1, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
+ "h1");
+
+ // launch an app to queue b1 (label = y), and check its task container
+ // is allocated on h2
+ RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+ // request a container.
+ am2.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+ containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm3, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertTrue(rm1.waitForState(nm2, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1,
+ "h2");
+
+ // launch an app to queue c1 (label = ""), and check its task container
+ // is allocated on h3
+ RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1");
+ MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3);
+
+ // request a container.
+ am3.allocate("*", 1024, 1, new ArrayList<ContainerId>());
+ containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
+ Assert.assertFalse(rm1.waitForState(nm2, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ Assert.assertTrue(rm1.waitForState(nm3, containerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+ checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
+ "h3");
+ } finally {
+ // Stop the RM even when an assertion fails
+ rm1.close();
+ }
+ }
+
+ /**
+ * Asserts the total pending memory (capability * numContainers) of the
+ * attempt's "*" (any-location) request at the given priority.
+ *
+ * @param rm resource manager running the CapacityScheduler
+ * @param priority request priority to inspect
+ * @param attemptId application attempt to inspect
+ * @param memory expected pending memory, in MB (same units as GB)
+ */
+ private void checkPendingResource(MockRM rm, int priority,
+ ApplicationAttemptId attemptId, int memory) {
+ CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler();
+ FiCaSchedulerApp app = cs.getApplicationAttempt(attemptId);
+ Assert.assertNotNull("Unknown application attempt " + attemptId, app);
+ ResourceRequest rr =
+ app.getAppSchedulingInfo().getResourceRequest(
+ Priority.newInstance(priority), "*");
+ // Report a missing request as an assertion failure rather than an NPE
+ Assert.assertNotNull("No \"*\" request at priority " + priority
+ + " for " + attemptId, rr);
+ Assert.assertEquals(memory,
+ rr.getCapability().getMemory() * rr.getNumContainers());
+ }
+
+ /**
+ * Asserts how many containers the scheduler has placed on {@code nodeId}.
+ *
+ * @param rm resource manager running the CapacityScheduler
+ * @param nodeId node to inspect
+ * @param numContainers expected container count on that node
+ */
+ private void checkLaunchedContainerNumOnNode(MockRM rm, NodeId nodeId,
+ int numContainers) {
+ CapacityScheduler cs = (CapacityScheduler) rm.getRMContext().getScheduler();
+ SchedulerNode node = cs.getSchedulerNode(nodeId);
+ // Report an unknown node as an assertion failure rather than an NPE
+ Assert.assertNotNull("Unknown scheduler node " + nodeId, node);
+ Assert.assertEquals(numContainers, node.getNumContainers());
+ }
+
+ @Test
+ public void testPreferenceOfNeedyAppsTowardsNodePartitions() throws Exception {
+ /**
+ * Test case: Submit two applications to queue b1 (app1 first, then app2).
+ * app1 asks for no-label containers and app2 asks for label=y. When node1
+ * (label=y, non-exclusive) heartbeats, app2 gets allocations first even
+ * though it was submitted later than app1.
+ */
+
+ // set node -> label
+ mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+ // Makes y to be non-exclusive node labels
+ mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("y", false)));
+ mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("y")));
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(TestUtils.getConfigurationWithQueueLabels(conf)) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ try {
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = y
+ MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty>
+
+ // launch an app to queue b1 (label = y), AM container should be launched in nm2
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
+
+ // launch another app to queue b1 (label = y), AM container should be launched in nm2
+ RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+ // Request containers, then heartbeat nm1 (label=y) and nm2 (no label).
+ // app1 requests non-labeled containers and app2 requests labeled ones;
+ // app2 gets nm1 first even though app1 was submitted first.
+ am1.allocate("*", 1 * GB, 8, new ArrayList<ContainerId>());
+ am2.allocate("*", 1 * GB, 8, new ArrayList<ContainerId>(), "y");
+
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+ RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+ // Do node heartbeats many times
+ for (int i = 0; i < 50; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+ }
+
+ // app2 gets preference on node1, and node1 is fully used by app2.
+ FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(am1.getApplicationAttemptId());
+ FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(am2.getApplicationAttemptId());
+ // app1 gets nothing on nm1 (partition=y); AM + 8 task containers on nm2
+ checkNumOfContainersInAnAppOnGivenNode(0, nm1.getNodeId(), schedulerApp1);
+ checkNumOfContainersInAnAppOnGivenNode(9, nm2.getNodeId(), schedulerApp1);
+ // app2 gets all resource on nm1 (partition=y); only its AM on nm2
+ checkNumOfContainersInAnAppOnGivenNode(8, nm1.getNodeId(), schedulerApp2);
+ checkNumOfContainersInAnAppOnGivenNode(1, nm2.getNodeId(), schedulerApp2);
+ } finally {
+ // Stop the RM even when an assertion fails
+ rm1.close();
+ }
+ }
+
+ /**
+ * Asserts how many of {@code app}'s live containers were allocated on
+ * {@code nodeId}.
+ *
+ * @param expectedNum expected number of live containers on the node
+ * @param nodeId node to count containers on
+ * @param app scheduler-side application attempt to inspect
+ */
+ private void checkNumOfContainersInAnAppOnGivenNode(int expectedNum,
+ NodeId nodeId, FiCaSchedulerApp app) {
+ int onNode = 0;
+ for (RMContainer rmContainer : app.getLiveContainers()) {
+ onNode += rmContainer.getAllocatedNode().equals(nodeId) ? 1 : 0;
+ }
+ Assert.assertEquals(expectedNum, onNode);
+ }
+
+ @Test
+ public void
+ testPreferenceOfNeedyPrioritiesUnderSameAppTowardsNodePartitions()
+ throws Exception {
+ /**
+ * Test case: Submit one application that asks for label="" at priority=1
+ * and label="y" at priority=2. When a node with label=y heartbeats,
+ * priority=2 gets allocated first even though priority=1 has pending
+ * resource.
+ */
+
+ // set node -> label
+ mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+ // Makes y to be non-exclusive node labels
+ mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("y", false)));
+ mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("y")));
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(TestUtils.getConfigurationWithQueueLabels(conf)) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ try {
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = y
+ MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty>
+
+ ContainerId nextContainerId;
+
+ // launch an app to queue b1 (label = y), AM container should be launched in nm2
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
+
+ // request containers from am1, priority=1 asks for "" and priority=2 asks
+ // for "y", "y" container should be allocated first
+ nextContainerId =
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+ am1.allocate("*", 1 * GB, 1, 1, new ArrayList<ContainerId>(), "");
+ am1.allocate("*", 1 * GB, 1, 2, new ArrayList<ContainerId>(), "y");
+ Assert.assertTrue(rm1.waitForState(nm1, nextContainerId,
+ RMContainerState.ALLOCATED, 10 * 1000));
+
+ // Check pending resource for am1: priority=1 is not allocated before
+ // priority=2
+ checkPendingResource(rm1, 1, am1.getApplicationAttemptId(), 1 * GB);
+ checkPendingResource(rm1, 2, am1.getApplicationAttemptId(), 0 * GB);
+ } finally {
+ // Stop the RM even when an assertion fails
+ rm1.close();
+ }
+ }
+
+ @Test
+ public void testNonLabeledResourceRequestGetPreferrenceToNonLabeledNode()
+ throws Exception {
+ /**
+ * Test case: Submit one application that asks for 6 label="" containers.
+ * NM1 has label=x (non-exclusive) and NM2 has no label; NM1/NM2 heartbeat
+ * together. Even though NM1 has idle resource, all containers go to NM2,
+ * since non-labeled requests should be allocated on non-labeled nodes first.
+ */
+
+ // set node -> label
+ mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+ // Makes x to be non-exclusive node labels
+ mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false)));
+ mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(TestUtils.getConfigurationWithQueueLabels(conf)) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ try {
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x
+ MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty>
+
+ ContainerId nextContainerId;
+
+ // launch an app to queue b1, AM container should be launched in nm2
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
+
+ // request containers from am1, priority=1 asks for "" * 6 (ids 2 to 7);
+ // nm1/nm2 heartbeat at the same time, check containers are always
+ // allocated to nm2. This verifies that when the non-labeled partition has
+ // available resource, non-labeled requests are allocated there first.
+ am1.allocate("*", 1 * GB, 6, 1, new ArrayList<ContainerId>(), "");
+ for (int i = 2; i < 2 + 6; i++) {
+ nextContainerId =
+ ContainerId.newContainerId(am1.getApplicationAttemptId(), i);
+ Assert.assertTrue(rm1.waitForState(Arrays.asList(nm1, nm2),
+ nextContainerId, RMContainerState.ALLOCATED, 10 * 1000));
+ }
+ // no container allocated on nm1
+ checkLaunchedContainerNumOnNode(rm1, nm1.getNodeId(), 0);
+ // all 7 (1 AM container + 6 task container) containers allocated on nm2
+ checkLaunchedContainerNumOnNode(rm1, nm2.getNodeId(), 7);
+ } finally {
+ // Stop the RM even when an assertion fails
+ rm1.close();
+ }
+ }
+
+ @Test
+ public void testPreferenceOfQueuesTowardsNodePartitions()
+ throws Exception {
+ /**
+ * Test case: have a following queue structure:
+ *
+ * <pre>
+ * root
+ * / | \
+ * a b c
+ * / \ / \ / \
+ * a1 a2 b1 b2 c1 c2
+ * (x) (x) (x)
+ * </pre>
+ *
+ * Only a1, b1, c1 can access label=x, and their default label=x Each each
+ * has one application, asks for 5 containers. NM1 has label=x
+ *
+ * NM1/NM2 doing heartbeat for 15 times, it should allocate all 15
+ * containers with label=x
+ */
+
+ CapacitySchedulerConfiguration csConf =
+ new CapacitySchedulerConfiguration(this.conf);
+
+ // Define top-level queues
+ csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
+ csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+
+ final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+ csConf.setCapacity(A, 33);
+ csConf.setAccessibleNodeLabels(A, toSet("x"));
+ csConf.setCapacityByLabel(A, "x", 33);
+ csConf.setQueues(A, new String[] {"a1", "a2"});
+
+ final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+ csConf.setCapacity(B, 33);
+ csConf.setAccessibleNodeLabels(B, toSet("x"));
+ csConf.setCapacityByLabel(B, "x", 33);
+ csConf.setQueues(B, new String[] {"b1", "b2"});
+
+ final String C = CapacitySchedulerConfiguration.ROOT + ".c";
+ csConf.setCapacity(C, 34);
+ csConf.setAccessibleNodeLabels(C, toSet("x"));
+ csConf.setCapacityByLabel(C, "x", 34);
+ csConf.setQueues(C, new String[] {"c1", "c2"});
+
+ // Define 2nd-level queues
+ final String A1 = A + ".a1";
+ csConf.setCapacity(A1, 50);
+ csConf.setCapacityByLabel(A1, "x", 100);
+ csConf.setDefaultNodeLabelExpression(A1, "x");
+
+ final String A2 = A + ".a2";
+ csConf.setCapacity(A2, 50);
+ csConf.setCapacityByLabel(A2, "x", 0);
+
+ final String B1 = B + ".b1";
+ csConf.setCapacity(B1, 50);
+ csConf.setCapacityByLabel(B1, "x", 100);
+ csConf.setDefaultNodeLabelExpression(B1, "x");
+
+ final String B2 = B + ".b2";
+ csConf.setCapacity(B2, 50);
+ csConf.setCapacityByLabel(B2, "x", 0);
+
+ final String C1 = C + ".c1";
+ csConf.setCapacity(C1, 50);
+ csConf.setCapacityByLabel(C1, "x", 100);
+ csConf.setDefaultNodeLabelExpression(C1, "x");
+
+ final String C2 = C + ".c2";
+ csConf.setCapacity(C2, 50);
+ csConf.setCapacityByLabel(C2, "x", 0);
+
+ // set node -> label
+ mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+ // Makes x to be non-exclusive node labels
+ mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false)));
+ mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+ // inject node label manager
+ MockRM rm1 = new MockRM(csConf) {
+ @Override
+ public RMNodeLabelsManager createNodeLabelManager() {
+ return mgr;
+ }
+ };
+
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB); // label = x
+ MockNM nm2 = rm1.registerNode("h2:1234", 100 * GB); // label = <empty>
+
+ // app1 -> a1
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ // app2 -> a2
+ RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "a2");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+ // app3 -> b1
+ RMApp app3 = rm1.submitApp(1 * GB, "app", "user", null, "b1");
+ MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm1);
+
+ // app4 -> b2
+ RMApp app4 = rm1.submitApp(1 * GB, "app", "user", null, "b2");
+ MockAM am4 = MockRM.launchAndRegisterAM(app4, rm1, nm2);
+
+ // app5 -> c1
+ RMApp app5 = rm1.submitApp(1 * GB, "app", "user", null, "c1");
+ MockAM am5 = MockRM.launchAndRegisterAM(app5, rm1, nm1);
+
+ // app6 -> b2
+ RMApp app6 = rm1.submitApp(1 * GB, "app", "user", null, "c2");
+ MockAM am6 = MockRM.launchAndRegisterAM(app6, rm1, nm2);
+
+ // Each application request 5 * 1GB container
+ am1.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
+ am2.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
+ am3.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
+ am4.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
+ am5.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
+ am6.allocate("*", 1 * GB, 5, new ArrayList<ContainerId>());
+
+ // NM1 do 15 heartbeats
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+ for (int i = 0; i < 15; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ }
+
+ // NM1 get 15 new containers (total is 18, 15 task containers and 3 AM
+ // containers)
+ checkLaunchedContainerNumOnNode(rm1, nm1.getNodeId(), 18);
+
+ // Check pending resource each application
+ // APP1/APP3/APP5 get satisfied, and APP2/APP2/APP3 get nothing.
+ checkPendingResource(rm1, 1, am1.getApplicationAttemptId(), 0 * GB);
+ checkPendingResource(rm1, 1, am2.getApplicationAttemptId(), 5 * GB);
+ checkPendingResource(rm1, 1, am3.getApplicationAttemptId(), 0 * GB);
+ checkPendingResource(rm1, 1, am4.getApplicationAttemptId(), 5 * GB);
+ checkPendingResource(rm1, 1, am5.getApplicationAttemptId(), 0 * GB);
+ checkPendingResource(rm1, 1, am6.getApplicationAttemptId(), 5 * GB);
+
+ rm1.close();
+ }
+
+  @Test
+  public void testQueuesWithoutAccessUsingPartitionedNodes() throws Exception {
+    /**
+     * Test case: the queue structure is:
+     *
+     * <pre>
+     *          root
+     *         /    \
+     *        a      b
+     *       (x)
+     * </pre>
+     *
+     * Only a can access label=x. There are two nodes in the cluster: n1 has
+     * label x and n2 has no label.
+     *
+     * With user-limit-factor=5, one application submitted to queue b that keeps
+     * requesting containers should be able to use up all cluster resources,
+     * including the (non-exclusive) partitioned node n1.
+     */
+
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration(this.conf);
+
+    // Define top-level queues
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    csConf.setCapacity(A, 50);
+    csConf.setAccessibleNodeLabels(A, toSet("x"));
+    csConf.setCapacityByLabel(A, "x", 100);
+
+    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+    csConf.setCapacity(B, 50);
+    // b has no accessible node labels
+    csConf.setAccessibleNodeLabels(B, new HashSet<String>());
+    csConf.setUserLimitFactor(B, 5);
+
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x"));
+    // Make x a non-exclusive node label
+    mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false)));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(csConf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    // Always stop the RM, even if an assertion fails, so it cannot leak into
+    // subsequent tests.
+    try {
+      MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
+      MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = <empty>
+
+      // app1 -> b
+      RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b");
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
+
+      // app1 requests 50 * 1GB containers, more than the 20GB cluster can hold
+      am1.allocate("*", 1 * GB, 50, new ArrayList<ContainerId>());
+
+      CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+      RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+      RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+      SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
+
+      // Number of heartbeat cycles that passed while the partitioned node (NM1)
+      // still had no containers allocated
+      int cycleWaited = 0;
+      // NM1 and NM2 each do 50 heartbeats, interleaved
+      for (int i = 0; i < 50; i++) {
+        cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+        cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+        if (schedulerNode1.getNumContainers() == 0) {
+          cycleWaited++;
+        }
+      }
+      // We wait 10 cycles before anything is allocated on the partitioned node:
+      // NM2 can hold 10 containers in total; excluding the AM container already
+      // running there, 9 cycles are needed to fill the non-partitioned node, and
+      // one more cycle passes before allocation moves to the partitioned node.
+      Assert.assertEquals(10, cycleWaited);
+
+      // Both NM1/NM2 launched 10 containers, cluster resource is exhausted
+      checkLaunchedContainerNumOnNode(rm1, nm1.getNodeId(), 10);
+      checkLaunchedContainerNumOnNode(rm1, nm2.getNodeId(), 10);
+    } finally {
+      rm1.close();
+    }
+  }
+
+  @Test
+  public void testAMContainerAllocationWillAlwaysBeExclusive()
+      throws Exception {
+    /**
+     * Test case: submit one application without a partition and try to
+     * allocate its AM container on a node that has partition=x. Allocation
+     * should fail, because AM containers always respect partition exclusivity,
+     * even though x is configured as non-exclusive here.
+     */
+
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+    // Make x a non-exclusive node label
+    mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false)));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(TestUtils.getConfigurationWithQueueLabels(conf)) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    // Always stop the RM, even if an assertion fails, so it cannot leak into
+    // subsequent tests.
+    try {
+      MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB); // label = x
+
+      // Submit an app to queue b1 without a partition. The only registered
+      // node (NM1) has partition x, so its AM container must NOT be launched.
+      rm1.submitApp(1 * GB, "app", "user", null, "b1");
+
+      CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+      RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+      // Heartbeat many times; the app should get nothing
+      for (int i = 0; i < 50; i++) {
+        cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      }
+
+      Assert.assertEquals(0, cs.getSchedulerNode(nm1.getNodeId())
+          .getNumContainers());
+    } finally {
+      rm1.close();
+    }
+  }
+
+  @Test
+  public void
+      testQueueMaxCapacitiesWillNotBeHonoredWhenNotRespectingExclusivity()
+          throws Exception {
+    /**
+     * Test case: the queue structure is:
+     *
+     * <pre>
+     *          root
+     *         /    \
+     *        a      b
+     *       (x)    (x)
+     * </pre>
+     *
+     * Both a and b can access x, and each has max-capacity-on-x = 50.
+     *
+     * When doing non-exclusive allocation, an app in a (or b) can use 100% of
+     * the x resource, i.e. the per-label max capacity is not enforced.
+     */
+
+    CapacitySchedulerConfiguration csConf =
+        new CapacitySchedulerConfiguration(this.conf);
+
+    // Define top-level queues
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a",
+        "b" });
+    csConf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    csConf.setCapacity(A, 50);
+    csConf.setAccessibleNodeLabels(A, toSet("x"));
+    csConf.setCapacityByLabel(A, "x", 50);
+    csConf.setMaximumCapacityByLabel(A, "x", 50);
+
+    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+    csConf.setCapacity(B, 50);
+    csConf.setAccessibleNodeLabels(B, toSet("x"));
+    csConf.setCapacityByLabel(B, "x", 50);
+    csConf.setMaximumCapacityByLabel(B, "x", 50);
+
+    // set node -> label
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x"));
+    // Make x a non-exclusive node label
+    mgr.updateNodeLabels(Arrays.asList(NodeLabel.newInstance("x", false)));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+
+    // inject node label manager
+    MockRM rm1 = new MockRM(csConf) {
+      @Override
+      public RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    // Always stop the RM, even if an assertion fails, so it cannot leak into
+    // subsequent tests.
+    try {
+      MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB); // label = x
+      MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB); // label = <empty>
+
+      // app1 -> a
+      RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+      MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
+
+      // app1 asks for 10 containers on the default (empty) partition
+      am1.allocate("*", 1 * GB, 10, new ArrayList<ContainerId>());
+
+      CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+      RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+      SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
+
+      // NM1 does 50 heartbeats
+      for (int i = 0; i < 50; i++) {
+        cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      }
+
+      // app1 gets all resources in partition=x (10 * 1GB on the 10GB NM1),
+      // even though a's max-capacity-on-x is 50
+      Assert.assertEquals(10, schedulerNode1.getNumContainers());
+    } finally {
+      rm1.close();
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
index 7da1c97..52d0bc1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
@@ -23,7 +23,6 @@ import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
@@ -45,6 +44,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -146,7 +146,7 @@ public class TestParentQueue {
final Resource allocatedResource = Resources.createResource(allocation);
if (queue instanceof ParentQueue) {
((ParentQueue)queue).allocateResource(clusterResource,
- allocatedResource, null);
+ allocatedResource, RMNodeLabelsManager.NO_LABEL);
} else {
FiCaSchedulerApp app1 = getMockApplication(0, "");
((LeafQueue)queue).allocateResource(clusterResource, app1,
@@ -157,7 +157,7 @@ public class TestParentQueue {
if (allocation > 0) {
doReturn(new CSAssignment(Resources.none(), type)).when(queue)
.assignContainers(eq(clusterResource), eq(node),
- any(ResourceLimits.class));
+ any(ResourceLimits.class), any(SchedulingMode.class));
// Mock the node's resource availability
Resource available = node.getAvailableResource();
@@ -168,7 +168,7 @@ public class TestParentQueue {
return new CSAssignment(allocatedResource, type);
}
}).when(queue).assignContainers(eq(clusterResource), eq(node),
- any(ResourceLimits.class));
+ any(ResourceLimits.class), any(SchedulingMode.class));
}
private float computeQueueAbsoluteUsedCapacity(CSQueue queue,
@@ -228,11 +228,16 @@ public class TestParentQueue {
LeafQueue a = (LeafQueue)queues.get(A);
LeafQueue b = (LeafQueue)queues.get(B);
+ a.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+ b.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+ queues.get(CapacitySchedulerConfiguration.ROOT).getQueueResourceUsage()
+ .incPending(Resources.createResource(1 * GB));
+
// Simulate B returning a container on node_0
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
root.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyQueueMetrics(a, 0*GB, clusterResource);
verifyQueueMetrics(b, 1*GB, clusterResource);
@@ -240,12 +245,12 @@ public class TestParentQueue {
stubQueueAllocation(a, clusterResource, node_1, 2*GB);
stubQueueAllocation(b, clusterResource, node_1, 1*GB);
root.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
InOrder allocationOrder = inOrder(a, b);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@@ -254,12 +259,12 @@ public class TestParentQueue {
stubQueueAllocation(a, clusterResource, node_0, 1*GB);
stubQueueAllocation(b, clusterResource, node_0, 2*GB);
root.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
allocationOrder.verify(a).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 4*GB, clusterResource);
@@ -268,12 +273,12 @@ public class TestParentQueue {
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
stubQueueAllocation(b, clusterResource, node_0, 4*GB);
root.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
allocationOrder.verify(a).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 8*GB, clusterResource);
@@ -282,12 +287,12 @@ public class TestParentQueue {
stubQueueAllocation(a, clusterResource, node_1, 1*GB);
stubQueueAllocation(b, clusterResource, node_1, 1*GB);
root.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
allocationOrder = inOrder(a, b);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
allocationOrder.verify(a).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
verifyQueueMetrics(a, 4*GB, clusterResource);
verifyQueueMetrics(b, 9*GB, clusterResource);
}
@@ -448,16 +453,27 @@ public class TestParentQueue {
// Start testing
CSQueue a = queues.get(A);
+ a.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
CSQueue b = queues.get(B);
+ b.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
CSQueue c = queues.get(C);
+ c.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
CSQueue d = queues.get(D);
+ d.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
CSQueue a1 = queues.get(A1);
+ a1.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
CSQueue a2 = queues.get(A2);
+ a2.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
CSQueue b1 = queues.get(B1);
+ b1.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
CSQueue b2 = queues.get(B2);
+ b2.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
CSQueue b3 = queues.get(B3);
+ b3.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+ queues.get(CapacitySchedulerConfiguration.ROOT).getQueueResourceUsage()
+ .incPending(Resources.createResource(1 * GB));
// Simulate C returning a container on node_0
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
@@ -465,7 +481,7 @@ public class TestParentQueue {
stubQueueAllocation(c, clusterResource, node_0, 1*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyQueueMetrics(a, 0*GB, clusterResource);
verifyQueueMetrics(b, 0*GB, clusterResource);
verifyQueueMetrics(c, 1*GB, clusterResource);
@@ -478,7 +494,7 @@ public class TestParentQueue {
stubQueueAllocation(b2, clusterResource, node_1, 4*GB);
stubQueueAllocation(c, clusterResource, node_1, 0*GB);
root.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyQueueMetrics(a, 0*GB, clusterResource);
verifyQueueMetrics(b, 4*GB, clusterResource);
verifyQueueMetrics(c, 1*GB, clusterResource);
@@ -490,14 +506,14 @@ public class TestParentQueue {
stubQueueAllocation(b3, clusterResource, node_0, 2*GB);
stubQueueAllocation(c, clusterResource, node_0, 2*GB);
root.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
InOrder allocationOrder = inOrder(a, c, b);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
allocationOrder.verify(c).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
verifyQueueMetrics(a, 1*GB, clusterResource);
verifyQueueMetrics(b, 6*GB, clusterResource);
verifyQueueMetrics(c, 3*GB, clusterResource);
@@ -517,16 +533,16 @@ public class TestParentQueue {
stubQueueAllocation(b1, clusterResource, node_2, 1*GB);
stubQueueAllocation(c, clusterResource, node_2, 1*GB);
root.assignContainers(clusterResource, node_2,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
allocationOrder = inOrder(a, a2, a1, b, c);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
allocationOrder.verify(a2).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
allocationOrder.verify(c).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 8*GB, clusterResource);
verifyQueueMetrics(c, 4*GB, clusterResource);
@@ -622,12 +638,16 @@ public class TestParentQueue {
// Start testing
LeafQueue a = (LeafQueue)queues.get(A);
LeafQueue b = (LeafQueue)queues.get(B);
+ a.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+ b.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+ queues.get(CapacitySchedulerConfiguration.ROOT).getQueueResourceUsage()
+ .incPending(Resources.createResource(1 * GB));
// Simulate B returning a container on node_0
stubQueueAllocation(a, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
stubQueueAllocation(b, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyQueueMetrics(a, 0*GB, clusterResource);
verifyQueueMetrics(b, 1*GB, clusterResource);
@@ -636,12 +656,12 @@ public class TestParentQueue {
stubQueueAllocation(a, clusterResource, node_1, 2*GB, NodeType.RACK_LOCAL);
stubQueueAllocation(b, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
InOrder allocationOrder = inOrder(a, b);
allocationOrder.verify(a).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@@ -651,12 +671,12 @@ public class TestParentQueue {
stubQueueAllocation(a, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
stubQueueAllocation(b, clusterResource, node_0, 2*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
allocationOrder = inOrder(b, a);
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
allocationOrder.verify(a).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 4*GB, clusterResource);
@@ -691,12 +711,19 @@ public class TestParentQueue {
// Start testing
LeafQueue b3 = (LeafQueue)queues.get(B3);
LeafQueue b2 = (LeafQueue)queues.get(B2);
+ b2.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+ b3.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+ queues.get(CapacitySchedulerConfiguration.ROOT).getQueueResourceUsage()
+ .incPending(Resources.createResource(1 * GB));
+
+ CSQueue b = queues.get(B);
+ b.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
// Simulate B3 returning a container on node_0
stubQueueAllocation(b2, clusterResource, node_0, 0*GB, NodeType.OFF_SWITCH);
stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyQueueMetrics(b2, 0*GB, clusterResource);
verifyQueueMetrics(b3, 1*GB, clusterResource);
@@ -705,12 +732,12 @@ public class TestParentQueue {
stubQueueAllocation(b2, clusterResource, node_1, 1*GB, NodeType.RACK_LOCAL);
stubQueueAllocation(b3, clusterResource, node_1, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
InOrder allocationOrder = inOrder(b2, b3);
allocationOrder.verify(b2).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
allocationOrder.verify(b3).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
verifyQueueMetrics(b2, 1*GB, clusterResource);
verifyQueueMetrics(b3, 2*GB, clusterResource);
@@ -720,12 +747,12 @@ public class TestParentQueue {
stubQueueAllocation(b2, clusterResource, node_0, 1*GB, NodeType.NODE_LOCAL);
stubQueueAllocation(b3, clusterResource, node_0, 1*GB, NodeType.OFF_SWITCH);
root.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
allocationOrder = inOrder(b3, b2);
allocationOrder.verify(b3).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
allocationOrder.verify(b2).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyResourceLimits());
+ any(FiCaSchedulerNode.class), anyResourceLimits(), any(SchedulingMode.class));
verifyQueueMetrics(b2, 1*GB, clusterResource);
verifyQueueMetrics(b3, 3*GB, clusterResource);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index e8a8243..47be618 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -48,10 +48,10 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -266,7 +266,7 @@ public class TestReservations {
// Start testing...
// Only AM
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(2 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -278,7 +278,7 @@ public class TestReservations {
// Only 1 map - simulating reduce
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(5 * GB, a.getUsedResources().getMemory());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -290,7 +290,7 @@ public class TestReservations {
// Only 1 map to other node - simulating reduce
a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -305,7 +305,7 @@ public class TestReservations {
// try to assign reducer (5G on node 0 and should reserve)
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -321,7 +321,7 @@ public class TestReservations {
// assign reducer to node 2
a.assignContainers(clusterResource, node_2,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(18 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -338,7 +338,7 @@ public class TestReservations {
// node_1 heartbeat and unreserves from node_0 in order to allocate
// on node_1
a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(18 * GB, a.getUsedResources().getMemory());
assertEquals(18 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -422,7 +422,7 @@ public class TestReservations {
// Start testing...
// Only AM
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(2 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -434,7 +434,7 @@ public class TestReservations {
// Only 1 map - simulating reduce
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(5 * GB, a.getUsedResources().getMemory());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -446,7 +446,7 @@ public class TestReservations {
// Only 1 map to other node - simulating reduce
a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -461,7 +461,7 @@ public class TestReservations {
// try to assign reducer (5G on node 0 and should reserve)
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -477,7 +477,7 @@ public class TestReservations {
// assign reducer to node 2
a.assignContainers(clusterResource, node_2,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(18 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -494,7 +494,7 @@ public class TestReservations {
// node_1 heartbeat and won't unreserve from node_0, potentially stuck
// if AM doesn't handle
a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(18 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -570,7 +570,7 @@ public class TestReservations {
// Start testing...
// Only AM
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(2 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -581,7 +581,7 @@ public class TestReservations {
// Only 1 map - simulating reduce
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(5 * GB, a.getUsedResources().getMemory());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -592,7 +592,7 @@ public class TestReservations {
// Only 1 map to other node - simulating reduce
a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -606,7 +606,7 @@ public class TestReservations {
// try to assign reducer (5G on node 0 and should reserve)
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -621,7 +621,7 @@ public class TestReservations {
// could allocate but told need to unreserve first
a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -823,7 +823,7 @@ public class TestReservations {
// Start testing...
// Only AM
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(2 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -834,7 +834,7 @@ public class TestReservations {
// Only 1 map - simulating reduce
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(5 * GB, a.getUsedResources().getMemory());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -845,7 +845,7 @@ public class TestReservations {
// Only 1 map to other node - simulating reduce
a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -860,15 +860,16 @@ public class TestReservations {
Resource capability = Resources.createResource(32 * GB, 0);
boolean res =
a.canAssignToThisQueue(clusterResource,
- CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
- clusterResource), capability, Resources.none());
+ RMNodeLabelsManager.NO_LABEL, new ResourceLimits(
+ clusterResource), capability, Resources.none(),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertFalse(res);
// now add in reservations and make sure it continues if config set
// allocate to queue so that the potential new capacity is greater then
// absoluteMaxCapacity
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(5 * GB, a.getMetrics().getReservedMB());
@@ -881,16 +882,17 @@ public class TestReservations {
capability = Resources.createResource(5 * GB, 0);
res =
a.canAssignToThisQueue(clusterResource,
- CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
- clusterResource), capability, Resources
- .createResource(5 * GB));
+ RMNodeLabelsManager.NO_LABEL, new ResourceLimits(
+ clusterResource), capability, Resources.createResource(5 * GB),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertTrue(res);
// tell to not check reservations
res =
a.canAssignToThisQueue(clusterResource,
- CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
- clusterResource), capability, Resources.none());
+ RMNodeLabelsManager.NO_LABEL, new ResourceLimits(
+ clusterResource), capability, Resources.none(),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertFalse(res);
refreshQueuesTurnOffReservationsContLook(a, csConf);
@@ -899,15 +901,16 @@ public class TestReservations {
// in since feature is off
res =
a.canAssignToThisQueue(clusterResource,
- CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
- clusterResource), capability, Resources.none());
+ RMNodeLabelsManager.NO_LABEL, new ResourceLimits(
+ clusterResource), capability, Resources.none(),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertFalse(res);
res =
a.canAssignToThisQueue(clusterResource,
- CommonNodeLabelsManager.EMPTY_STRING_SET, new ResourceLimits(
- clusterResource), capability, Resources
- .createResource(5 * GB));
+ RMNodeLabelsManager.NO_LABEL, new ResourceLimits(
+ clusterResource), capability, Resources.createResource(5 * GB),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertFalse(res);
}
@@ -1008,7 +1011,7 @@ public class TestReservations {
// Start testing...
// Only AM
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(2 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1019,7 +1022,7 @@ public class TestReservations {
// Only 1 map - simulating reduce
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(5 * GB, a.getUsedResources().getMemory());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1030,7 +1033,7 @@ public class TestReservations {
// Only 1 map to other node - simulating reduce
a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1044,7 +1047,7 @@ public class TestReservations {
// allocate to queue so that the potential new capacity is greater then
// absoluteMaxCapacity
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(5 * GB, app_0.getCurrentReservation().getMemory());
@@ -1059,18 +1062,18 @@ public class TestReservations {
// set limit so subtrace reservations it can continue
Resource limit = Resources.createResource(12 * GB, 0);
boolean res = a.canAssignToUser(clusterResource, user_0, limit, app_0,
- true, null);
+ true, "");
assertTrue(res);
// tell it not to check for reservations and should fail as already over
// limit
- res = a.canAssignToUser(clusterResource, user_0, limit, app_0, false, null);
+ res = a.canAssignToUser(clusterResource, user_0, limit, app_0, false, "");
assertFalse(res);
refreshQueuesTurnOffReservationsContLook(a, csConf);
// should now return false since feature off
- res = a.canAssignToUser(clusterResource, user_0, limit, app_0, true, null);
+ res = a.canAssignToUser(clusterResource, user_0, limit, app_0, true, "");
assertFalse(res);
}
@@ -1143,7 +1146,7 @@ public class TestReservations {
// Start testing...
// Only AM
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(2 * GB, a.getUsedResources().getMemory());
assertEquals(2 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1155,7 +1158,7 @@ public class TestReservations {
// Only 1 map - simulating reduce
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(5 * GB, a.getUsedResources().getMemory());
assertEquals(5 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1167,7 +1170,7 @@ public class TestReservations {
// Only 1 map to other node - simulating reduce
a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1183,7 +1186,7 @@ public class TestReservations {
// some resource. Even with continous reservation looking, we don't allow
// unreserve resource to reserve container.
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(Resources.createResource(10 * GB)));
+ new ResourceLimits(Resources.createResource(10 * GB)), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1199,7 +1202,7 @@ public class TestReservations {
// used (8G) + required (5G). It will not reserved since it has to unreserve
// some resource. Unfortunately, there's nothing to unreserve.
a.assignContainers(clusterResource, node_2,
- new ResourceLimits(Resources.createResource(10 * GB)));
+ new ResourceLimits(Resources.createResource(10 * GB)), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(8 * GB, a.getUsedResources().getMemory());
assertEquals(8 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1213,7 +1216,7 @@ public class TestReservations {
// let it assign 5G to node_2
a.assignContainers(clusterResource, node_2,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(13 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0 * GB, a.getMetrics().getReservedMB());
@@ -1226,7 +1229,7 @@ public class TestReservations {
// reserve 8G node_0
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(21 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(8 * GB, a.getMetrics().getReservedMB());
@@ -1241,7 +1244,7 @@ public class TestReservations {
// continued to try due to having reservation above,
// but hits queue limits so can't reserve anymore.
a.assignContainers(clusterResource, node_2,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(21 * GB, a.getUsedResources().getMemory());
assertEquals(13 * GB, app_0.getCurrentConsumption().getMemory());
assertEquals(8 * GB, a.getMetrics().getReservedMB());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index 62135b9..84abf4e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -160,6 +160,7 @@ public class TestUtils {
request.setCapability(capability);
request.setRelaxLocality(relaxLocality);
request.setPriority(priority);
+ request.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
return request;
}
@@ -273,6 +274,7 @@ public class TestUtils {
conf.setCapacity(B1, 100);
conf.setMaximumCapacity(B1, 100);
conf.setCapacityByLabel(B1, "y", 100);
+ conf.setMaximumApplicationMasterResourcePerQueuePercent(B1, 1f);
final String C1 = C + ".c1";
conf.setQueues(C, new String[] {"c1"});
[3/3] hadoop git commit: YARN-3361. CapacityScheduler side changes to
support non-exclusive node labels. Contributed by Wangda Tan
Posted by ji...@apache.org.
YARN-3361. CapacityScheduler side changes to support non-exclusive node labels. Contributed by Wangda Tan
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/0fefda64
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/0fefda64
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/0fefda64
Branch: refs/heads/trunk
Commit: 0fefda645bca935b87b6bb8ca63e6f18340d59f5
Parents: b46ee1e
Author: Jian He <ji...@apache.org>
Authored: Tue Apr 14 11:36:37 2015 -0700
Committer: Jian He <ji...@apache.org>
Committed: Tue Apr 14 11:45:58 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../hadoop/yarn/server/utils/BuilderUtils.java | 1 +
.../rmapp/attempt/RMAppAttemptImpl.java | 10 +-
.../scheduler/AppSchedulingInfo.java | 25 +-
.../scheduler/ResourceUsage.java | 8 +
.../scheduler/SchedulerApplicationAttempt.java | 44 +-
.../scheduler/SchedulerUtils.java | 87 +-
.../scheduler/capacity/AbstractCSQueue.java | 243 +++--
.../scheduler/capacity/CSQueue.java | 5 +-
.../scheduler/capacity/CapacityScheduler.java | 91 +-
.../CapacitySchedulerConfiguration.java | 5 +
.../scheduler/capacity/LeafQueue.java | 368 ++++---
.../scheduler/capacity/ParentQueue.java | 59 +-
.../scheduler/capacity/SchedulingMode.java | 44 +
.../server/resourcemanager/Application.java | 4 +
.../yarn/server/resourcemanager/MockAM.java | 8 +-
.../yarn/server/resourcemanager/MockRM.java | 35 +-
.../capacity/TestApplicationLimits.java | 8 +-
.../scheduler/capacity/TestChildQueueOrder.java | 41 +-
.../capacity/TestContainerAllocation.java | 390 +------
.../scheduler/capacity/TestLeafQueue.java | 148 +--
.../TestNodeLabelContainerAllocation.java | 1027 ++++++++++++++++++
.../scheduler/capacity/TestParentQueue.java | 111 +-
.../scheduler/capacity/TestReservations.java | 101 +-
.../scheduler/capacity/TestUtils.java | 2 +
25 files changed, 1914 insertions(+), 954 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 4961ae0..2933c79 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -72,6 +72,9 @@ Release 2.8.0 - UNRELEASED
YARN-3443. Create a 'ResourceHandler' subsystem to ease addition of support
for new resource types on the NM. (Sidharta Seethana via junping_du)
+ YARN-3361. CapacityScheduler side changes to support non-exclusive node
+ labels. (Wangda Tan via jianhe)
+
IMPROVEMENTS
YARN-1880. Cleanup TestApplicationClientProtocolOnHA
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
index 68d4ef9..f2146c8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/utils/BuilderUtils.java
@@ -313,6 +313,7 @@ public class BuilderUtils {
request.setResourceName(r.getResourceName());
request.setCapability(r.getCapability());
request.setNumContainers(r.getNumContainers());
+ request.setNodeLabelExpression(r.getNodeLabelExpression());
return request;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index 1be1727..1071831 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -146,7 +146,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
private ConcurrentMap<NodeId, List<ContainerStatus>>
finishedContainersSentToAM =
new ConcurrentHashMap<NodeId, List<ContainerStatus>>();
- private Container masterContainer;
+ private volatile Container masterContainer;
private float progress = 0;
private String host = "N/A";
@@ -762,13 +762,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
@Override
public Container getMasterContainer() {
- this.readLock.lock();
-
- try {
- return this.masterContainer;
- } finally {
- this.readLock.unlock();
- }
+ return this.masterContainer;
}
@InterfaceAudience.Private
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 5521d47..5604f0f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -73,10 +73,11 @@ public class AppSchedulingInfo {
/* Allocated by scheduler */
boolean pending = true; // for app metrics
+ private ResourceUsage appResourceUsage;
public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
- long epoch) {
+ long epoch, ResourceUsage appResourceUsage) {
this.applicationAttemptId = appAttemptId;
this.applicationId = appAttemptId.getApplicationId();
this.queue = queue;
@@ -84,6 +85,7 @@ public class AppSchedulingInfo {
this.user = user;
this.activeUsersManager = activeUsersManager;
this.containerIdCounter = new AtomicLong(epoch << EPOCH_BIT_SHIFT);
+ this.appResourceUsage = appResourceUsage;
}
public ApplicationId getApplicationId() {
@@ -191,13 +193,19 @@ public class AppSchedulingInfo {
lastRequestCapability);
// update queue:
+ Resource increasedResource = Resources.multiply(request.getCapability(),
+ request.getNumContainers());
queue.incPendingResource(
request.getNodeLabelExpression(),
- Resources.multiply(request.getCapability(),
- request.getNumContainers()));
+ increasedResource);
+ appResourceUsage.incPending(request.getNodeLabelExpression(), increasedResource);
if (lastRequest != null) {
+ Resource decreasedResource =
+ Resources.multiply(lastRequestCapability, lastRequestContainers);
queue.decPendingResource(lastRequest.getNodeLabelExpression(),
- Resources.multiply(lastRequestCapability, lastRequestContainers));
+ decreasedResource);
+ appResourceUsage.decPending(lastRequest.getNodeLabelExpression(),
+ decreasedResource);
}
}
}
@@ -385,6 +393,8 @@ public class AppSchedulingInfo {
checkForDeactivation();
}
+ appResourceUsage.decPending(offSwitchRequest.getNodeLabelExpression(),
+ offSwitchRequest.getCapability());
queue.decPendingResource(offSwitchRequest.getNodeLabelExpression(),
offSwitchRequest.getCapability());
}
@@ -492,9 +502,10 @@ public class AppSchedulingInfo {
}
public ResourceRequest cloneResourceRequest(ResourceRequest request) {
- ResourceRequest newRequest = ResourceRequest.newInstance(
- request.getPriority(), request.getResourceName(),
- request.getCapability(), 1, request.getRelaxLocality());
+ ResourceRequest newRequest =
+ ResourceRequest.newInstance(request.getPriority(),
+ request.getResourceName(), request.getCapability(), 1,
+ request.getRelaxLocality(), request.getNodeLabelExpression());
return newRequest;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
index 36ee4da..5169b78 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
@@ -27,6 +27,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
@@ -250,6 +251,10 @@ public class ResourceUsage {
}
private Resource _get(String label, ResourceType type) {
+ if (label == null) {
+ label = RMNodeLabelsManager.NO_LABEL;
+ }
+
try {
readLock.lock();
UsageByLabel usage = usages.get(label);
@@ -263,6 +268,9 @@ public class ResourceUsage {
}
private UsageByLabel getAndAddIfMissing(String label) {
+ if (label == null) {
+ label = RMNodeLabelsManager.NO_LABEL;
+ }
if (!usages.containsKey(label)) {
UsageByLabel u = new UsageByLabel(label);
usages.put(label, u);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 5e0bbc7..fccf766 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -56,6 +56,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.base.Preconditions;
@@ -108,14 +110,24 @@ public class SchedulerApplicationAttempt {
private Set<ContainerId> pendingRelease = null;
/**
- * Count how many times the application has been given an opportunity
- * to schedule a task at each priority. Each time the scheduler
- * asks the application for a task at this priority, it is incremented,
- * and each time the application successfully schedules a task, it
+ * Count how many times the application has been given an opportunity to
+ * schedule a task at each priority. Each time the scheduler asks the
+ * application for a task at this priority, it is incremented, and each time
+ * the application successfully schedules a task (rack- or node-local), it
* is reset to 0.
*/
Multiset<Priority> schedulingOpportunities = HashMultiset.create();
+ /**
+ * Count how many times the application has been given an opportunity to
+ * schedule a non-partitioned resource request at each priority. Each time the
+ * scheduler asks the application for a task at this priority, it is
+ * incremented; it is reset to 0 whenever the application successfully
+ * schedules any task at the corresponding priority.
+ */
+ Multiset<Priority> missedNonPartitionedRequestSchedulingOpportunity =
+ HashMultiset.create();
+
// Time of the last container scheduled at the current allowed level
protected Map<Priority, Long> lastScheduledContainer =
new HashMap<Priority, Long>();
@@ -132,7 +144,7 @@ public class SchedulerApplicationAttempt {
this.rmContext = rmContext;
this.appSchedulingInfo =
new AppSchedulingInfo(applicationAttemptId, user, queue,
- activeUsersManager, rmContext.getEpoch());
+ activeUsersManager, rmContext.getEpoch(), attemptResourceUsage);
this.queue = queue;
this.pendingRelease = new HashSet<ContainerId>();
this.attemptId = applicationAttemptId;
@@ -489,6 +501,18 @@ public class SchedulerApplicationAttempt {
return this.appSchedulingInfo.isBlacklisted(resourceName);
}
+ public synchronized int addMissedNonPartitionedRequestSchedulingOpportunity(
+ Priority priority) {
+ missedNonPartitionedRequestSchedulingOpportunity.add(priority);
+ return missedNonPartitionedRequestSchedulingOpportunity.count(priority);
+ }
+
+ public synchronized void
+ resetMissedNonPartitionedRequestSchedulingOpportunity(Priority priority) {
+ missedNonPartitionedRequestSchedulingOpportunity.setCount(priority, 0);
+ }
+
+
public synchronized void addSchedulingOpportunity(Priority priority) {
schedulingOpportunities.setCount(priority,
schedulingOpportunities.count(priority) + 1);
@@ -518,6 +542,7 @@ public class SchedulerApplicationAttempt {
public synchronized void resetSchedulingOpportunities(Priority priority) {
resetSchedulingOpportunities(priority, System.currentTimeMillis());
}
+
// used for continuous scheduling
public synchronized void resetSchedulingOpportunities(Priority priority,
long currentTimeMs) {
@@ -669,4 +694,13 @@ public class SchedulerApplicationAttempt {
public Set<String> getBlacklistedNodes() {
return this.appSchedulingInfo.getBlackListCopy();
}
+
+ @Private
+ public boolean hasPendingResourceRequest(ResourceCalculator rc,
+ String nodePartition, Resource cluster,
+ SchedulingMode schedulingMode) {
+ return SchedulerUtils.hasPendingResourceRequest(rc,
+ this.attemptResourceUsage, nodePartition, cluster,
+ schedulingMode);
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
index 248cc08..7a1a528 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
@@ -37,11 +37,10 @@ import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
-import com.google.common.collect.Sets;
-
/**
* Utilities shared by schedulers.
*/
@@ -235,9 +234,13 @@ public class SchedulerUtils {
if (labelExp == null && queueInfo != null
&& ResourceRequest.ANY.equals(resReq.getResourceName())) {
labelExp = queueInfo.getDefaultNodeLabelExpression();
- resReq.setNodeLabelExpression(labelExp);
}
+ // If labelExp still equals to null, set it to be NO_LABEL
+ resReq
+ .setNodeLabelExpression(labelExp == null ? RMNodeLabelsManager.NO_LABEL
+ : labelExp);
+
// we don't allow specify label expression other than resourceName=ANY now
if (!ResourceRequest.ANY.equals(resReq.getResourceName())
&& labelExp != null && !labelExp.trim().isEmpty()) {
@@ -273,25 +276,6 @@ public class SchedulerUtils {
}
}
- public static boolean checkQueueAccessToNode(Set<String> queueLabels,
- Set<String> nodeLabels) {
- // if queue's label is *, it can access any node
- if (queueLabels != null && queueLabels.contains(RMNodeLabelsManager.ANY)) {
- return true;
- }
- // any queue can access to a node without label
- if (nodeLabels == null || nodeLabels.isEmpty()) {
- return true;
- }
- // a queue can access to a node only if it contains any label of the node
- if (queueLabels != null
- && Sets.intersection(queueLabels, nodeLabels).size() > 0) {
- return true;
- }
- // sorry, you cannot access
- return false;
- }
-
public static void checkIfLabelInClusterNodeLabels(RMNodeLabelsManager mgr,
Set<String> labels) throws IOException {
if (mgr == null) {
@@ -311,26 +295,6 @@ public class SchedulerUtils {
}
}
}
-
- public static boolean checkNodeLabelExpression(Set<String> nodeLabels,
- String labelExpression) {
- // empty label expression can only allocate on node with empty labels
- if (labelExpression == null || labelExpression.trim().isEmpty()) {
- if (!nodeLabels.isEmpty()) {
- return false;
- }
- }
-
- if (labelExpression != null) {
- for (String str : labelExpression.split("&&")) {
- if (!str.trim().isEmpty()
- && (nodeLabels == null || !nodeLabels.contains(str.trim()))) {
- return false;
- }
- }
- }
- return true;
- }
public static boolean checkQueueLabelExpression(Set<String> queueLabels,
String labelExpression) {
@@ -360,4 +324,43 @@ public class SchedulerUtils {
}
return null;
}
+
+  /** Returns whether the request's label matches the partition to serve. */
+  public static boolean checkResourceRequestMatchingNodePartition(
+      ResourceRequest offswitchResourceRequest, String nodePartition,
+      SchedulingMode schedulingMode) {
+    // Partition to serve: the node's own partition when respecting
+    // exclusivity; otherwise NO_LABEL, i.e. only unlabeled requests match.
+    String nodePartitionToLookAt =
+        schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
+            ? nodePartition : RMNodeLabelsManager.NO_LABEL;
+
+    // A request without a node-label expression asks for NO_LABEL.
+    String askedNodePartition =
+        offswitchResourceRequest.getNodeLabelExpression();
+    if (askedNodePartition == null) {
+      askedNodePartition = RMNodeLabelsManager.NO_LABEL;
+    }
+    return askedNodePartition.equals(nodePartitionToLookAt);
+  }
+
+  /** Returns whether usage's pending resource on the partition exceeds none. */
+  private static boolean hasPendingResourceRequest(ResourceCalculator rc,
+      ResourceUsage usage, String partitionToLookAt, Resource cluster) {
+    // "Greater than none" is judged by rc against the cluster resource, so
+    // the comparison follows whichever resource calculator the caller passes.
+    Resource pending = usage.getPending(partitionToLookAt);
+    return Resources.greaterThan(rc, cluster, pending, Resources.none());
+  }
+
+  @Private
+  public static boolean hasPendingResourceRequest(ResourceCalculator rc,
+      ResourceUsage usage, String nodePartition, Resource cluster,
+      SchedulingMode schedulingMode) {
+    // Ignoring exclusivity means only NO_LABEL pending asks are considered.
+    String partition =
+        schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY
+            ? RMNodeLabelsManager.NO_LABEL : nodePartition;
+    return hasPendingResourceRequest(rc, usage, partition, cluster);
+  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 42ea089..d95c45c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -38,12 +37,12 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.security.PrivilegedEntity;
import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
@@ -56,6 +55,11 @@ import com.google.common.collect.Sets;
public abstract class AbstractCSQueue implements CSQueue {
private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class);
+ static final CSAssignment NULL_ASSIGNMENT =
+ new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
+
+ static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true);
+
CSQueue parent;
final String queueName;
volatile int numContainers;
@@ -343,16 +347,8 @@ public abstract class AbstractCSQueue implements CSQueue {
}
synchronized void allocateResource(Resource clusterResource,
- Resource resource, Set<String> nodeLabels) {
-
- // Update usedResources by labels
- if (nodeLabels == null || nodeLabels.isEmpty()) {
- queueUsage.incUsed(resource);
- } else {
- for (String label : Sets.intersection(accessibleLabels, nodeLabels)) {
- queueUsage.incUsed(label, resource);
- }
- }
+ Resource resource, String nodePartition) {
+ queueUsage.incUsed(nodePartition, resource);
++numContainers;
CSQueueUtils.updateQueueStatistics(resourceCalculator, this, getParent(),
@@ -360,15 +356,8 @@ public abstract class AbstractCSQueue implements CSQueue {
}
protected synchronized void releaseResource(Resource clusterResource,
- Resource resource, Set<String> nodeLabels) {
- // Update usedResources by labels
- if (null == nodeLabels || nodeLabels.isEmpty()) {
- queueUsage.decUsed(resource);
- } else {
- for (String label : Sets.intersection(accessibleLabels, nodeLabels)) {
- queueUsage.decUsed(label, resource);
- }
- }
+ Resource resource, String nodePartition) {
+ queueUsage.decUsed(nodePartition, resource);
CSQueueUtils.updateQueueStatistics(resourceCalculator, this, getParent(),
clusterResource, minimumAllocation);
@@ -434,103 +423,108 @@ public abstract class AbstractCSQueue implements CSQueue {
parentQ.getPreemptionDisabled());
}
- private Resource getCurrentLimitResource(String nodeLabel,
- Resource clusterResource, ResourceLimits currentResourceLimits) {
- /*
- * Current limit resource: For labeled resource: limit = queue-max-resource
- * (TODO, this part need update when we support labeled-limit) For
- * non-labeled resource: limit = min(queue-max-resource,
- * limit-set-by-parent)
- */
- Resource queueMaxResource =
- Resources.multiplyAndNormalizeDown(resourceCalculator,
- labelManager.getResourceByLabel(nodeLabel, clusterResource),
- queueCapacities.getAbsoluteMaximumCapacity(nodeLabel), minimumAllocation);
- if (nodeLabel.equals(RMNodeLabelsManager.NO_LABEL)) {
- return Resources.min(resourceCalculator, clusterResource,
- queueMaxResource, currentResourceLimits.getLimit());
+ private Resource getCurrentLimitResource(String nodePartition,
+ Resource clusterResource, ResourceLimits currentResourceLimits,
+ SchedulingMode schedulingMode) {
+ if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
+ /*
+ * Current limit: for a labeled partition, limit = queue-max-resource
+ * (TODO: update this once labeled limits are supported); for the
+ * non-labeled partition, limit = min(queue-max-resource,
+ * limit-set-by-parent)
+ */
+ Resource queueMaxResource =
+ Resources.multiplyAndNormalizeDown(resourceCalculator,
+ labelManager.getResourceByLabel(nodePartition, clusterResource),
+ queueCapacities.getAbsoluteMaximumCapacity(nodePartition), minimumAllocation);
+ if (nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
+ return Resources.min(resourceCalculator, clusterResource,
+ queueMaxResource, currentResourceLimits.getLimit());
+ }
+ return queueMaxResource;
+ } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
+ // When doing non-exclusive allocation, every queue's maximum capacity on
+ // this partition is the partition's total resource.
+ return labelManager.getResourceByLabel(nodePartition, clusterResource);
}
- return queueMaxResource;
+ // Any other schedulingMode (e.g. null) falls through to a zero limit.
+ return Resources.none();
}
synchronized boolean canAssignToThisQueue(Resource clusterResource,
- Set<String> nodeLabels, ResourceLimits currentResourceLimits,
- Resource nowRequired, Resource resourceCouldBeUnreserved) {
- // Get label of this queue can access, it's (nodeLabel AND queueLabel)
- Set<String> labelCanAccess;
- if (null == nodeLabels || nodeLabels.isEmpty()) {
- labelCanAccess = new HashSet<String>();
- // Any queue can always access any node without label
- labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
- } else {
- labelCanAccess = new HashSet<String>(
- accessibleLabels.contains(CommonNodeLabelsManager.ANY) ? nodeLabels
- : Sets.intersection(accessibleLabels, nodeLabels));
- }
-
- for (String label : labelCanAccess) {
- // New total resource = used + required
- Resource newTotalResource =
- Resources.add(queueUsage.getUsed(label), nowRequired);
-
- Resource currentLimitResource =
- getCurrentLimitResource(label, clusterResource, currentResourceLimits);
-
- // if reservation continous looking enabled, check to see if could we
- // potentially use this node instead of a reserved node if the application
- // has reserved containers.
- // TODO, now only consider reservation cases when the node has no label
- if (this.reservationsContinueLooking
- && label.equals(RMNodeLabelsManager.NO_LABEL)
- && Resources.greaterThan(resourceCalculator, clusterResource,
- resourceCouldBeUnreserved, Resources.none())) {
- // resource-without-reserved = used - reserved
- Resource newTotalWithoutReservedResource =
- Resources.subtract(newTotalResource, resourceCouldBeUnreserved);
-
- // when total-used-without-reserved-resource < currentLimit, we still
- // have chance to allocate on this node by unreserving some containers
- if (Resources.lessThan(resourceCalculator, clusterResource,
- newTotalWithoutReservedResource, currentLimitResource)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("try to use reserved: " + getQueueName()
- + " usedResources: " + queueUsage.getUsed()
- + ", clusterResources: " + clusterResource
- + ", reservedResources: " + resourceCouldBeUnreserved
- + ", capacity-without-reserved: "
- + newTotalWithoutReservedResource + ", maxLimitCapacity: "
- + currentLimitResource);
- }
- return true;
+ String nodePartition, ResourceLimits currentResourceLimits,
+ Resource nowRequired, Resource resourceCouldBeUnreserved,
+ SchedulingMode schedulingMode) {
+ // New total resource = used + required
+ Resource newTotalResource =
+ Resources.add(queueUsage.getUsed(nodePartition), nowRequired);
+
+ // Get the current resource limit:
+ // - RESPECT_PARTITION_EXCLUSIVITY: the queue's max capacity on the
+ // partition is respected.
+ // - IGNORE_PARTITION_EXCLUSIVITY: the queue's max capacity is not
+ // respected; its max capacity on the partition is treated as 100%, i.e.
+ // the queue may use all resource of the partition.
+ // Rationale: for non-exclusive allocation the scheduler makes sure there
+ // is idle resource on the partition. To avoid wasting it, that resource is
+ // used as much as possible, and the preemption policy is expected to
+ // reclaim it when requests for the partition come back.
+ // (See getCurrentLimitResource for how the limit is computed per mode.)
+ Resource currentLimitResource =
+ getCurrentLimitResource(nodePartition, clusterResource,
+ currentResourceLimits, schedulingMode);
+
+ // if continuous reservation looking is enabled, check whether we could
+ // potentially use this node instead of a reserved node if the application
+ // has reserved containers.
+ // TODO, now only consider reservation cases when the node has no label
+ if (this.reservationsContinueLooking
+ && nodePartition.equals(RMNodeLabelsManager.NO_LABEL)
+ && Resources.greaterThan(resourceCalculator, clusterResource,
+ resourceCouldBeUnreserved, Resources.none())) {
+ // resource-without-reserved = used - reserved
+ Resource newTotalWithoutReservedResource =
+ Resources.subtract(newTotalResource, resourceCouldBeUnreserved);
+
+ // when total-used-without-reserved-resource < currentLimit, we still
+ // have a chance to allocate on this node by unreserving some containers
+ if (Resources.lessThan(resourceCalculator, clusterResource,
+ newTotalWithoutReservedResource, currentLimitResource)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("try to use reserved: " + getQueueName()
+ + " usedResources: " + queueUsage.getUsed()
+ + ", clusterResources: " + clusterResource
+ + ", reservedResources: " + resourceCouldBeUnreserved
+ + ", capacity-without-reserved: "
+ + newTotalWithoutReservedResource + ", maxLimitCapacity: "
+ + currentLimitResource);
}
+ return true;
}
-
- // Otherwise, if any of the label of this node beyond queue limit, we
- // cannot allocate on this node. Consider a small epsilon here.
- if (Resources.greaterThan(resourceCalculator, clusterResource,
- newTotalResource, currentLimitResource)) {
- return false;
- }
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug(getQueueName()
- + "Check assign to queue, label=" + label
- + " usedResources: " + queueUsage.getUsed(label)
- + " clusterResources: " + clusterResource
- + " currentUsedCapacity "
- + Resources.divide(resourceCalculator, clusterResource,
- queueUsage.getUsed(label),
- labelManager.getResourceByLabel(label, clusterResource))
- + " max-capacity: "
- + queueCapacities.getAbsoluteMaximumCapacity(label)
- + ")");
- }
- return true;
+ // Reject if used + required would exceed the computed current limit.
+ if (Resources.greaterThan(resourceCalculator, clusterResource,
+ newTotalResource, currentLimitResource)) {
+ return false;
}
-
- // Actually, this will not happen, since labelCanAccess will be always
- // non-empty
- return false;
+ // Within the limit: the queue can take the new allocation.
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getQueueName()
+ + "Check assign to queue, nodePartition="
+ + nodePartition
+ + " usedResources: "
+ + queueUsage.getUsed(nodePartition)
+ + " clusterResources: "
+ + clusterResource
+ + " currentUsedCapacity "
+ + Resources.divide(resourceCalculator, clusterResource,
+ queueUsage.getUsed(nodePartition),
+ labelManager.getResourceByLabel(nodePartition, clusterResource))
+ + " max-capacity: "
+ + queueCapacities.getAbsoluteMaximumCapacity(nodePartition) + ")");
+ }
+ return true;
}
@Override
@@ -556,4 +550,33 @@ public abstract class AbstractCSQueue implements CSQueue {
parent.decPendingResource(nodeLabel, resourceToDec);
}
}
+
+  /**
+   * Returns whether this queue has pending resource on the partition selected
+   * by nodePartition and schedulingMode, judged from this queue's usage.
+   */
+  boolean hasPendingResourceRequest(String nodePartition, Resource cluster,
+      SchedulingMode schedulingMode) {
+    return SchedulerUtils.hasPendingResourceRequest(
+        resourceCalculator, queueUsage, nodePartition, cluster, schedulingMode);
+  }
+
+  /** Returns whether this queue is allowed to access nodes of nodePartition. */
+  boolean accessibleToPartition(String nodePartition) {
+    // A queue whose accessible labels include ANY ("*") can access any node.
+    if (accessibleLabels != null
+        && accessibleLabels.contains(RMNodeLabelsManager.ANY)) {
+      return true;
+    }
+    // Every queue can access a node without a partition (null or NO_LABEL).
+    boolean nodeIsUnlabeled = nodePartition == null
+        || nodePartition.equals(RMNodeLabelsManager.NO_LABEL);
+    if (nodeIsUnlabeled) {
+      return true;
+    }
+    // Otherwise the node's partition must be one of this queue's accessible
+    // labels; without an accessible-label set, access is denied.
+    return accessibleLabels != null
+        && accessibleLabels.contains(nodePartition);
+  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index 1a9448a..b06a646 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -190,10 +190,13 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
* @param clusterResource the resource of the cluster.
* @param node node on which resources are available
* @param resourceLimits how much overall resource of this queue can use.
+ * @param schedulingMode Type of exclusive check when assign container on a
+ * NodeManager, see {@link SchedulingMode}.
* @return the assignment
*/
public CSAssignment assignContainers(Resource clusterResource,
- FiCaSchedulerNode node, ResourceLimits resourceLimits);
+ FiCaSchedulerNode node, ResourceLimits resourceLimits,
+ SchedulingMode schedulingMode);
/**
* A container assigned to the queue has completed.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index e93c529..cfeee37 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -35,6 +35,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
@@ -1114,28 +1115,30 @@ public class CapacityScheduler extends
if (reservedContainer != null) {
FiCaSchedulerApp reservedApplication =
getCurrentAttemptForContainer(reservedContainer.getContainerId());
-
+
// Try to fulfill the reservation
- LOG.info("Trying to fulfill reservation for application " +
- reservedApplication.getApplicationId() + " on node: " +
- node.getNodeID());
-
- LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
- assignment = queue.assignContainers(
+ LOG.info("Trying to fulfill reservation for application "
+ + reservedApplication.getApplicationId() + " on node: "
+ + node.getNodeID());
+
+ LeafQueue queue = ((LeafQueue) reservedApplication.getQueue());
+ assignment =
+ queue.assignContainers(
clusterResource,
node,
// TODO, now we only consider limits for parent for non-labeled
// resources, should consider labeled resources as well.
new ResourceLimits(labelManager.getResourceByLabel(
- RMNodeLabelsManager.NO_LABEL, clusterResource)));
+ RMNodeLabelsManager.NO_LABEL, clusterResource)),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
if (assignment.isFulfilledReservation()) {
CSAssignment tmp =
new CSAssignment(reservedContainer.getReservedResource(),
- assignment.getType());
+ assignment.getType());
Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
- reservedContainer.getReservedResource());
+ reservedContainer.getReservedResource());
tmp.getAssignmentInformation().addAllocationDetails(
- reservedContainer.getContainerId(), queue.getQueuePath());
+ reservedContainer.getContainerId(), queue.getQueuePath());
tmp.getAssignmentInformation().incrAllocations();
updateSchedulerHealth(lastNodeUpdateTime, node, tmp);
schedulerHealth.updateSchedulerFulfilledReservationCounts(1);
@@ -1143,16 +1146,13 @@ public class CapacityScheduler extends
RMContainer excessReservation = assignment.getExcessReservation();
if (excessReservation != null) {
- Container container = excessReservation.getContainer();
- queue.completedContainer(
- clusterResource, assignment.getApplication(), node,
- excessReservation,
- SchedulerUtils.createAbnormalContainerStatus(
- container.getId(),
- SchedulerUtils.UNRESERVED_CONTAINER),
- RMContainerEventType.RELEASED, null, true);
+ Container container = excessReservation.getContainer();
+ queue.completedContainer(clusterResource, assignment.getApplication(),
+ node, excessReservation, SchedulerUtils
+ .createAbnormalContainerStatus(container.getId(),
+ SchedulerUtils.UNRESERVED_CONTAINER),
+ RMContainerEventType.RELEASED, null, true);
}
-
}
// Try to schedule more if there are no reservations to fulfill
@@ -1163,22 +1163,61 @@ public class CapacityScheduler extends
LOG.debug("Trying to schedule on node: " + node.getNodeName() +
", available: " + node.getAvailableResource());
}
+
assignment = root.assignContainers(
clusterResource,
node,
// TODO, now we only consider limits for parent for non-labeled
// resources, should consider labeled resources as well.
new ResourceLimits(labelManager.getResourceByLabel(
- RMNodeLabelsManager.NO_LABEL, clusterResource)));
+ RMNodeLabelsManager.NO_LABEL, clusterResource)),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ if (Resources.greaterThan(calculator, clusterResource,
+ assignment.getResource(), Resources.none())) {
+ updateSchedulerHealth(lastNodeUpdateTime, node, assignment);
+ return;
+ }
+
+ // Only do non-exclusive allocation when node has node-labels.
+ if (StringUtils.equals(node.getPartition(),
+ RMNodeLabelsManager.NO_LABEL)) {
+ return;
+ }
+
+ // Only do non-exclusive allocation when the node-label supports that
+ try {
+ if (rmContext.getNodeLabelManager().isExclusiveNodeLabel(
+ node.getPartition())) {
+ return;
+ }
+ } catch (IOException e) {
+ LOG.warn("Exception when trying to get exclusivity of node label="
+ + node.getPartition(), e);
+ return;
+ }
+
+ // Try to use NON_EXCLUSIVE
+ assignment = root.assignContainers(
+ clusterResource,
+ node,
+ // TODO, now we only consider limits for parent for non-labeled
+ // resources, should consider labeled resources as well.
+ new ResourceLimits(labelManager.getResourceByLabel(
+ RMNodeLabelsManager.NO_LABEL, clusterResource)),
+ SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY);
updateSchedulerHealth(lastNodeUpdateTime, node, assignment);
+ if (Resources.greaterThan(calculator, clusterResource,
+ assignment.getResource(), Resources.none())) {
+ return;
+ }
}
} else {
- LOG.info("Skipping scheduling since node " + node.getNodeID() +
- " is reserved by application " +
- node.getReservedContainer().getContainerId().getApplicationAttemptId()
- );
+ LOG.info("Skipping scheduling since node "
+ + node.getNodeID()
+ + " is reserved by application "
+ + node.getReservedContainer().getContainerId()
+ .getApplicationAttemptId());
}
-
}
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 102e553..4e8d617 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -319,6 +319,11 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
getMaximumApplicationMasterResourcePercent());
}
+  public void setMaximumApplicationMasterResourcePerQueuePercent(
+      String queue, float percent) {
+    setFloat(getQueuePrefix(queue) + MAXIMUM_AM_RESOURCE_SUFFIX, percent);
+  }
+
public float getNonLabeledQueueCapacity(String queue) {
float capacity = queue.equals("root") ? 100.0f : getFloat(
getQueuePrefix(queue) + CAPACITY, UNDEFINED);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 59a016f..8a6a601 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -24,7 +24,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -58,6 +57,7 @@ import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
@@ -718,39 +718,11 @@ public class LeafQueue extends AbstractCSQueue {
ApplicationAttemptId applicationAttemptId) {
return applicationAttemptMap.get(applicationAttemptId);
}
-
- private static final CSAssignment NULL_ASSIGNMENT =
- new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
-
- private static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true);
-
- private static Set<String> getRequestLabelSetByExpression(
- String labelExpression) {
- Set<String> labels = new HashSet<String>();
- if (null == labelExpression) {
- return labels;
- }
- for (String l : labelExpression.split("&&")) {
- if (l.trim().isEmpty()) {
- continue;
- }
- labels.add(l.trim());
- }
- return labels;
- }
-
- private boolean checkResourceRequestMatchingNodeLabel(ResourceRequest offswitchResourceRequest,
- FiCaSchedulerNode node) {
- String askedNodeLabel = offswitchResourceRequest.getNodeLabelExpression();
- if (null == askedNodeLabel) {
- askedNodeLabel = RMNodeLabelsManager.NO_LABEL;
- }
- return askedNodeLabel.equals(node.getPartition());
- }
@Override
public synchronized CSAssignment assignContainers(Resource clusterResource,
- FiCaSchedulerNode node, ResourceLimits currentResourceLimits) {
+ FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
+ SchedulingMode schedulingMode) {
updateCurrentResourceLimits(currentResourceLimits, clusterResource);
if(LOG.isDebugEnabled()) {
@@ -758,12 +730,6 @@ public class LeafQueue extends AbstractCSQueue {
+ " #applications=" + activeApplications.size());
}
- // if our queue cannot access this node, just return
- if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels,
- node.getLabels())) {
- return NULL_ASSIGNMENT;
- }
-
// Check for reserved resources
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {
@@ -771,8 +737,26 @@ public class LeafQueue extends AbstractCSQueue {
getApplication(reservedContainer.getApplicationAttemptId());
synchronized (application) {
return assignReservedContainer(application, node, reservedContainer,
- clusterResource);
+ clusterResource, schedulingMode);
+ }
+ }
+
+ // if our queue cannot access this node, just return
+ if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
+ && !accessibleToPartition(node.getPartition())) {
+ return NULL_ASSIGNMENT;
+ }
+
+ // Check if this queue need more resource, simply skip allocation if this
+ // queue doesn't need more resources.
+ if (!hasPendingResourceRequest(node.getPartition(),
+ clusterResource, schedulingMode)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skip this queue=" + getQueuePath()
+ + ", because it doesn't need more resource, schedulingMode="
+ + schedulingMode.name() + " node-partition=" + node.getPartition());
}
+ return NULL_ASSIGNMENT;
}
// Try to assign containers to applications in order
@@ -783,6 +767,17 @@ public class LeafQueue extends AbstractCSQueue {
+ application.getApplicationId());
application.showRequests();
}
+
+ // Check if application needs more resource, skip if it doesn't need more.
+ if (!application.hasPendingResourceRequest(resourceCalculator,
+ node.getPartition(), clusterResource, schedulingMode)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId()
+ + ", because it doesn't need more resource, schedulingMode="
+ + schedulingMode.name() + " node-label=" + node.getPartition());
+ }
+ continue;
+ }
synchronized (application) {
// Check if this resource is on the blacklist
@@ -806,10 +801,27 @@ public class LeafQueue extends AbstractCSQueue {
continue;
}
+ // AM container allocation doesn't support non-exclusive allocation to
+ // avoid painful of preempt an AM container
+ if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
+ RMAppAttempt rmAppAttempt =
+ csContext.getRMContext().getRMApps()
+ .get(application.getApplicationId()).getCurrentAppAttempt();
+ if (null == rmAppAttempt.getMasterContainer()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skip allocating AM container to app_attempt="
+ + application.getApplicationAttemptId()
+ + ", don't allow to allocate AM container in non-exclusive mode");
+ }
+ break;
+ }
+ }
+
// Is the node-label-expression of this offswitch resource request
// matches the node's label?
// If not match, jump to next priority.
- if (!checkResourceRequestMatchingNodeLabel(anyRequest, node)) {
+ if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(
+ anyRequest, node.getPartition(), schedulingMode)) {
continue;
}
@@ -822,10 +834,6 @@ public class LeafQueue extends AbstractCSQueue {
}
}
- Set<String> requestedNodeLabels =
- getRequestLabelSetByExpression(anyRequest
- .getNodeLabelExpression());
-
// Compute user-limit & set headroom
// Note: We compute both user-limit & headroom with the highest
// priority request as the target.
@@ -833,27 +841,61 @@ public class LeafQueue extends AbstractCSQueue {
// before all higher priority ones are serviced.
Resource userLimit =
computeUserLimitAndSetHeadroom(application, clusterResource,
- required, requestedNodeLabels);
+ required, node.getPartition(), schedulingMode);
// Check queue max-capacity limit
- if (!super.canAssignToThisQueue(clusterResource, node.getLabels(),
- this.currentResourceLimits, required, application.getCurrentReservation())) {
+ if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
+ this.currentResourceLimits, required,
+ application.getCurrentReservation(), schedulingMode)) {
return NULL_ASSIGNMENT;
}
// Check user limit
if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
- application, true, requestedNodeLabels)) {
+ application, true, node.getPartition())) {
break;
}
// Inform the application it is about to get a scheduling opportunity
application.addSchedulingOpportunity(priority);
+ // Increase missed-non-partitioned-resource-request-opportunity.
+ // This is to make sure non-partitioned-resource-request will prefer
+ // to be allocated to non-partitioned nodes
+ int missedNonPartitionedRequestSchedulingOpportunity = 0;
+ if (anyRequest.getNodeLabelExpression().equals(
+ RMNodeLabelsManager.NO_LABEL)) {
+ missedNonPartitionedRequestSchedulingOpportunity =
+ application
+ .addMissedNonPartitionedRequestSchedulingOpportunity(priority);
+ }
+
+ if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
+ // Before doing allocation, we need to check scheduling opportunity to
+ // make sure : non-partitioned resource request should be scheduled to
+ // non-partitioned partition first.
+ if (missedNonPartitionedRequestSchedulingOpportunity < scheduler
+ .getNumClusterNodes()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skip app_attempt="
+ + application.getApplicationAttemptId()
+ + " priority="
+ + priority
+ + " because missed-non-partitioned-resource-request"
+ + " opportunity under requred:"
+ + " Now=" + missedNonPartitionedRequestSchedulingOpportunity
+ + " required="
+ + scheduler.getNumClusterNodes());
+ }
+
+ break;
+ }
+ }
+
// Try to schedule
CSAssignment assignment =
assignContainersOnNode(clusterResource, node, application, priority,
- null);
+ null, schedulingMode);
// Did the application skip this node?
if (assignment.getSkipped()) {
@@ -870,9 +912,9 @@ public class LeafQueue extends AbstractCSQueue {
// Book-keeping
// Note: Update headroom to account for current allocation too...
allocateResource(clusterResource, application, assigned,
- node.getLabels());
+ node.getPartition());
- // Don't reset scheduling opportunities for non-local assignments
+ // Don't reset scheduling opportunities for offswitch assignments
// otherwise the app will be delayed for each non-local assignment.
// This helps apps with many off-cluster requests schedule faster.
if (assignment.getType() != NodeType.OFF_SWITCH) {
@@ -881,6 +923,10 @@ public class LeafQueue extends AbstractCSQueue {
}
application.resetSchedulingOpportunities(priority);
}
+ // Non-exclusive scheduling opportunity is different: we need reset
+ // it every time to make sure non-labeled resource request will be
+ // most likely allocated on non-labeled nodes first.
+ application.resetMissedNonPartitionedRequestSchedulingOpportunity(priority);
// Done
return assignment;
@@ -904,7 +950,8 @@ public class LeafQueue extends AbstractCSQueue {
private synchronized CSAssignment assignReservedContainer(
FiCaSchedulerApp application, FiCaSchedulerNode node,
- RMContainer rmContainer, Resource clusterResource) {
+ RMContainer rmContainer, Resource clusterResource,
+ SchedulingMode schedulingMode) {
// Do we still need this reservation?
Priority priority = rmContainer.getReservedPriority();
if (application.getTotalRequiredResources(priority) == 0) {
@@ -915,7 +962,7 @@ public class LeafQueue extends AbstractCSQueue {
// Try to assign if we have sufficient resources
CSAssignment tmp =
assignContainersOnNode(clusterResource, node, application, priority,
- rmContainer);
+ rmContainer, schedulingMode);
// Doesn't matter... since it's already charged for at time of reservation
// "re-reservation" is *free*
@@ -929,7 +976,8 @@ public class LeafQueue extends AbstractCSQueue {
protected Resource getHeadroom(User user, Resource queueCurrentLimit,
Resource clusterResource, FiCaSchedulerApp application, Resource required) {
return getHeadroom(user, queueCurrentLimit, clusterResource,
- computeUserLimit(application, clusterResource, required, user, null));
+ computeUserLimit(application, clusterResource, required, user,
+ RMNodeLabelsManager.NO_LABEL, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
}
private Resource getHeadroom(User user, Resource currentResourceLimit,
@@ -973,7 +1021,8 @@ public class LeafQueue extends AbstractCSQueue {
@Lock({LeafQueue.class, FiCaSchedulerApp.class})
Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
- Resource clusterResource, Resource required, Set<String> requestedLabels) {
+ Resource clusterResource, Resource required, String nodePartition,
+ SchedulingMode schedulingMode) {
String user = application.getUser();
User queueUser = getUser(user);
@@ -981,7 +1030,7 @@ public class LeafQueue extends AbstractCSQueue {
// TODO, need consider headroom respect labels also
Resource userLimit =
computeUserLimit(application, clusterResource, required,
- queueUser, requestedLabels);
+ queueUser, nodePartition, schedulingMode);
setQueueResourceLimitsInfo(clusterResource);
@@ -1010,34 +1059,18 @@ public class LeafQueue extends AbstractCSQueue {
@Lock(NoLock.class)
private Resource computeUserLimit(FiCaSchedulerApp application,
Resource clusterResource, Resource required, User user,
- Set<String> requestedLabels) {
+ String nodePartition, SchedulingMode schedulingMode) {
// What is our current capacity?
// * It is equal to the max(required, queue-capacity) if
// we're running below capacity. The 'max' ensures that jobs in queues
// with miniscule capacity (< 1 slot) make progress
// * If we're running over capacity, then its
// (usedResources + required) (which extra resources we are allocating)
- Resource queueCapacity = Resource.newInstance(0, 0);
- if (requestedLabels != null && !requestedLabels.isEmpty()) {
- // if we have multiple labels to request, we will choose to use the first
- // label
- String firstLabel = requestedLabels.iterator().next();
- queueCapacity =
- Resources
- .max(resourceCalculator, clusterResource, queueCapacity,
- Resources.multiplyAndNormalizeUp(resourceCalculator,
- labelManager.getResourceByLabel(firstLabel,
- clusterResource),
- queueCapacities.getAbsoluteCapacity(firstLabel),
- minimumAllocation));
- } else {
- // else there's no label on request, just to use absolute capacity as
- // capacity for nodes without label
- queueCapacity =
- Resources.multiplyAndNormalizeUp(resourceCalculator, labelManager
- .getResourceByLabel(CommonNodeLabelsManager.NO_LABEL, clusterResource),
- queueCapacities.getAbsoluteCapacity(), minimumAllocation);
- }
+ Resource queueCapacity =
+ Resources.multiplyAndNormalizeUp(resourceCalculator,
+ labelManager.getResourceByLabel(nodePartition, clusterResource),
+ queueCapacities.getAbsoluteCapacity(nodePartition),
+ minimumAllocation);
// Allow progress for queues with miniscule capacity
queueCapacity =
@@ -1047,33 +1080,56 @@ public class LeafQueue extends AbstractCSQueue {
required);
Resource currentCapacity =
- Resources.lessThan(resourceCalculator, clusterResource,
- queueUsage.getUsed(), queueCapacity) ?
- queueCapacity : Resources.add(queueUsage.getUsed(), required);
+ Resources.lessThan(resourceCalculator, clusterResource,
+ queueUsage.getUsed(nodePartition), queueCapacity) ? queueCapacity
+ : Resources.add(queueUsage.getUsed(nodePartition), required);
// Never allow a single user to take more than the
// queue's configured capacity * user-limit-factor.
// Also, the queue's configured capacity should be higher than
// queue-hard-limit * ulMin
- final int activeUsers = activeUsersManager.getNumActiveUsers();
-
- Resource limit =
+ final int activeUsers = activeUsersManager.getNumActiveUsers();
+
+ // User limit resource is determined by:
+ // max{currentCapacity / #activeUsers, currentCapacity * user-limit-percentage%)
+ Resource userLimitResource = Resources.max(
+ resourceCalculator, clusterResource,
+ Resources.divideAndCeil(
+ resourceCalculator, currentCapacity, activeUsers),
+ Resources.divideAndCeil(
+ resourceCalculator,
+ Resources.multiplyAndRoundDown(
+ currentCapacity, userLimit),
+ 100)
+ );
+
+ // User limit is capped by maxUserLimit
+ // - maxUserLimit = queueCapacity * user-limit-factor (RESPECT_PARTITION_EXCLUSIVITY)
+ // - maxUserLimit = total-partition-resource (IGNORE_PARTITION_EXCLUSIVITY)
+ //
+ // In IGNORE_PARTITION_EXCLUSIVITY mode, if a queue cannot access a
+ // partition, its guaranteed resource on that partition is 0. And
+ // user-limit-factor computation is based on queue's guaranteed capacity. So
+ // we will not cap user-limit as well as used resource when doing
+ // IGNORE_PARTITION_EXCLUSIVITY allocation.
+ Resource maxUserLimit = Resources.none();
+ if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
+ maxUserLimit =
+ Resources.multiplyAndRoundDown(queueCapacity, userLimitFactor);
+ } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
+ maxUserLimit =
+ labelManager.getResourceByLabel(nodePartition, clusterResource);
+ }
+
+ // Cap final user limit with maxUserLimit
+ userLimitResource =
Resources.roundUp(
resourceCalculator,
Resources.min(
resourceCalculator, clusterResource,
- Resources.max(
- resourceCalculator, clusterResource,
- Resources.divideAndCeil(
- resourceCalculator, currentCapacity, activeUsers),
- Resources.divideAndCeil(
- resourceCalculator,
- Resources.multiplyAndRoundDown(
- currentCapacity, userLimit),
- 100)
- ),
- Resources.multiplyAndRoundDown(queueCapacity, userLimitFactor)
+ userLimitResource,
+ maxUserLimit
),
minimumAllocation);
@@ -1081,11 +1137,11 @@ public class LeafQueue extends AbstractCSQueue {
String userName = application.getUser();
LOG.debug("User limit computation for " + userName +
" in queue " + getQueueName() +
- " userLimit=" + userLimit +
+ " userLimitPercent=" + userLimit +
" userLimitFactor=" + userLimitFactor +
" required: " + required +
" consumed: " + user.getUsed() +
- " limit: " + limit +
+ " user-limit-resource: " + userLimitResource +
" queueCapacity: " + queueCapacity +
" qconsumed: " + queueUsage.getUsed() +
" currentCapacity: " + currentCapacity +
@@ -1093,31 +1149,26 @@ public class LeafQueue extends AbstractCSQueue {
" clusterCapacity: " + clusterResource
);
}
- user.setUserResourceLimit(limit);
- return limit;
+ user.setUserResourceLimit(userLimitResource);
+ return userLimitResource;
}
@Private
protected synchronized boolean canAssignToUser(Resource clusterResource,
String userName, Resource limit, FiCaSchedulerApp application,
- boolean checkReservations, Set<String> requestLabels) {
+ boolean checkReservations, String nodePartition) {
User user = getUser(userName);
-
- String label = CommonNodeLabelsManager.NO_LABEL;
- if (requestLabels != null && !requestLabels.isEmpty()) {
- label = requestLabels.iterator().next();
- }
// Note: We aren't considering the current request since there is a fixed
// overhead of the AM, but it's a > check, not a >= check, so...
if (Resources
.greaterThan(resourceCalculator, clusterResource,
- user.getUsed(label),
+ user.getUsed(nodePartition),
limit)) {
// if enabled, check to see if could we potentially use this node instead
// of a reserved node if the application has reserved containers
if (this.reservationsContinueLooking && checkReservations
- && label.equals(CommonNodeLabelsManager.NO_LABEL)) {
+ && nodePartition.equals(CommonNodeLabelsManager.NO_LABEL)) {
if (Resources.lessThanOrEqual(
resourceCalculator,
clusterResource,
@@ -1136,7 +1187,7 @@ public class LeafQueue extends AbstractCSQueue {
if (LOG.isDebugEnabled()) {
LOG.debug("User " + userName + " in queue " + getQueueName()
+ " will exceed limit - " + " consumed: "
- + user.getUsed() + " limit: " + limit);
+ + user.getUsed(nodePartition) + " limit: " + limit);
}
return false;
}
@@ -1176,7 +1227,7 @@ public class LeafQueue extends AbstractCSQueue {
private CSAssignment assignContainersOnNode(Resource clusterResource,
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
- RMContainer reservedContainer) {
+ RMContainer reservedContainer, SchedulingMode schedulingMode) {
CSAssignment assigned;
@@ -1190,7 +1241,7 @@ public class LeafQueue extends AbstractCSQueue {
assigned =
assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
node, application, priority, reservedContainer,
- allocatedContainer);
+ allocatedContainer, schedulingMode);
if (Resources.greaterThan(resourceCalculator, clusterResource,
assigned.getResource(), Resources.none())) {
@@ -1219,7 +1270,7 @@ public class LeafQueue extends AbstractCSQueue {
assigned =
assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
node, application, priority, reservedContainer,
- allocatedContainer);
+ allocatedContainer, schedulingMode);
if (Resources.greaterThan(resourceCalculator, clusterResource,
assigned.getResource(), Resources.none())) {
@@ -1248,7 +1299,7 @@ public class LeafQueue extends AbstractCSQueue {
assigned =
assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
node, application, priority, reservedContainer,
- allocatedContainer);
+ allocatedContainer, schedulingMode);
// update locality statistics
if (allocatedContainer.getValue() != null) {
@@ -1314,16 +1365,17 @@ public class LeafQueue extends AbstractCSQueue {
@Private
protected boolean checkLimitsToReserve(Resource clusterResource,
- FiCaSchedulerApp application, Resource capability) {
+ FiCaSchedulerApp application, Resource capability, String nodePartition,
+ SchedulingMode schedulingMode) {
// we can't reserve if we got here based on the limit
// checks assuming we could unreserve!!!
Resource userLimit = computeUserLimitAndSetHeadroom(application,
- clusterResource, capability, null);
+ clusterResource, capability, nodePartition, schedulingMode);
// Check queue max-capacity limit,
// TODO: Consider reservation on labels
- if (!canAssignToThisQueue(clusterResource, null,
- this.currentResourceLimits, capability, Resources.none())) {
+ if (!canAssignToThisQueue(clusterResource, RMNodeLabelsManager.NO_LABEL,
+ this.currentResourceLimits, capability, Resources.none(), schedulingMode)) {
if (LOG.isDebugEnabled()) {
LOG.debug("was going to reserve but hit queue limit");
}
@@ -1332,7 +1384,7 @@ public class LeafQueue extends AbstractCSQueue {
// Check user limit
if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
- application, false, null)) {
+ application, false, nodePartition)) {
if (LOG.isDebugEnabled()) {
LOG.debug("was going to reserve but hit user limit");
}
@@ -1345,12 +1397,13 @@ public class LeafQueue extends AbstractCSQueue {
private CSAssignment assignNodeLocalContainers(Resource clusterResource,
ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
- RMContainer reservedContainer, MutableObject allocatedContainer) {
+ RMContainer reservedContainer, MutableObject allocatedContainer,
+ SchedulingMode schedulingMode) {
if (canAssign(application, priority, node, NodeType.NODE_LOCAL,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority,
nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
- allocatedContainer);
+ allocatedContainer, schedulingMode);
}
return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
@@ -1359,12 +1412,13 @@ public class LeafQueue extends AbstractCSQueue {
private CSAssignment assignRackLocalContainers(Resource clusterResource,
ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
- RMContainer reservedContainer, MutableObject allocatedContainer) {
+ RMContainer reservedContainer, MutableObject allocatedContainer,
+ SchedulingMode schedulingMode) {
if (canAssign(application, priority, node, NodeType.RACK_LOCAL,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority,
rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
- allocatedContainer);
+ allocatedContainer, schedulingMode);
}
return new CSAssignment(Resources.none(), NodeType.RACK_LOCAL);
@@ -1373,16 +1427,21 @@ public class LeafQueue extends AbstractCSQueue {
private CSAssignment assignOffSwitchContainers(Resource clusterResource,
ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
- RMContainer reservedContainer, MutableObject allocatedContainer) {
+ RMContainer reservedContainer, MutableObject allocatedContainer,
+ SchedulingMode schedulingMode) {
if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority,
offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
- allocatedContainer);
+ allocatedContainer, schedulingMode);
}
return new CSAssignment(Resources.none(), NodeType.OFF_SWITCH);
}
+
+ private int getActualNodeLocalityDelay() {
+ return Math.min(scheduler.getNumClusterNodes(), getNodeLocalityDelay());
+ }
boolean canAssign(FiCaSchedulerApp application, Priority priority,
FiCaSchedulerNode node, NodeType type, RMContainer reservedContainer) {
@@ -1417,10 +1476,7 @@ public class LeafQueue extends AbstractCSQueue {
if (type == NodeType.RACK_LOCAL) {
// 'Delay' rack-local just a little bit...
long missedOpportunities = application.getSchedulingOpportunities(priority);
- return (
- Math.min(scheduler.getNumClusterNodes(), getNodeLocalityDelay()) <
- missedOpportunities
- );
+ return getActualNodeLocalityDelay() < missedOpportunities;
}
// Check if we need containers on this host
@@ -1460,7 +1516,7 @@ public class LeafQueue extends AbstractCSQueue {
private CSAssignment assignContainer(Resource clusterResource, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
ResourceRequest request, NodeType type, RMContainer rmContainer,
- MutableObject createdContainer) {
+ MutableObject createdContainer, SchedulingMode schedulingMode) {
if (LOG.isDebugEnabled()) {
LOG.debug("assignContainers: node=" + node.getNodeName()
+ " application=" + application.getApplicationId()
@@ -1469,9 +1525,8 @@ public class LeafQueue extends AbstractCSQueue {
}
// check if the resource request can access the label
- if (!SchedulerUtils.checkNodeLabelExpression(
- node.getLabels(),
- request.getNodeLabelExpression())) {
+ if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(request,
+ node.getPartition(), schedulingMode)) {
// this is a reserved container, but we cannot allocate it now according
// to label not match. This can be caused by node label changed
// We should un-reserve this container.
@@ -1576,8 +1631,8 @@ public class LeafQueue extends AbstractCSQueue {
// If we're trying to reserve a container here, not container will be
// unreserved for reserving the new one. Check limits again before
// reserve the new container
- if (!checkLimitsToReserve(clusterResource,
- application, capability)) {
+ if (!checkLimitsToReserve(clusterResource,
+ application, capability, node.getPartition(), schedulingMode)) {
return new CSAssignment(Resources.none(), type);
}
}
@@ -1666,7 +1721,7 @@ public class LeafQueue extends AbstractCSQueue {
// Book-keeping
if (removed) {
releaseResource(clusterResource, application,
- container.getResource(), node.getLabels());
+ container.getResource(), node.getPartition());
LOG.info("completedContainer" +
" container=" + container +
" queue=" + this +
@@ -1684,13 +1739,13 @@ public class LeafQueue extends AbstractCSQueue {
synchronized void allocateResource(Resource clusterResource,
SchedulerApplicationAttempt application, Resource resource,
- Set<String> nodeLabels) {
- super.allocateResource(clusterResource, resource, nodeLabels);
+ String nodePartition) {
+ super.allocateResource(clusterResource, resource, nodePartition);
// Update user metrics
String userName = application.getUser();
User user = getUser(userName);
- user.assignContainer(resource, nodeLabels);
+ user.assignContainer(resource, nodePartition);
// Note this is a bit unconventional since it gets the object and modifies
// it here, rather then using set routine
Resources.subtractFrom(application.getHeadroom(), resource); // headroom
@@ -1707,13 +1762,13 @@ public class LeafQueue extends AbstractCSQueue {
}
synchronized void releaseResource(Resource clusterResource,
- FiCaSchedulerApp application, Resource resource, Set<String> nodeLabels) {
- super.releaseResource(clusterResource, resource, nodeLabels);
+ FiCaSchedulerApp application, Resource resource, String nodePartition) {
+ super.releaseResource(clusterResource, resource, nodePartition);
// Update user metrics
String userName = application.getUser();
User user = getUser(userName);
- user.releaseContainer(resource, nodeLabels);
+ user.releaseContainer(resource, nodePartition);
metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
LOG.info(getQueueName() +
@@ -1723,7 +1778,8 @@ public class LeafQueue extends AbstractCSQueue {
private void updateAbsoluteCapacityResource(Resource clusterResource) {
absoluteCapacityResource =
- Resources.multiplyAndNormalizeUp(resourceCalculator, clusterResource,
+ Resources.multiplyAndNormalizeUp(resourceCalculator, labelManager
+ .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource),
queueCapacities.getAbsoluteCapacity(), minimumAllocation);
}
@@ -1769,8 +1825,9 @@ public class LeafQueue extends AbstractCSQueue {
// Update application properties
for (FiCaSchedulerApp application : activeApplications) {
synchronized (application) {
- computeUserLimitAndSetHeadroom(application, clusterResource,
- Resources.none(), null);
+ computeUserLimitAndSetHeadroom(application, clusterResource,
+ Resources.none(), RMNodeLabelsManager.NO_LABEL,
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
}
}
}
@@ -1828,25 +1885,12 @@ public class LeafQueue extends AbstractCSQueue {
}
}
- public void assignContainer(Resource resource,
- Set<String> nodeLabels) {
- if (nodeLabels == null || nodeLabels.isEmpty()) {
- userResourceUsage.incUsed(resource);
- } else {
- for (String label : nodeLabels) {
- userResourceUsage.incUsed(label, resource);
- }
- }
+ public void assignContainer(Resource resource, String nodePartition) {
+ userResourceUsage.incUsed(nodePartition, resource);
}
- public void releaseContainer(Resource resource, Set<String> nodeLabels) {
- if (nodeLabels == null || nodeLabels.isEmpty()) {
- userResourceUsage.decUsed(resource);
- } else {
- for (String label : nodeLabels) {
- userResourceUsage.decUsed(label, resource);
- }
- }
+ public void releaseContainer(Resource resource, String nodePartition) {
+ userResourceUsage.decUsed(nodePartition, resource);
}
public Resource getUserResourceLimit() {
@@ -1869,7 +1913,7 @@ public class LeafQueue extends AbstractCSQueue {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
allocateResource(clusterResource, attempt, rmContainer.getContainer()
- .getResource(), node.getLabels());
+ .getResource(), node.getPartition());
}
getParent().recoverContainer(clusterResource, attempt, rmContainer);
}
@@ -1909,7 +1953,7 @@ public class LeafQueue extends AbstractCSQueue {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
allocateResource(clusterResource, application, rmContainer.getContainer()
- .getResource(), node.getLabels());
+ .getResource(), node.getPartition());
LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
+ " resource=" + rmContainer.getContainer().getResource()
+ " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity()
@@ -1927,7 +1971,7 @@ public class LeafQueue extends AbstractCSQueue {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
releaseResource(clusterResource, application, rmContainer.getContainer()
- .getResource(), node.getLabels());
+ .getResource(), node.getPartition());
LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
+ " resource=" + rmContainer.getContainer().getResource()
+ " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()
[2/3] hadoop git commit: YARN-3361. CapacityScheduler side changes to
support non-exclusive node labels. Contributed by Wangda Tan
Posted by ji...@apache.org.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 882498a..eb64d43 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -56,8 +56,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.AssignmentInformation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -377,16 +375,29 @@ public class ParentQueue extends AbstractCSQueue {
@Override
public synchronized CSAssignment assignContainers(Resource clusterResource,
- FiCaSchedulerNode node, ResourceLimits resourceLimits) {
- CSAssignment assignment =
- new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
- Set<String> nodeLabels = node.getLabels();
-
+ FiCaSchedulerNode node, ResourceLimits resourceLimits,
+ SchedulingMode schedulingMode) {
// if our queue cannot access this node, just return
- if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels, nodeLabels)) {
- return assignment;
+ if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
+ && !accessibleToPartition(node.getPartition())) {
+ return NULL_ASSIGNMENT;
+ }
+
+ // Check if this queue need more resource, simply skip allocation if this
+ // queue doesn't need more resources.
+ if (!super.hasPendingResourceRequest(node.getPartition(),
+ clusterResource, schedulingMode)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skip this queue=" + getQueuePath()
+ + ", because it doesn't need more resource, schedulingMode="
+ + schedulingMode.name() + " node-partition=" + node.getPartition());
+ }
+ return NULL_ASSIGNMENT;
}
+ CSAssignment assignment =
+ new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
+
while (canAssign(clusterResource, node)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to assign containers to child-queue of "
@@ -396,15 +407,17 @@ public class ParentQueue extends AbstractCSQueue {
// Are we over maximum-capacity for this queue?
// This will also consider parent's limits and also continuous reservation
// looking
- if (!super.canAssignToThisQueue(clusterResource, nodeLabels, resourceLimits,
- minimumAllocation, Resources.createResource(getMetrics()
- .getReservedMB(), getMetrics().getReservedVirtualCores()))) {
+ if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
+ resourceLimits, minimumAllocation, Resources.createResource(
+ getMetrics().getReservedMB(), getMetrics()
+ .getReservedVirtualCores()), schedulingMode)) {
break;
}
// Schedule
- CSAssignment assignedToChild =
- assignContainersToChildQueues(clusterResource, node, resourceLimits);
+ CSAssignment assignedToChild =
+ assignContainersToChildQueues(clusterResource, node, resourceLimits,
+ schedulingMode);
assignment.setType(assignedToChild.getType());
// Done if no child-queue assigned anything
@@ -413,7 +426,7 @@ public class ParentQueue extends AbstractCSQueue {
assignedToChild.getResource(), Resources.none())) {
// Track resource utilization for the parent-queue
super.allocateResource(clusterResource, assignedToChild.getResource(),
- nodeLabels);
+ node.getPartition());
// Track resource utilization in this pass of the scheduler
Resources
@@ -510,7 +523,8 @@ public class ParentQueue extends AbstractCSQueue {
}
private synchronized CSAssignment assignContainersToChildQueues(
- Resource cluster, FiCaSchedulerNode node, ResourceLimits limits) {
+ Resource cluster, FiCaSchedulerNode node, ResourceLimits limits,
+ SchedulingMode schedulingMode) {
CSAssignment assignment =
new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
@@ -523,12 +537,13 @@ public class ParentQueue extends AbstractCSQueue {
LOG.debug("Trying to assign to queue: " + childQueue.getQueuePath()
+ " stats: " + childQueue);
}
-
+
// Get ResourceLimits of child queue before assign containers
ResourceLimits childLimits =
getResourceLimitsOfChild(childQueue, cluster, limits);
- assignment = childQueue.assignContainers(cluster, node, childLimits);
+ assignment = childQueue.assignContainers(cluster, node,
+ childLimits, schedulingMode);
if(LOG.isDebugEnabled()) {
LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
" stats: " + childQueue + " --> " +
@@ -584,7 +599,7 @@ public class ParentQueue extends AbstractCSQueue {
// Book keeping
synchronized (this) {
super.releaseResource(clusterResource, rmContainer.getContainer()
- .getResource(), node.getLabels());
+ .getResource(), node.getPartition());
LOG.info("completedContainer" +
" queue=" + getQueueName() +
@@ -653,7 +668,7 @@ public class ParentQueue extends AbstractCSQueue {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
super.allocateResource(clusterResource, rmContainer.getContainer()
- .getResource(), node.getLabels());
+ .getResource(), node.getPartition());
}
if (parent != null) {
parent.recoverContainer(clusterResource, attempt, rmContainer);
@@ -681,7 +696,7 @@ public class ParentQueue extends AbstractCSQueue {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
super.allocateResource(clusterResource, rmContainer.getContainer()
- .getResource(), node.getLabels());
+ .getResource(), node.getPartition());
LOG.info("movedContainer" + " queueMoveIn=" + getQueueName()
+ " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
+ getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster="
@@ -701,7 +716,7 @@ public class ParentQueue extends AbstractCSQueue {
scheduler.getNode(rmContainer.getContainer().getNodeId());
super.releaseResource(clusterResource,
rmContainer.getContainer().getResource(),
- node.getLabels());
+ node.getPartition());
LOG.info("movedContainer" + " queueMoveOut=" + getQueueName()
+ " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
+ getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster="
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/SchedulingMode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/SchedulingMode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/SchedulingMode.java
new file mode 100644
index 0000000..7e7dc37
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/SchedulingMode.java
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+/**
+ * Scheduling modes for container allocation; see each constant for details.
+ */
+public enum SchedulingMode {
+ /**
+ * <p>
+ * When a node has a partition (say partition=x), only applications in a
+ * queue that can access partition=x AND that request partition=x resources
+ * get a chance to allocate on the node.
+ * </p>
+ *
+ * <p>
+ * When a node has no partition, only applications that request
+ * non-partitioned resources get a chance to allocate on the node.
+ * </p>
+ */
+ RESPECT_PARTITION_EXCLUSIVITY,
+
+ /**
+ * Only used when a node has a partition AND that partition is not exclusive
+ * AND the application requests non-partitioned resources.
+ */
+ IGNORE_PARTITION_EXCLUSIVITY
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
index 76ede39..9b7eb84 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.Task.State;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
@@ -277,6 +278,9 @@ public class Application {
} else {
request.setNumContainers(request.getNumContainers() + 1);
}
+ if (request.getNodeLabelExpression() == null) {
+ request.setNodeLabelExpression(RMNodeLabelsManager.NO_LABEL);
+ }
// Note this down for next interaction with ResourceManager
ask.remove(request);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
index f62fdb3..5c107aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
@@ -150,8 +150,14 @@ public class MockAM {
public AllocateResponse allocate(
String host, int memory, int numContainers,
List<ContainerId> releases, String labelExpression) throws Exception {
+ return allocate(host, memory, numContainers, 1, releases, labelExpression);
+ }
+
+ public AllocateResponse allocate(
+ String host, int memory, int numContainers, int priority,
+ List<ContainerId> releases, String labelExpression) throws Exception {
List<ResourceRequest> reqs =
- createReq(new String[] { host }, memory, 1, numContainers,
+ createReq(new String[] { host }, memory, priority, numContainers,
labelExpression);
return allocate(reqs, releases);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
index 06c6b32..f2b1d86 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java
@@ -21,6 +21,8 @@ package org.apache.hadoop.yarn.server.resourcemanager;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Collection;
import java.util.List;
import java.util.Map;
@@ -200,10 +202,18 @@ public class MockRM extends ResourceManager {
public boolean waitForState(MockNM nm, ContainerId containerId,
RMContainerState containerState, int timeoutMillisecs) throws Exception {
+ return waitForState(Arrays.asList(nm), containerId, containerState,
+ timeoutMillisecs);
+ }
+
+ public boolean waitForState(Collection<MockNM> nms, ContainerId containerId,
+ RMContainerState containerState, int timeoutMillisecs) throws Exception {
RMContainer container = getResourceScheduler().getRMContainer(containerId);
int timeoutSecs = 0;
while(container == null && timeoutSecs++ < timeoutMillisecs / 100) {
- nm.nodeHeartbeat(true);
+ for (MockNM nm : nms) {
+ nm.nodeHeartbeat(true);
+ }
container = getResourceScheduler().getRMContainer(containerId);
System.out.println("Waiting for container " + containerId + " to be allocated.");
Thread.sleep(100);
@@ -217,9 +227,11 @@ public class MockRM extends ResourceManager {
&& timeoutSecs++ < timeoutMillisecs / 100) {
System.out.println("Container : " + containerId + " State is : "
+ container.getState() + " Waiting for state : " + containerState);
- nm.nodeHeartbeat(true);
+ for (MockNM nm : nms) {
+ nm.nodeHeartbeat(true);
+ }
Thread.sleep(100);
-
+
if (timeoutMillisecs <= timeoutSecs * 100) {
return false;
}
@@ -650,11 +662,28 @@ public class MockRM extends ResourceManager {
am.waitForState(RMAppAttemptState.FINISHED);
rm.waitForState(rmApp.getApplicationId(), RMAppState.FINISHED);
}
+
+ @SuppressWarnings("rawtypes") // raw cast to AbstractYarnScheduler in the loop below
+ private static void waitForSchedulerAppAttemptAdded(
+ ApplicationAttemptId attemptId, MockRM rm) throws InterruptedException {
+ int tick = 0;
+ // Poll every 100 ms, at most 50 times (~5 sec); returns without error on timeout
+ while (null == ((AbstractYarnScheduler) rm.getResourceScheduler())
+ .getApplicationAttempt(attemptId) && tick < 50) {
+ Thread.sleep(100);
+ if (tick % 10 == 0) { // progress message roughly once per second
+ System.out.println("waiting for SchedulerApplicationAttempt="
+ + attemptId + " added.");
+ }
+ tick++;
+ }
+ }
public static MockAM launchAM(RMApp app, MockRM rm, MockNM nm)
throws Exception {
rm.waitForState(app.getApplicationId(), RMAppState.ACCEPTED);
RMAppAttempt attempt = app.getCurrentAppAttempt();
+ waitForSchedulerAppAttemptAdded(attempt.getAppAttemptId(), rm);
System.out.println("Launch AM " + attempt.getAppAttemptId());
nm.nodeHeartbeat(true);
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index 1ca5c97..46167ca 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -612,7 +612,7 @@ public class TestApplicationLimits {
// Schedule to compute
queue.assignContainers(clusterResource, node_0, new ResourceLimits(
- clusterResource));
+ clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
Resource expectedHeadroom = Resources.createResource(10*16*GB, 1);
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
@@ -632,7 +632,7 @@ public class TestApplicationLimits {
// Schedule to compute
queue.assignContainers(clusterResource, node_0, new ResourceLimits(
- clusterResource)); // Schedule to compute
+ clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
assertEquals(expectedHeadroom, app_0_1.getHeadroom());// no change
@@ -652,7 +652,7 @@ public class TestApplicationLimits {
// Schedule to compute
queue.assignContainers(clusterResource, node_0, new ResourceLimits(
- clusterResource)); // Schedule to compute
+ clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute
expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
assertEquals(expectedHeadroom, app_0_1.getHeadroom());
@@ -661,7 +661,7 @@ public class TestApplicationLimits {
// Now reduce cluster size and check for the smaller headroom
clusterResource = Resources.createResource(90*16*GB);
queue.assignContainers(clusterResource, node_0, new ResourceLimits(
- clusterResource)); // Schedule to compute
+ clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute
expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
assertEquals(expectedHeadroom, app_0_1.getHeadroom());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index 23b31fa..970a98a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.event.DrainDispatcher;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
@@ -133,7 +134,7 @@ public class TestChildQueueOrder {
final Resource allocatedResource = Resources.createResource(allocation);
if (queue instanceof ParentQueue) {
((ParentQueue)queue).allocateResource(clusterResource,
- allocatedResource, null);
+ allocatedResource, RMNodeLabelsManager.NO_LABEL);
} else {
FiCaSchedulerApp app1 = getMockApplication(0, "");
((LeafQueue)queue).allocateResource(clusterResource, app1,
@@ -145,7 +146,7 @@ public class TestChildQueueOrder {
doReturn(new CSAssignment(Resources.none(), type)).
when(queue)
.assignContainers(eq(clusterResource), eq(node),
- any(ResourceLimits.class));
+ any(ResourceLimits.class), any(SchedulingMode.class));
// Mock the node's resource availability
Resource available = node.getAvailableResource();
@@ -157,7 +158,7 @@ public class TestChildQueueOrder {
}
}).
when(queue).assignContainers(eq(clusterResource), eq(node),
- any(ResourceLimits.class));
+ any(ResourceLimits.class), any(SchedulingMode.class));
doNothing().when(node).releaseContainer(any(Container.class));
}
@@ -241,6 +242,14 @@ public class TestChildQueueOrder {
CSQueue b = queues.get(B);
CSQueue c = queues.get(C);
CSQueue d = queues.get(D);
+
+ // Give root and a/b/c/d >0 pending resource so that allocation continues.
+ queues.get(CapacitySchedulerConfiguration.ROOT).getQueueResourceUsage()
+ .incPending(Resources.createResource(1 * GB));
+ a.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+ b.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+ c.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
+ d.getQueueResourceUsage().incPending(Resources.createResource(1 * GB));
final String user_0 = "user_0";
@@ -275,7 +284,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0, new ResourceLimits(
- clusterResource));
+ clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
for(int i=0; i < 2; i++)
{
stubQueueAllocation(a, clusterResource, node_0, 0*GB);
@@ -283,7 +292,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0, new ResourceLimits(
- clusterResource));
+ clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
}
for(int i=0; i < 3; i++)
{
@@ -292,7 +301,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(c, clusterResource, node_0, 1*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0, new ResourceLimits(
- clusterResource));
+ clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
}
for(int i=0; i < 4; i++)
{
@@ -301,7 +310,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 1*GB);
root.assignContainers(clusterResource, node_0, new ResourceLimits(
- clusterResource));
+ clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
}
verifyQueueMetrics(a, 1*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@@ -335,7 +344,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0, new ResourceLimits(
- clusterResource));
+ clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
}
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@@ -363,7 +372,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0, new ResourceLimits(
- clusterResource));
+ clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 3*GB, clusterResource);
verifyQueueMetrics(c, 3*GB, clusterResource);
@@ -390,7 +399,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
root.assignContainers(clusterResource, node_0, new ResourceLimits(
- clusterResource));
+ clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
verifyQueueMetrics(c, 3*GB, clusterResource);
@@ -405,12 +414,14 @@ public class TestChildQueueOrder {
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 1*GB);
root.assignContainers(clusterResource, node_0, new ResourceLimits(
- clusterResource));
+ clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
InOrder allocationOrder = inOrder(d,b);
- allocationOrder.verify(d).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), any(ResourceLimits.class));
- allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), any(ResourceLimits.class));
+ allocationOrder.verify(d).assignContainers(eq(clusterResource),
+ any(FiCaSchedulerNode.class), any(ResourceLimits.class),
+ any(SchedulingMode.class));
+ allocationOrder.verify(b).assignContainers(eq(clusterResource),
+ any(FiCaSchedulerNode.class), any(ResourceLimits.class),
+ any(SchedulingMode.class));
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
verifyQueueMetrics(c, 3*GB, clusterResource);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index 03b8f5c..54ba617 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -32,6 +34,7 @@ import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.LogAggregationContext;
import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.NodeLabel;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -51,9 +54,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.Assert;
@@ -327,387 +334,4 @@ public class TestContainerAllocation {
rm1.waitForState(attempt.getAppAttemptId(), RMAppAttemptState.ALLOCATED);
MockRM.launchAndRegisterAM(app1, rm1, nm1);
}
-
- private Configuration getConfigurationWithQueueLabels(Configuration config) {
- CapacitySchedulerConfiguration conf =
- new CapacitySchedulerConfiguration(config);
-
- // Define top-level queues
- conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
- conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
- conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
-
- final String A = CapacitySchedulerConfiguration.ROOT + ".a";
- conf.setCapacity(A, 10);
- conf.setMaximumCapacity(A, 15);
- conf.setAccessibleNodeLabels(A, toSet("x"));
- conf.setCapacityByLabel(A, "x", 100);
-
- final String B = CapacitySchedulerConfiguration.ROOT + ".b";
- conf.setCapacity(B, 20);
- conf.setAccessibleNodeLabels(B, toSet("y"));
- conf.setCapacityByLabel(B, "y", 100);
-
- final String C = CapacitySchedulerConfiguration.ROOT + ".c";
- conf.setCapacity(C, 70);
- conf.setMaximumCapacity(C, 70);
- conf.setAccessibleNodeLabels(C, RMNodeLabelsManager.EMPTY_STRING_SET);
-
- // Define 2nd-level queues
- final String A1 = A + ".a1";
- conf.setQueues(A, new String[] {"a1"});
- conf.setCapacity(A1, 100);
- conf.setMaximumCapacity(A1, 100);
- conf.setCapacityByLabel(A1, "x", 100);
-
- final String B1 = B + ".b1";
- conf.setQueues(B, new String[] {"b1"});
- conf.setCapacity(B1, 100);
- conf.setMaximumCapacity(B1, 100);
- conf.setCapacityByLabel(B1, "y", 100);
-
- final String C1 = C + ".c1";
- conf.setQueues(C, new String[] {"c1"});
- conf.setCapacity(C1, 100);
- conf.setMaximumCapacity(C1, 100);
-
- return conf;
- }
-
- private void checkTaskContainersHost(ApplicationAttemptId attemptId,
- ContainerId containerId, ResourceManager rm, String host) {
- YarnScheduler scheduler = rm.getRMContext().getScheduler();
- SchedulerAppReport appReport = scheduler.getSchedulerAppInfo(attemptId);
-
- Assert.assertTrue(appReport.getLiveContainers().size() > 0);
- for (RMContainer c : appReport.getLiveContainers()) {
- if (c.getContainerId().equals(containerId)) {
- Assert.assertEquals(host, c.getAllocatedNode().getHost());
- }
- }
- }
-
- @SuppressWarnings("unchecked")
- private <E> Set<E> toSet(E... elements) {
- Set<E> set = Sets.newHashSet(elements);
- return set;
- }
-
- @Test (timeout = 300000)
- public void testContainerAllocationWithSingleUserLimits() throws Exception {
- final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
- mgr.init(conf);
-
- // set node -> label
- mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
- mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
- NodeId.newInstance("h2", 0), toSet("y")));
-
- // inject node label manager
- MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
- @Override
- public RMNodeLabelsManager createNodeLabelManager() {
- return mgr;
- }
- };
-
- rm1.getRMContext().setNodeLabelManager(mgr);
- rm1.start();
- MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
- rm1.registerNode("h2:1234", 8000); // label = y
- MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
-
- // launch an app to queue a1 (label = x), and check all container will
- // be allocated in h1
- RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
- MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
- // A has only 10% of x, so it can only allocate one container in label=empty
- ContainerId containerId =
- ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
- am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "");
- Assert.assertTrue(rm1.waitForState(nm3, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- // Cannot allocate 2nd label=empty container
- containerId =
- ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
- am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "");
- Assert.assertFalse(rm1.waitForState(nm3, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
-
- // A has default user limit = 100, so it can use all resource in label = x
- // We can allocate floor(8000 / 1024) = 7 containers
- for (int id = 3; id <= 8; id++) {
- containerId =
- ContainerId.newContainerId(am1.getApplicationAttemptId(), id);
- am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "x");
- Assert.assertTrue(rm1.waitForState(nm1, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- }
- rm1.close();
- }
-
- @Test(timeout = 300000)
- public void testContainerAllocateWithComplexLabels() throws Exception {
- /*
- * Queue structure:
- * root (*)
- * ________________
- * / \
- * a x(100%), y(50%) b y(50%), z(100%)
- * ________________ ______________
- * / / \
- * a1 (x,y) b1(no) b2(y,z)
- * 100% y = 100%, z = 100%
- *
- * Node structure:
- * h1 : x
- * h2 : y
- * h3 : y
- * h4 : z
- * h5 : NO
- *
- * Total resource:
- * x: 4G
- * y: 6G
- * z: 2G
- * *: 2G
- *
- * Resource of
- * a1: x=4G, y=3G, NO=0.2G
- * b1: NO=0.9G (max=1G)
- * b2: y=3, z=2G, NO=0.9G (max=1G)
- *
- * Each node can only allocate two containers
- */
-
- // set node -> label
- mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y", "z"));
- mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
- toSet("x"), NodeId.newInstance("h2", 0), toSet("y"),
- NodeId.newInstance("h3", 0), toSet("y"), NodeId.newInstance("h4", 0),
- toSet("z"), NodeId.newInstance("h5", 0),
- RMNodeLabelsManager.EMPTY_STRING_SET));
-
- // inject node label manager
- MockRM rm1 = new MockRM(TestUtils.getComplexConfigurationWithQueueLabels(conf)) {
- @Override
- public RMNodeLabelsManager createNodeLabelManager() {
- return mgr;
- }
- };
-
- rm1.getRMContext().setNodeLabelManager(mgr);
- rm1.start();
- MockNM nm1 = rm1.registerNode("h1:1234", 2048);
- MockNM nm2 = rm1.registerNode("h2:1234", 2048);
- MockNM nm3 = rm1.registerNode("h3:1234", 2048);
- MockNM nm4 = rm1.registerNode("h4:1234", 2048);
- MockNM nm5 = rm1.registerNode("h5:1234", 2048);
-
- ContainerId containerId;
-
- // launch an app to queue a1 (label = x), and check all container will
- // be allocated in h1
- RMApp app1 = rm1.submitApp(1024, "app", "user", null, "a1");
- MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
- // request a container (label = y). can be allocated on nm2
- am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
- containerId =
- ContainerId.newContainerId(am1.getApplicationAttemptId(), 2L);
- Assert.assertTrue(rm1.waitForState(nm2, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
- "h2");
-
- // launch an app to queue b1 (label = y), and check all container will
- // be allocated in h5
- RMApp app2 = rm1.submitApp(1024, "app", "user", null, "b1");
- MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm5);
-
- // request a container for AM, will succeed
- // and now b1's queue capacity will be used, cannot allocate more containers
- // (Maximum capacity reached)
- am2.allocate("*", 1024, 1, new ArrayList<ContainerId>());
- containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
- Assert.assertFalse(rm1.waitForState(nm4, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- Assert.assertFalse(rm1.waitForState(nm5, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
-
- // launch an app to queue b2
- RMApp app3 = rm1.submitApp(1024, "app", "user", null, "b2");
- MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm5);
-
- // request a container. try to allocate on nm1 (label = x) and nm3 (label =
- // y,z). Will successfully allocate on nm3
- am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
- containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
- Assert.assertFalse(rm1.waitForState(nm1, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- Assert.assertTrue(rm1.waitForState(nm3, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
- "h3");
-
- // try to allocate container (request label = z) on nm4 (label = y,z).
- // Will successfully allocate on nm4 only.
- am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "z");
- containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 3L);
- Assert.assertTrue(rm1.waitForState(nm4, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
- "h4");
-
- rm1.close();
- }
-
- @Test (timeout = 120000)
- public void testContainerAllocateWithLabels() throws Exception {
- // set node -> label
- mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
- mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
- NodeId.newInstance("h2", 0), toSet("y")));
-
- // inject node label manager
- MockRM rm1 = new MockRM(getConfigurationWithQueueLabels(conf)) {
- @Override
- public RMNodeLabelsManager createNodeLabelManager() {
- return mgr;
- }
- };
-
- rm1.getRMContext().setNodeLabelManager(mgr);
- rm1.start();
- MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
- MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y
- MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
-
- ContainerId containerId;
-
- // launch an app to queue a1 (label = x), and check all container will
- // be allocated in h1
- RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
- MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm3);
-
- // request a container.
- am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "x");
- containerId =
- ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
- Assert.assertFalse(rm1.waitForState(nm2, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- Assert.assertTrue(rm1.waitForState(nm1, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
- "h1");
-
- // launch an app to queue b1 (label = y), and check all container will
- // be allocated in h2
- RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1");
- MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm3);
-
- // request a container.
- am2.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
- containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
- Assert.assertFalse(rm1.waitForState(nm1, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- Assert.assertTrue(rm1.waitForState(nm2, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1,
- "h2");
-
- // launch an app to queue c1 (label = ""), and check all container will
- // be allocated in h3
- RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1");
- MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3);
-
- // request a container.
- am3.allocate("*", 1024, 1, new ArrayList<ContainerId>());
- containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
- Assert.assertFalse(rm1.waitForState(nm2, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- Assert.assertTrue(rm1.waitForState(nm3, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
- "h3");
-
- rm1.close();
- }
-
- @Test (timeout = 120000)
- public void testContainerAllocateWithDefaultQueueLabels() throws Exception {
- // This test is pretty much similar to testContainerAllocateWithLabel.
- // Difference is, this test doesn't specify label expression in ResourceRequest,
- // instead, it uses default queue label expression
-
- // set node -> label
- mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
- mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x"),
- NodeId.newInstance("h2", 0), toSet("y")));
-
- // inject node label manager
- MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
- @Override
- public RMNodeLabelsManager createNodeLabelManager() {
- return mgr;
- }
- };
-
- rm1.getRMContext().setNodeLabelManager(mgr);
- rm1.start();
- MockNM nm1 = rm1.registerNode("h1:1234", 8000); // label = x
- MockNM nm2 = rm1.registerNode("h2:1234", 8000); // label = y
- MockNM nm3 = rm1.registerNode("h3:1234", 8000); // label = <empty>
-
- ContainerId containerId;
-
- // launch an app to queue a1 (label = x), and check all container will
- // be allocated in h1
- RMApp app1 = rm1.submitApp(200, "app", "user", null, "a1");
- MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
- // request a container.
- am1.allocate("*", 1024, 1, new ArrayList<ContainerId>());
- containerId =
- ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
- Assert.assertFalse(rm1.waitForState(nm3, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- Assert.assertTrue(rm1.waitForState(nm1, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
- "h1");
-
- // launch an app to queue b1 (label = y), and check all container will
- // be allocated in h2
- RMApp app2 = rm1.submitApp(200, "app", "user", null, "b1");
- MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
- // request a container.
- am2.allocate("*", 1024, 1, new ArrayList<ContainerId>());
- containerId = ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
- Assert.assertFalse(rm1.waitForState(nm3, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- Assert.assertTrue(rm1.waitForState(nm2, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- checkTaskContainersHost(am2.getApplicationAttemptId(), containerId, rm1,
- "h2");
-
- // launch an app to queue c1 (label = ""), and check all container will
- // be allocated in h3
- RMApp app3 = rm1.submitApp(200, "app", "user", null, "c1");
- MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm3);
-
- // request a container.
- am3.allocate("*", 1024, 1, new ArrayList<ContainerId>());
- containerId = ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
- Assert.assertFalse(rm1.waitForState(nm2, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- Assert.assertTrue(rm1.waitForState(nm3, containerId,
- RMContainerState.ALLOCATED, 10 * 1000));
- checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
- "h3");
-
- rm1.close();
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/0fefda64/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 972cabb..0b5250b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -351,7 +351,7 @@ public class TestLeafQueue {
// Only 1 container
a.assignContainers(clusterResource, node_0, new ResourceLimits(
- clusterResource));
+ clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(
(int)(node_0.getTotalResource().getMemory() * a.getCapacity()) - (1*GB),
a.getMetrics().getAvailableMB());
@@ -487,7 +487,7 @@ public class TestLeafQueue {
// Only 1 container
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -498,7 +498,7 @@ public class TestLeafQueue {
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -507,7 +507,7 @@ public class TestLeafQueue {
// Can't allocate 3rd due to user-limit
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -517,7 +517,7 @@ public class TestLeafQueue {
// Bump up user-limit-factor, now allocate should work
a.setUserLimitFactor(10);
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -526,7 +526,7 @@ public class TestLeafQueue {
// One more should work, for app_1, due to user-limit-factor
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@@ -537,7 +537,7 @@ public class TestLeafQueue {
// Now - no more allocs since we are at max-cap
a.setMaxCapacity(0.5f);
a.assignContainers(clusterResource, node_0, new ResourceLimits(
- clusterResource));
+ clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@@ -653,21 +653,21 @@ public class TestLeafQueue {
// 1 container to user_0
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
// Again one to user_0 since he hasn't exceeded user limit yet
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
// One more to user_0 since he is the only active user
a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
@@ -719,10 +719,10 @@ public class TestLeafQueue {
1, qb.getActiveUsersManager().getNumActiveUsers());
//get headroom
qb.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, app_0
.getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(),
- null);
+ "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
//maxqueue 16G, userlimit 13G, - 4G used = 9G
assertEquals(9*GB,app_0.getHeadroom().getMemory());
@@ -739,10 +739,10 @@ public class TestLeafQueue {
u1Priority, recordFactory)));
qb.submitApplicationAttempt(app_2, user_1);
qb.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
qb.computeUserLimitAndSetHeadroom(app_0, clusterResource, app_0
.getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(),
- null);
+ "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(8*GB, qb.getUsedResources().getMemory());
assertEquals(4*GB, app_0.getCurrentConsumption().getMemory());
@@ -782,12 +782,12 @@ public class TestLeafQueue {
qb.submitApplicationAttempt(app_1, user_0);
qb.submitApplicationAttempt(app_3, user_1);
qb.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
qb.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
qb.computeUserLimitAndSetHeadroom(app_3, clusterResource, app_3
.getResourceRequest(u1Priority, ResourceRequest.ANY).getCapability(),
- null);
+ "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(4*GB, qb.getUsedResources().getMemory());
//maxqueue 16G, userlimit 7G, used (by each user) 2G, headroom 5G (both)
assertEquals(5*GB, app_3.getHeadroom().getMemory());
@@ -803,13 +803,13 @@ public class TestLeafQueue {
TestUtils.createResourceRequest(ResourceRequest.ANY, 6*GB, 1, true,
u0Priority, recordFactory)));
qb.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
qb.computeUserLimitAndSetHeadroom(app_4, clusterResource, app_4
.getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability(),
- null);
+ "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
qb.computeUserLimitAndSetHeadroom(app_3, clusterResource, app_3
.getResourceRequest(u1Priority, ResourceRequest.ANY).getCapability(),
- null);
+ "", SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
//app3 is user1, active from last test case
@@ -876,7 +876,7 @@ public class TestLeafQueue {
priority, recordFactory)));
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -893,7 +893,7 @@ public class TestLeafQueue {
priority, recordFactory)));
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@@ -982,7 +982,7 @@ public class TestLeafQueue {
// 1 container to user_0
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -993,7 +993,7 @@ public class TestLeafQueue {
// Again one to user_0 since he hasn't exceeded user limit yet
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@@ -1010,7 +1010,7 @@ public class TestLeafQueue {
// No more to user_0 since he is already over user-limit
// and no more containers to queue since it's already at max-cap
a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
@@ -1024,7 +1024,7 @@ public class TestLeafQueue {
priority, recordFactory)));
assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(0*GB, app_2.getHeadroom().getMemory()); // hit queue max-cap
}
@@ -1095,7 +1095,7 @@ public class TestLeafQueue {
// Only 1 container
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1103,7 +1103,7 @@ public class TestLeafQueue {
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1111,7 +1111,7 @@ public class TestLeafQueue {
// Can't allocate 3rd due to user-limit
a.setUserLimit(25);
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1130,7 +1130,7 @@ public class TestLeafQueue {
// user_0 is at limit inspite of high user-limit-factor
a.setUserLimitFactor(10);
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1140,7 +1140,7 @@ public class TestLeafQueue {
// Now allocations should goto app_0 since
// user_0 is at user-limit not above it
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1151,7 +1151,7 @@ public class TestLeafQueue {
// Now - no more allocs since we are at max-cap
a.setMaxCapacity(0.5f);
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1163,7 +1163,7 @@ public class TestLeafQueue {
a.setMaxCapacity(1.0f);
a.setUserLimitFactor(1);
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(7*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1172,7 +1172,7 @@ public class TestLeafQueue {
// Now we should assign to app_3 again since user_2 is under user-limit
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(8*GB, a.getUsedResources().getMemory());
assertEquals(3*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1272,7 +1272,7 @@ public class TestLeafQueue {
// Only 1 container
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1283,7 +1283,7 @@ public class TestLeafQueue {
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1292,7 +1292,7 @@ public class TestLeafQueue {
// Now, reservation should kick in for app_1
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1309,7 +1309,7 @@ public class TestLeafQueue {
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null, true);
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1326,7 +1326,7 @@ public class TestLeafQueue {
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null, true);
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(4*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
@@ -1394,7 +1394,7 @@ public class TestLeafQueue {
// Start testing...
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1404,7 +1404,7 @@ public class TestLeafQueue {
// Now, reservation should kick in for app_1
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1418,7 +1418,7 @@ public class TestLeafQueue {
doReturn(-1).when(a).getNodeLocalityDelay();
a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(10*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
@@ -1435,7 +1435,7 @@ public class TestLeafQueue {
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null, true);
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(8*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(8*GB, app_1.getCurrentConsumption().getMemory());
@@ -1504,7 +1504,7 @@ public class TestLeafQueue {
// Only 1 container
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(1*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1512,14 +1512,14 @@ public class TestLeafQueue {
// Also 2nd -> minCapacity = 1024 since (.1 * 8G) < minAlloc, also
// you can get one container more than user-limit
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
// Now, reservation should kick in for app_1
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(6*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1534,7 +1534,7 @@ public class TestLeafQueue {
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null, true);
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1544,7 +1544,7 @@ public class TestLeafQueue {
// Re-reserve
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(5*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
@@ -1554,7 +1554,7 @@ public class TestLeafQueue {
// Try to schedule on node_1 now, should *move* the reservation
a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(9*GB, a.getUsedResources().getMemory());
assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
@@ -1572,7 +1572,7 @@ public class TestLeafQueue {
ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
RMContainerEventType.KILL, null, true);
CSAssignment assignment = a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(8*GB, a.getUsedResources().getMemory());
assertEquals(0*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(4*GB, app_1.getCurrentConsumption().getMemory());
@@ -1644,7 +1644,7 @@ public class TestLeafQueue {
// Start with off switch, shouldn't allocate due to delay scheduling
assignment = a.assignContainers(clusterResource, node_2,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(1, app_0.getSchedulingOpportunities(priority));
@@ -1653,7 +1653,7 @@ public class TestLeafQueue {
// Another off switch, shouldn't allocate due to delay scheduling
assignment = a.assignContainers(clusterResource, node_2,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(2, app_0.getSchedulingOpportunities(priority));
@@ -1662,7 +1662,7 @@ public class TestLeafQueue {
// Another off switch, shouldn't allocate due to delay scheduling
assignment = a.assignContainers(clusterResource, node_2,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(3, app_0.getSchedulingOpportunities(priority));
@@ -1672,7 +1672,7 @@ public class TestLeafQueue {
// Another off switch, now we should allocate
// since missedOpportunities=3 and reqdContainers=3
assignment = a.assignContainers(clusterResource, node_2,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(4, app_0.getSchedulingOpportunities(priority)); // should NOT reset
@@ -1681,7 +1681,7 @@ public class TestLeafQueue {
// NODE_LOCAL - node_0
assignment = a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@@ -1690,7 +1690,7 @@ public class TestLeafQueue {
// NODE_LOCAL - node_1
assignment = a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@@ -1719,14 +1719,14 @@ public class TestLeafQueue {
// Shouldn't assign RACK_LOCAL yet
assignment = a.assignContainers(clusterResource, node_3,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertEquals(1, app_0.getSchedulingOpportunities(priority));
assertEquals(2, app_0.getTotalRequiredResources(priority));
assertEquals(NodeType.NODE_LOCAL, assignment.getType()); // None->NODE_LOCAL
// Should assign RACK_LOCAL now
assignment = a.assignContainers(clusterResource, node_3,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0).allocate(eq(NodeType.RACK_LOCAL), eq(node_3),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@@ -1808,7 +1808,7 @@ public class TestLeafQueue {
// Start with off switch, shouldn't allocate P1 due to delay scheduling
// thus, no P2 either!
a.assignContainers(clusterResource, node_2,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
assertEquals(1, app_0.getSchedulingOpportunities(priority_1));
@@ -1821,7 +1821,7 @@ public class TestLeafQueue {
// Another off-switch, shouldn't allocate P1 due to delay scheduling
// thus, no P2 either!
a.assignContainers(clusterResource, node_2,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_2),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
assertEquals(2, app_0.getSchedulingOpportunities(priority_1));
@@ -1833,7 +1833,7 @@ public class TestLeafQueue {
// Another off-switch, shouldn't allocate OFF_SWITCH P1
a.assignContainers(clusterResource, node_2,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0).allocate(eq(NodeType.OFF_SWITCH), eq(node_2),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
assertEquals(3, app_0.getSchedulingOpportunities(priority_1));
@@ -1845,7 +1845,7 @@ public class TestLeafQueue {
// Now, DATA_LOCAL for P1
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority_1));
@@ -1857,7 +1857,7 @@ public class TestLeafQueue {
// Now, OFF_SWITCH for P2
a.assignContainers(clusterResource, node_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1),
eq(priority_1), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority_1));
@@ -1934,7 +1934,7 @@ public class TestLeafQueue {
// NODE_LOCAL - node_0_1
a.assignContainers(clusterResource, node_0_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_0_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@@ -1943,7 +1943,7 @@ public class TestLeafQueue {
// No allocation on node_1_0 even though it's node/rack local since
// required(ANY) == 0
a.assignContainers(clusterResource, node_1_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // Still zero
@@ -1960,7 +1960,7 @@ public class TestLeafQueue {
// No allocation on node_0_1 even though it's node/rack local since
// required(rack_1) == 0
a.assignContainers(clusterResource, node_0_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(1, app_0.getSchedulingOpportunities(priority));
@@ -1968,7 +1968,7 @@ public class TestLeafQueue {
// NODE_LOCAL - node_1
a.assignContainers(clusterResource, node_1_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should reset
@@ -2221,7 +2221,7 @@ public class TestLeafQueue {
// node_0_1
// Shouldn't allocate since RR(rack_0) = null && RR(ANY) = relax: false
a.assignContainers(clusterResource, node_0_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
@@ -2244,7 +2244,7 @@ public class TestLeafQueue {
// node_1_1
// Shouldn't allocate since RR(rack_1) = relax: false
a.assignContainers(clusterResource, node_1_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_0_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
@@ -2275,7 +2275,7 @@ public class TestLeafQueue {
// node_1_1
// Shouldn't allocate since node_1_1 is blacklisted
a.assignContainers(clusterResource, node_1_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
@@ -2304,7 +2304,7 @@ public class TestLeafQueue {
// node_1_1
// Shouldn't allocate since rack_1 is blacklisted
a.assignContainers(clusterResource, node_1_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0, never()).allocate(any(NodeType.class), eq(node_1_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority)); // should be 0
@@ -2331,7 +2331,7 @@ public class TestLeafQueue {
// Now, should allocate since RR(rack_1) = relax: true
a.assignContainers(clusterResource, node_1_1,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0,never()).allocate(eq(NodeType.RACK_LOCAL), eq(node_1_1),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority));
@@ -2362,7 +2362,7 @@ public class TestLeafQueue {
// host_1_1: 7G
a.assignContainers(clusterResource, node_1_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
verify(app_0).allocate(eq(NodeType.NODE_LOCAL), eq(node_1_0),
any(Priority.class), any(ResourceRequest.class), any(Container.class));
assertEquals(0, app_0.getSchedulingOpportunities(priority));
@@ -2445,7 +2445,7 @@ public class TestLeafQueue {
try {
a.assignContainers(clusterResource, node_0,
- new ResourceLimits(clusterResource));
+ new ResourceLimits(clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
} catch (NullPointerException e) {
Assert.fail("NPE when allocating container on node but "
+ "forget to set off-switch request should be handled");