You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2016/12/09 20:39:39 UTC
[1/4] hadoop git commit: YARN-4390. Do surgical preemption based on
reserved container in CapacityScheduler. Contributed by Wangda Tan
Repository: hadoop
Updated Branches:
refs/heads/branch-2.8 bced9e2e5 -> 40367c8da
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
new file mode 100644
index 0000000..5067412
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
@@ -0,0 +1,246 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+
+/**
+ * Tests for surgical preemption in the CapacityScheduler: containers are
+ * preempted from a specific node so that a request of an under-served queue
+ * (reserved on, or unable to fit on, that node) can be placed there.
+ * Each test builds a small cluster with MockRM, drives node heartbeats by
+ * hand, runs the preemption edit policy and checks which containers were
+ * preempted and on which node.
+ */
+public class TestCapacitySchedulerSurgicalPreemption
+ extends CapacitySchedulerPreemptionTestBase {
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ // Enable selection of preemption candidates for reserved containers,
+ // which is the feature under test.
+ conf.setBoolean(
+ CapacitySchedulerConfiguration.PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS,
+ true);
+ }
+
+ @Test(timeout = 60000)
+ public void testSimpleSurgicalPreemption()
+ throws Exception {
+ /**
+ * Test case: Submit two applications (app1/app2) to different queues, queue
+ * structure:
+ *
+ * <pre>
+ *         Root
+ *       /  |  \
+ *      a   b   c
+ *     10   20  70
+ * </pre>
+ *
+ * 1) Two nodes (n1/n2) in the cluster, each of them has 20G.
+ *
+ * 2) app1 is submitted to queue-a first and asks for 32 * 1G containers.
+ * We will allocate 16 on n1 and 16 on n2 (plus app1's AM on n1).
+ *
+ * 3) app2 is submitted to queue-c and asks for one 1G container (for AM).
+ *
+ * 4) app2 asks for another 6G container, it will be reserved on n1.
+ *
+ * Now we have:
+ * n1: 17 from app1, 1 from app2, and 1 reserved from app2
+ * n2: 16 from app1.
+ *
+ * After preemption, we should expect:
+ * Preempt 4 containers from app1 on n1.
+ */
+ MockRM rm1 = new MockRM(conf);
+ try {
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+
+ MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB);
+ MockNM nm2 = rm1.registerNode("h2:1234", 20 * GB);
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+ RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+ // launch an app to queue-a, AM container should be launched in nm1
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ am1.allocate("*", 1 * GB, 32, new ArrayList<ContainerId>());
+
+ // Do allocation for node1/node2
+ for (int i = 0; i < 32; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+ }
+
+ // App1 should have 33 containers now (AM + 32)
+ FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+ am1.getApplicationAttemptId());
+ Assert.assertEquals(33, schedulerApp1.getLiveContainers().size());
+ // 17 from n1 (including the AM) and 16 from n2
+ waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()),
+ am1.getApplicationAttemptId(), 17);
+ waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()),
+ am1.getApplicationAttemptId(), 16);
+
+ // Submit app2 to queue-c and asks for a 1G container for AM
+ RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+ // NM1/NM2 have 2G/4G of available resource
+ Assert.assertEquals(2 * GB, cs.getNode(nm1.getNodeId())
+ .getAvailableResource().getMemorySize());
+ Assert.assertEquals(4 * GB, cs.getNode(nm2.getNodeId())
+ .getAvailableResource().getMemorySize());
+
+ // app2 asks for a 6G container, which does not fit into the 2G left on n1
+ am2.allocate(Arrays.asList(ResourceRequest
+ .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+ Resources.createResource(6 * GB), 1)), null);
+
+ // Call allocation once on n1, we should expect the container reserved on n1
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ Assert.assertNotNull(cs.getNode(nm1.getNodeId()).getReservedContainer());
+
+ // Get the preemption edit policy
+ SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+ // Call edit schedule twice, and check if 4 containers from app1 at n1 killed
+ editPolicy.editSchedule();
+ editPolicy.editSchedule();
+
+ waitNumberOfLiveContainersFromApp(schedulerApp1, 29);
+
+ // 13 from n1 (4 preempted) and 16 from n2
+ waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()),
+ am1.getApplicationAttemptId(), 13);
+ waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()),
+ am1.getApplicationAttemptId(), 16);
+ } finally {
+ // Stop the RM even when an assertion above fails, so it is not left running
+ rm1.close();
+ }
+ }
+
+ @Test(timeout = 60000)
+ public void testSurgicalPreemptionWithAvailableResource()
+ throws Exception {
+ /**
+ * Test case: Submit two applications (app1/app2) to different queues, queue
+ * structure:
+ *
+ * <pre>
+ *         Root
+ *       /  |  \
+ *      a   b   c
+ *     10   20  70
+ * </pre>
+ *
+ * 1) Two nodes (n1/n2) in the cluster, each of them has 20G.
+ *
+ * 2) app1 is submitted to queue-a first and asks for 38 * 1G containers.
+ * Together with its AM, 20 are allocated on n1 and 19 on n2.
+ *
+ * 3) app2 is submitted to queue-c and asks for one 4G container (for AM),
+ * which does not fit on either node (n1 has 0G and n2 has 1G available).
+ *
+ * After preemption, we should expect:
+ * Preempt 3 containers from app1 and AM of app2 successfully allocated.
+ */
+ MockRM rm1 = new MockRM(conf);
+ try {
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+
+ MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB);
+ MockNM nm2 = rm1.registerNode("h2:1234", 20 * GB);
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+ RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+ // launch an app to queue-a, AM container should be launched in nm1
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ am1.allocate("*", 1 * GB, 38, new ArrayList<ContainerId>());
+
+ // Do allocation for node1/node2
+ for (int i = 0; i < 38; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+ }
+
+ // App1 should have 39 containers now (AM + 38)
+ FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+ am1.getApplicationAttemptId());
+ Assert.assertEquals(39, schedulerApp1.getLiveContainers().size());
+ // 20 from n1 (including the AM) and 19 from n2
+ waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()),
+ am1.getApplicationAttemptId(), 20);
+ waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()),
+ am1.getApplicationAttemptId(), 19);
+
+ // Submit app2 to queue-c and asks for a 4G container for AM
+ RMApp app2 = rm1.submitApp(4 * GB, "app", "user", null, "c");
+ FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+ ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
+
+ // Call editSchedule: containers are selected as preemption candidates
+ ProportionalCapacityPreemptionPolicy editPolicy =
+ (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+ editPolicy.editSchedule();
+ Assert.assertEquals(3, editPolicy.getToPreemptContainers().size());
+
+ // Call editSchedule again: selected containers are killed
+ editPolicy.editSchedule();
+ waitNumberOfLiveContainersFromApp(schedulerApp1, 36);
+
+ // Call allocation, app2's AM container gets reserved
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+ waitNumberOfReservedContainersFromApp(schedulerApp2, 1);
+
+ // Call editSchedule twice, then heartbeat both nodes (up to 10 rounds)
+ // until app2's AM container is allocated
+ editPolicy.editSchedule();
+ editPolicy.editSchedule();
+
+ int tick = 0;
+ while (schedulerApp2.getLiveContainers().size() != 1 && tick < 10) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+ tick++;
+ Thread.sleep(100);
+ }
+ // No reservation should remain for app2
+ waitNumberOfReservedContainersFromApp(schedulerApp2, 0);
+ } finally {
+ // Stop the RM even when an assertion above fails, so it is not left running
+ rm1.close();
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
index 0d09e8f..10f6c2b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
@@ -5329,7 +5329,7 @@ public class TestFairScheduler extends FairSchedulerTestBase {
resourceManager
.getResourceScheduler()
.getSchedulerNode(resourceEvent.getNodeId())
- .setTotalResource(resourceEvent.getResourceOption().getResource());
+ .updateTotalResource(resourceEvent.getResourceOption().getResource());
}
}
});
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
index 3167726..d9ab823 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
@@ -1202,7 +1202,7 @@ public class TestFifoScheduler {
resourceManager
.getResourceScheduler()
.getSchedulerNode(resourceEvent.getNodeId())
- .setTotalResource(resourceEvent.getResourceOption().getResource());
+ .updateTotalResource(resourceEvent.getResourceOption().getResource());
}
}
});
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[3/4] hadoop git commit: YARN-4390. Do surgical preemption based on
reserved container in CapacityScheduler. Contributed by Wangda Tan
Posted by ep...@apache.org.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
new file mode 100644
index 0000000..e60c384
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
@@ -0,0 +1,687 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.event.Dispatcher;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.Before;
+import org.mockito.ArgumentMatcher;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class ProportionalCapacityPreemptionPolicyMockFramework {
+ // Logger for this mock framework, named after the framework class itself
+ static final Log LOG =
+ LogFactory.getLog(ProportionalCapacityPreemptionPolicyMockFramework.class);
+ final String ROOT = CapacitySchedulerConfiguration.ROOT;
+
+ // Mocked queues, looked up by queue name when mocking applications/containers
+ Map<String, CSQueue> nameToCSQueues = null;
+ // Partition name -> total resource of that partition (mockNodeLabelsManager)
+ Map<String, Resource> partitionToResource = null;
+ // Mocked scheduler nodes, keyed by node id (mockSchedulerNodes)
+ Map<NodeId, FiCaSchedulerNode> nodeIdToSchedulerNodes = null;
+ RMNodeLabelsManager nlm = null;
+ RMContext rmContext = null;
+
+ // Returned by the mocked scheduler's getResourceCalculator() unless buildEnv
+ // overrides it; also used directly when computing queue capacities
+ ResourceCalculator rc = new DefaultResourceCalculator();
+ Clock mClock = null;
+ CapacitySchedulerConfiguration conf = null;
+ CapacityScheduler cs = null;
+ // Event handler behind the mocked dispatcher of rmContext
+ EventHandler<SchedulerEvent> mDisp = null;
+ // The policy under test, created at the end of buildEnv
+ ProportionalCapacityPreemptionPolicy policy = null;
+ // Sum of the resources of all partitions (mockNodeLabelsManager)
+ Resource clusterResource = null;
+
+ /**
+ * Creates a fresh mocked environment before each test: a preemption
+ * configuration, a mocked CapacityScheduler wired to a mocked RMContext
+ * (node labels manager and dispatcher), and empty lookup maps that
+ * buildEnv fills in later.
+ */
+ @SuppressWarnings("unchecked")
+ @Before
+ public void setup() {
+ // Verbose logging for the whole test run
+ org.apache.log4j.Logger.getRootLogger().setLevel(
+ org.apache.log4j.Level.DEBUG);
+
+ // Preemption settings; the two 1.0 factors report the "ideal" preemption
+ conf = new CapacitySchedulerConfiguration(new Configuration(false));
+ conf.setLong(
+ CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, 10000);
+ conf.setLong(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
+ 3000);
+ conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
+ 1.0f);
+ conf.setFloat(
+ CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
+ 1.0f);
+
+ // Plain mocks with no stubbing of their own
+ mClock = mock(Clock.class);
+ nlm = mock(RMNodeLabelsManager.class);
+ mDisp = mock(EventHandler.class);
+
+ // RM context: node labels manager plus a dispatcher that hands out mDisp
+ Dispatcher dispatcher = mock(Dispatcher.class);
+ when(dispatcher.getEventHandler()).thenReturn(mDisp);
+ rmContext = mock(RMContext.class);
+ when(rmContext.getNodeLabelManager()).thenReturn(nlm);
+ when(rmContext.getDispatcher()).thenReturn(dispatcher);
+
+ // Scheduler mock backed by the objects above
+ cs = mock(CapacityScheduler.class);
+ when(cs.getResourceCalculator()).thenReturn(rc);
+ when(cs.getPreemptionManager()).thenReturn(new PreemptionManager());
+ when(cs.getConfiguration()).thenReturn(conf);
+ when(cs.getRMContext()).thenReturn(rmContext);
+
+ nameToCSQueues = new HashMap<>();
+ partitionToResource = new HashMap<>();
+ nodeIdToSchedulerNodes = new HashMap<>();
+ }
+
+ /**
+ * Same as the five-argument overload with useDominantResourceCalculator
+ * set to false.
+ */
+ public void buildEnv(String labelsConfig, String nodesConfig,
+ String queuesConfig, String appsConfig) throws IOException {
+ final boolean useDominantResourceCalculator = false;
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig,
+ useDominantResourceCalculator);
+ }
+
+ /**
+ * Builds the mocked cluster: partitions, scheduler nodes, the queue
+ * hierarchy and the applications. It wires the mocked CapacityScheduler to
+ * return them, then creates the ProportionalCapacityPreemptionPolicy under
+ * test in {@code policy}.
+ *
+ * @param useDominantResourceCalculator if true, cs.getResourceCalculator()
+ * returns a DominantResourceCalculator. NOTE(review): the {@code rc} field,
+ * which setupQueue uses to compute queue capacities, is not changed by
+ * this flag. Confirm this is intended.
+ */
+ public void buildEnv(String labelsConfig, String nodesConfig,
+ String queuesConfig, String appsConfig,
+ boolean useDominantResourceCalculator) throws IOException {
+ if (useDominantResourceCalculator) {
+ when(cs.getResourceCalculator()).thenReturn(
+ new DominantResourceCalculator());
+ }
+
+ // Order matters: queues read partition resources, and applications read
+ // the mocked nodes and queues.
+ mockNodeLabelsManager(labelsConfig);
+ mockSchedulerNodes(nodesConfig);
+ for (Map.Entry<NodeId, FiCaSchedulerNode> node : nodeIdToSchedulerNodes
+ .entrySet()) {
+ when(cs.getSchedulerNode(node.getKey())).thenReturn(node.getValue());
+ }
+ when(cs.getAllNodes()).thenReturn(nodeIdToSchedulerNodes);
+
+ // Build the hierarchy before stubbing: mocking inside thenReturn(...)
+ // would nest Mockito stubbings.
+ ParentQueue rootQueue = mockQueueHierarchy(queuesConfig);
+ when(cs.getRootQueue()).thenReturn(rootQueue);
+ when(cs.getClusterResource()).thenReturn(clusterResource);
+ mockApplications(appsConfig);
+
+ policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
+ }
+
+ /**
+ * Parses a containers spec made of tuples
+ * {@code (priority,resource,host,expression,repeat,reserved)} and creates
+ * mocked RMContainers for the given application attempt. If the spec
+ * contains "=", everything up to and including the first "=" is skipped.
+ * Container ids are assigned sequentially from 1, and container 1 is
+ * flagged as the AM container. Reserved containers go to
+ * reservedContainers and all others to liveContainers; each container is
+ * also attached to its mocked scheduler node. A container with an empty
+ * node-label expression on a node with a non-empty partition is also
+ * recorded in the queue's ignore-exclusivity containers for that partition.
+ *
+ * @throws IllegalArgumentException if a tuple is unterminated, does not have
+ * exactly 6 fields, has a non-numeric priority/repeat, or if characters
+ * after the last tuple do not start another tuple
+ */
+ private void mockContainers(String containersConfig, ApplicationAttemptId attemptId,
+ String queueName, List<RMContainer> reservedContainers,
+ List<RMContainer> liveContainers) {
+ int containerId = 1;
+ int start = containersConfig.indexOf("=") + 1;
+ int end = -1;
+
+ while (start < containersConfig.length()) {
+ // Advance to the next '(' that opens a container tuple
+ while (start < containersConfig.length()
+ && containersConfig.charAt(start) != '(') {
+ start++;
+ }
+ if (start >= containersConfig.length()) {
+ throw new IllegalArgumentException(
+ "Error containers specification, line=" + containersConfig);
+ }
+ // Find the ')' that closes this tuple
+ end = start + 1;
+ while (end < containersConfig.length()
+ && containersConfig.charAt(end) != ')') {
+ end++;
+ }
+ if (end >= containersConfig.length()) {
+ throw new IllegalArgumentException(
+ "Error containers specification, line=" + containersConfig);
+ }
+
+ // now we found start/end, get container values
+ String[] values = containersConfig.substring(start + 1, end).split(",");
+ if (values.length != 6) {
+ throw new IllegalArgumentException("Format to define container is:"
+ + "(priority,resource,host,expression,repeat,reserved)");
+ }
+ Priority pri = Priority.newInstance(Integer.valueOf(values[0]));
+ Resource res = parseResourceFromString(values[1]);
+ NodeId host = NodeId.newInstance(values[2], 1);
+ String exp = values[3];
+ int repeat = Integer.valueOf(values[4]);
+ boolean reserved = Boolean.valueOf(values[5]);
+
+ // Create 'repeat' identical containers for this tuple
+ for (int i = 0; i < repeat; i++) {
+ Container c = mock(Container.class);
+ when(c.getResource()).thenReturn(res);
+ when(c.getPriority()).thenReturn(pri);
+ RMContainerImpl rmc = mock(RMContainerImpl.class);
+ when(rmc.getAllocatedNode()).thenReturn(host);
+ when(rmc.getNodeLabelExpression()).thenReturn(exp);
+ when(rmc.getAllocatedResource()).thenReturn(res);
+ when(rmc.getContainer()).thenReturn(c);
+ when(rmc.getApplicationAttemptId()).thenReturn(attemptId);
+ when(rmc.getQueueName()).thenReturn(queueName);
+ final ContainerId cId = ContainerId.newContainerId(attemptId, containerId);
+ when(rmc.getContainerId()).thenReturn(
+ cId);
+ // Order mocked containers by container id; they are stored in TreeSets
+ // (see the ignore-exclusivity handling below)
+ doAnswer(new Answer<Integer>() {
+ @Override
+ public Integer answer(InvocationOnMock invocation) throws Throwable {
+ return cId.compareTo(((RMContainer) invocation.getArguments()[0])
+ .getContainerId());
+ }
+ }).when(rmc).compareTo(any(RMContainer.class));
+
+ // The first container of each application is its AM container
+ if (containerId == 1) {
+ when(rmc.isAMContainer()).thenReturn(true);
+ }
+
+ if (reserved) {
+ reservedContainers.add(rmc);
+ when(rmc.getReservedResource()).thenReturn(res);
+ } else {
+ liveContainers.add(rmc);
+ }
+
+ // Add container to scheduler-node
+ addContainerToSchedulerNode(host, rmc, reserved);
+
+ // If this is a non-exclusive allocation
+ // (note: partition is assigned inside the condition and stays null
+ // when exp is non-empty)
+ String partition = null;
+ if (exp.isEmpty()
+ && !(partition = nodeIdToSchedulerNodes.get(host).getPartition())
+ .isEmpty()) {
+ LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName);
+ Map<String, TreeSet<RMContainer>> ignoreExclusivityContainers =
+ queue.getIgnoreExclusivityRMContainers();
+ if (!ignoreExclusivityContainers.containsKey(partition)) {
+ ignoreExclusivityContainers.put(partition,
+ new TreeSet<RMContainer>());
+ }
+ ignoreExclusivityContainers.get(partition).add(rmc);
+ }
+ LOG.debug("add container to app=" + attemptId + " res=" + res
+ + " node=" + host + " nodeLabelExpression=" + exp + " partition="
+ + partition);
+
+ containerId++;
+ }
+
+ // Continue scanning after this tuple's ')'
+ start = end + 1;
+ }
+ }
+
+ /**
+ * Creates one mocked FiCaSchedulerApp per ";"-separated entry and adds it
+ * to the applications of its (already mocked) leaf queue. Format is:
+ * <pre>
+ * queueName\t // app1
+ * (priority,resource,host,expression,#repeat,reserved)
+ * (priority,resource,host,expression,#repeat,reserved);
+ * queueName\t // app2
+ * </pre>
+ * Applications get ids 1, 2, ... in config order (cluster timestamp 0,
+ * attempt 1) and priority 0.
+ */
+ private void mockApplications(String appsConfig) {
+ String[] appSpecs = appsConfig.split(";");
+ for (int i = 0; i < appSpecs.length; i++) {
+ String[] fields = appSpecs[i].split("\t");
+ String queueName = fields[0];
+
+ // Parse this app's containers into live / reserved lists
+ List<RMContainer> live = new ArrayList<RMContainer>();
+ List<RMContainer> reserved = new ArrayList<RMContainer>();
+ ApplicationId appId = ApplicationId.newInstance(0L, i + 1);
+ ApplicationAttemptId attemptId =
+ ApplicationAttemptId.newInstance(appId, 1);
+ mockContainers(fields[1], attemptId, queueName, reserved, live);
+
+ FiCaSchedulerApp app = mock(FiCaSchedulerApp.class);
+ when(app.getApplicationId()).thenReturn(appId);
+ when(app.getApplicationAttemptId()).thenReturn(attemptId);
+ when(app.getLiveContainers()).thenReturn(live);
+ when(app.getReservedContainers()).thenReturn(reserved);
+ when(app.getPriority()).thenReturn(Priority.newInstance(0));
+
+ // The queue's application set orders apps by application id, so the id
+ // has to be stubbed before the app is added.
+ LeafQueue leafQueue = (LeafQueue) nameToCSQueues.get(queueName);
+ leafQueue.getApplications().add(app);
+ }
+ }
+
+ /**
+ * Attaches a mocked container to the mocked scheduler node for nodeId.
+ * A reserved container becomes the node's reserved container (a later
+ * reserved container on the same node replaces an earlier one). Any other
+ * container is appended to the node's running-container list and its
+ * allocated resource is deducted from the node's available resource. This
+ * relies on the mocked node returning the same list and Resource objects on
+ * every call, as set up in mockSchedulerNodes.
+ *
+ * @throws IllegalArgumentException if nodeId was not declared in the nodes
+ * config given to buildEnv
+ */
+ private void addContainerToSchedulerNode(NodeId nodeId, RMContainer container,
+ boolean isReserved) {
+ SchedulerNode node = nodeIdToSchedulerNodes.get(nodeId);
+ // An explicit check instead of "assert": assertions may be disabled, which
+ // would turn an undeclared host in the containers config into an NPE below.
+ if (node == null) {
+ throw new IllegalArgumentException("Container is placed on node="
+ + nodeId + " which is not declared in the nodes config");
+ }
+
+ if (isReserved) {
+ when(node.getReservedContainer()).thenReturn(container);
+ } else {
+ node.getCopiedListOfRunningContainers().add(container);
+ Resources.subtractFrom(node.getAvailableResource(),
+ container.getAllocatedResource());
+ }
+ }
+
+ /**
+ * Creates one mocked FiCaSchedulerNode per ";"-separated entry and registers
+ * it in nodeIdToSchedulerNodes. Format is:
+ * <pre>
+ * host1=partition[ res=resource];
+ * host2=partition[ res=resource];
+ * </pre>
+ * A node without "res=" gets an empty total resource. Each node's available
+ * resource starts as a copy of its total, and its running-container list
+ * starts empty. The same objects are returned on every call so that
+ * addContainerToSchedulerNode can update them.
+ */
+ private void mockSchedulerNodes(String schedulerNodesConfigStr)
+ throws IOException {
+ for (String nodeSpec : schedulerNodesConfigStr.split(";")) {
+ String[] tokens = nodeSpec.split(" ");
+ String hostAndPartition = tokens[0];
+ int sep = hostAndPartition.indexOf("=");
+ NodeId nodeId = NodeId.newInstance(hostAndPartition.substring(0, sep), 1);
+ String partition = hostAndPartition.substring(sep + 1);
+
+ FiCaSchedulerNode sn = mock(FiCaSchedulerNode.class);
+ when(sn.getNodeID()).thenReturn(nodeId);
+ when(sn.getPartition()).thenReturn(partition);
+
+ // Optional second token: "res=<memory>[:<vcores>]"
+ Resource totalRes = Resources.createResource(0);
+ if (tokens.length > 1 && tokens[1].contains("res=")) {
+ String resToken = tokens[1];
+ String resString = resToken.substring(
+ resToken.indexOf("res=") + "res=".length());
+ totalRes = parseResourceFromString(resString);
+ }
+ when(sn.getTotalResource()).thenReturn(totalRes);
+ when(sn.getAvailableResource()).thenReturn(Resources.clone(totalRes));
+
+ // TODO, add settings of killable resources when necessary
+ when(sn.getTotalKillableResources()).thenReturn(Resources.none());
+
+ List<RMContainer> runningContainers = new ArrayList<>();
+ when(sn.getCopiedListOfRunningContainers()).thenReturn(runningContainers);
+
+ nodeIdToSchedulerNodes.put(nodeId, sn);
+
+ LOG.debug("add scheduler node, id=" + nodeId + ", partition=" + partition);
+ }
+ }
+
+ /**
+ * Stubs the mocked node labels manager from a partitions config and resets
+ * clusterResource to the sum of all partition resources. Format is:
+ * <pre>
+ * partition0=total_resource,exclusivity;
+ * partition1=total_resource,exclusivity;
+ * ...
+ * </pre>
+ * For each partition, the total resource and exclusivity are stubbed on nlm
+ * and recorded in partitionToResource. The cluster's label names are the
+ * keys of partitionToResource.
+ */
+ private void mockNodeLabelsManager(String nodeLabelsConfigStr) throws IOException {
+ String[] partitionSpecs = nodeLabelsConfigStr.split(";");
+ clusterResource = Resources.createResource(0);
+ for (String spec : partitionSpecs) {
+ int eqPos = spec.indexOf("=");
+ int commaPos = spec.indexOf(",");
+ String partitionName = spec.substring(0, eqPos);
+ Resource res = parseResourceFromString(spec.substring(eqPos + 1, commaPos));
+ boolean exclusivity = Boolean.parseBoolean(spec.substring(commaPos + 1));
+
+ when(nlm.getResourceByLabel(eq(partitionName), any(Resource.class)))
+ .thenReturn(res);
+ when(nlm.isExclusiveNodeLabel(eq(partitionName))).thenReturn(exclusivity);
+
+ partitionToResource.put(partitionName, res);
+ Resources.addTo(clusterResource, res);
+ LOG.debug("add partition=" + partitionName + " totalRes=" + res
+ + " exclusivity=" + exclusivity);
+ }
+
+ when(nlm.getClusterNodeLabelNames()).thenReturn(
+ partitionToResource.keySet());
+ }
+
+ /**
+ * Parses {@code "<memory>"} or {@code "<memory>:<vcores>"} into a Resource.
+ * Only the first two ":"-separated values are used.
+ */
+ private Resource parseResourceFromString(String p) {
+ String[] parts = p.split(":");
+ int memory = Integer.parseInt(parts[0]);
+ if (parts.length == 1) {
+ return Resources.createResource(memory);
+ }
+ return Resources.createResource(memory, Integer.parseInt(parts[1]));
+ }
+
+ /**
+ * Builds a mocked queue hierarchy from ";"-separated queue expressions.
+ * Format is:
+ * <pre>
+ * root (<partition-name-1>=[guaranteed max used pending (reserved)],<partition-name-2>=..);
+ * -A(...);
+ * --A1(...);
+ * --A2(...);
+ * -B...
+ * </pre>
+ * ";" splits queues; there should be no empty lines and no extra spaces.
+ * Expressions that isParent reports as parents become mocked ParentQueues
+ * with an initially empty child list. All others become mocked LeafQueues
+ * whose applications are kept in a TreeSet ordered by application id. Each
+ * queue is then configured by setupQueue.
+ *
+ * @return the queue whose name equals ROOT, or null if there is none
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ private ParentQueue mockQueueHierarchy(String queueExprs) {
+ String[] queueExprArray = queueExprs.split(";");
+ ParentQueue rootQueue = null;
+ for (int idx = 0; idx < queueExprArray.length; idx++) {
+ CSQueue queue;
+ if (!isParent(queueExprArray, idx)) {
+ // Leaf queue: applications sorted by application id
+ final TreeSet<FiCaSchedulerApp> apps = new TreeSet<>(
+ new Comparator<FiCaSchedulerApp>() {
+ @Override
+ public int compare(FiCaSchedulerApp left, FiCaSchedulerApp right) {
+ return left.getApplicationId().compareTo(right.getApplicationId());
+ }
+ });
+ // Each call returns a new iterator over the current apps, from the
+ // highest application id down.
+ OrderingPolicy<FiCaSchedulerApp> orderingPolicy =
+ mock(OrderingPolicy.class);
+ when(orderingPolicy.getPreemptionIterator()).thenAnswer(
+ new Answer<Object>() {
+ @Override
+ public Object answer(InvocationOnMock invocation) {
+ return apps.descendingIterator();
+ }
+ });
+
+ LeafQueue leafQueue = mock(LeafQueue.class);
+ when(leafQueue.getApplications()).thenReturn(apps);
+ when(leafQueue.getOrderingPolicy()).thenReturn(orderingPolicy);
+ when(leafQueue.getIgnoreExclusivityRMContainers()).thenReturn(
+ new HashMap<String, TreeSet<RMContainer>>());
+ queue = leafQueue;
+ } else {
+ ParentQueue parentQueue = mock(ParentQueue.class);
+ when(parentQueue.getChildQueues()).thenReturn(new ArrayList<CSQueue>());
+ queue = parentQueue;
+ }
+
+ setupQueue(queue, queueExprArray[idx], queueExprArray, idx);
+ if (queue.getQueueName().equals(ROOT)) {
+ rootQueue = (ParentQueue) queue;
+ }
+ }
+ return rootQueue;
+ }
+
+ private void setupQueue(CSQueue queue, String q, String[] queueExprArray,
+ int idx) {
+ LOG.debug("*** Setup queue, source=" + q);
+ String queuePath = null;
+
+ int myLevel = getLevel(q);
+ if (0 == myLevel) {
+ // It's root
+ when(queue.getQueueName()).thenReturn(ROOT);
+ queuePath = ROOT;
+ }
+
+ String queueName = getQueueName(q);
+ when(queue.getQueueName()).thenReturn(queueName);
+
+ // Setup parent queue, and add myself to parentQueue.children-list
+ ParentQueue parentQueue = getParentQueue(queueExprArray, idx, myLevel);
+ if (null != parentQueue) {
+ when(queue.getParent()).thenReturn(parentQueue);
+ parentQueue.getChildQueues().add(queue);
+
+ // Setup my path
+ queuePath = parentQueue.getQueuePath() + "." + queueName;
+ }
+ when(queue.getQueuePath()).thenReturn(queuePath);
+
+ QueueCapacities qc = new QueueCapacities(0 == myLevel);
+ ResourceUsage ru = new ResourceUsage();
+
+ when(queue.getQueueCapacities()).thenReturn(qc);
+ when(queue.getQueueResourceUsage()).thenReturn(ru);
+
+ LOG.debug("Setup queue, name=" + queue.getQueueName() + " path="
+ + queue.getQueuePath());
+ LOG.debug("Parent=" + (parentQueue == null ? "null" : parentQueue
+ .getQueueName()));
+
+ // Setup other fields like used resource, guaranteed resource, etc.
+ String capacitySettingStr = q.substring(q.indexOf("(") + 1, q.indexOf(")"));
+ for (String s : capacitySettingStr.split(",")) {
+ String partitionName = s.substring(0, s.indexOf("="));
+ String[] values = s.substring(s.indexOf("[") + 1, s.indexOf("]")).split(" ");
+ // Add a small epsilon to capacities to avoid truncate when doing
+ // Resources.multiply
+ float epsilon = 1e-6f;
+ Resource totResoucePerPartition = partitionToResource.get(partitionName);
+ float absGuaranteed = Resources.divide(rc, totResoucePerPartition,
+ parseResourceFromString(values[0].trim()), totResoucePerPartition)
+ + epsilon;
+ float absMax = Resources.divide(rc, totResoucePerPartition,
+ parseResourceFromString(values[1].trim()), totResoucePerPartition)
+ + epsilon;
+ float absUsed = Resources.divide(rc, totResoucePerPartition,
+ parseResourceFromString(values[2].trim()), totResoucePerPartition)
+ + epsilon;
+ Resource pending = parseResourceFromString(values[3].trim());
+ qc.setAbsoluteCapacity(partitionName, absGuaranteed);
+ qc.setAbsoluteMaximumCapacity(partitionName, absMax);
+ qc.setAbsoluteUsedCapacity(partitionName, absUsed);
+ ru.setPending(partitionName, pending);
+ if (!isParent(queueExprArray, idx)) {
+ LeafQueue lq = (LeafQueue) queue;
+ when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
+ isA(String.class))).thenReturn(pending);
+ }
+ ru.setUsed(partitionName, parseResourceFromString(values[2].trim()));
+
+ // Setup reserved resource if it contained by input config
+ Resource reserved = Resources.none();
+ if(values.length == 5) {
+ reserved = parseResourceFromString(values[4].trim());
+ ru.setReserved(partitionName, reserved);
+ }
+ LOG.debug("Setup queue=" + queueName + " partition=" + partitionName
+ + " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax
+ + ",abs_used" + absUsed + ",pending_resource=" + pending
+ + ", reserved_resource=" + reserved + "]");
+ }
+
+ // Setup preemption disabled
+ when(queue.getPreemptionDisabled()).thenReturn(
+ conf.getPreemptionDisabled(queuePath, false));
+
+ nameToCSQueues.put(queueName, queue);
+ when(cs.getQueue(eq(queueName))).thenReturn(queue);
+ }
+
+  /**
+   * Depth of a queue expression, i.e. the number of leading '-' characters.
+   * Root has level 0.
+   */
+  private int getLevel(String q) {
+    int dashes = 0;
+    for (char c : q.toCharArray()) {
+      if (c != '-') {
+        break;
+      }
+      dashes++;
+    }
+    return dashes;
+  }
+
+  /**
+   * Extract the queue name from an expression such as {@code --a1(...)}. The
+   * name is the text after the leading '-' characters and before the first
+   * '('.
+   *
+   * @param q a single queue expression
+   * @return the queue name
+   * @throws IllegalArgumentException if the expression consists only of '-',
+   *     has no '(', has an empty name, or has a name containing '.'
+   */
+  private String getQueueName(String q) {
+    // getLevel counts the leading '-', i.e. it is the index of the first
+    // other character
+    int start = getLevel(q);
+    if (start == q.length()) {
+      throw new IllegalArgumentException("illegal input:" + q);
+    }
+    int end = q.indexOf('(');
+    if (end < 0) {
+      // Reject this explicitly instead of failing inside substring()
+      throw new IllegalArgumentException(
+          "queue expression should contain '(':" + q);
+    }
+    // name = after '-' and before '('
+    String name = q.substring(start, end);
+    if (name.isEmpty()) {
+      throw new IllegalArgumentException("queue name shouldn't be empty:" + q);
+    }
+    if (name.contains(".")) {
+      throw new IllegalArgumentException("queue name shouldn't contain '.':"
+          + name);
+    }
+    return name;
+  }
+
+  /**
+   * Find the parent of the queue at {@code idx}. The parent is the nearest
+   * preceding expression with a smaller level, looked up by name among the
+   * queues already registered in {@code nameToCSQueues}.
+   *
+   * @return the parent mock; null when no preceding expression has a smaller
+   *     level (as for root), or when that name is not registered
+   */
+  private ParentQueue getParentQueue(String[] queueExprArray, int idx,
+      int myLevel) {
+    for (int i = idx - 1; i >= 0; i--) {
+      String candidate = queueExprArray[i];
+      if (getLevel(candidate) < myLevel) {
+        String parentQueueName = getQueueName(candidate);
+        return (ParentQueue) nameToCSQueues.get(parentQueueName);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Whether the queue at {@code idx} is a ParentQueue.
+   *
+   * Expressions are listed parent-before-children, so a queue has children
+   * exactly when the very next expression is deeper than it. Do not skip
+   * past following same-level siblings first: with "-a;-b;--b1" that would
+   * wrongly classify the leaf "a" as a parent because of b's child.
+   *
+   * @param queues all queue expressions
+   * @param idx index of the queue to classify
+   * @return true for a ParentQueue, false for a LeafQueue
+   */
+  private boolean isParent(String[] queues, int idx) {
+    int next = idx + 1;
+    return next < queues.length
+        && getLevel(queues[next]) > getLevel(queues[idx]);
+  }
+
+  /**
+   * Build the first attempt id (attempt number 1) of the application with
+   * the given numeric id and cluster timestamp 0.
+   */
+  public ApplicationAttemptId getAppAttemptId(int id) {
+    return ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, id), 1);
+  }
+
+  /**
+   * Assert how many of an application's containers, live and reserved
+   * together, are allocated on {@code host} (NodeId port 1).
+   */
+  public void checkContainerNodesInApp(FiCaSchedulerApp app,
+      int expectedContainersNumber, String host) {
+    NodeId target = NodeId.newInstance(host, 1);
+    List<RMContainer> all =
+        new ArrayList<RMContainer>(app.getLiveContainers());
+    all.addAll(app.getReservedContainers());
+
+    int onTarget = 0;
+    for (RMContainer container : all) {
+      if (container.getAllocatedNode().equals(target)) {
+        onTarget++;
+      }
+    }
+    Assert.assertEquals(expectedContainersNumber, onTarget);
+  }
+
+ public FiCaSchedulerApp getApp(String queueName, int appId) {
+ for (FiCaSchedulerApp app : ((LeafQueue) cs.getQueue(queueName))
+ .getApplications()) {
+ if (app.getApplicationId().getId() == appId) {
+ return app;
+ }
+ }
+ return null;
+ }
+
+  /**
+   * Assert a queue's absolute guaranteed, maximum and used capacities for a
+   * partition, each within 1e-3.
+   */
+  public void checkAbsCapacities(CSQueue queue, String partition,
+      float guaranteed, float max, float used) {
+    final double tolerance = 1e-3;
+    QueueCapacities capacities = queue.getQueueCapacities();
+    Assert.assertEquals(guaranteed,
+        capacities.getAbsoluteCapacity(partition), tolerance);
+    Assert.assertEquals(max,
+        capacities.getAbsoluteMaximumCapacity(partition), tolerance);
+    Assert.assertEquals(used,
+        capacities.getAbsoluteUsedCapacity(partition), tolerance);
+  }
+
+ public void checkPendingResource(CSQueue queue, String partition, int pending) {
+ ResourceUsage ru = queue.getQueueResourceUsage();
+ Assert.assertEquals(pending, ru.getPending(partition).getMemorySize());
+ }
+
+ public void checkReservedResource(CSQueue queue, String partition, int reserved) {
+ ResourceUsage ru = queue.getQueueResourceUsage();
+ Assert.assertEquals(reserved, ru.getReserved(partition).getMemorySize());
+ }
+
+  /**
+   * Mockito matcher for a {@link ContainerPreemptEvent} that targets the
+   * given application attempt, and whose container belongs to the given
+   * queue and is allocated on the given node.
+   */
+  static class IsPreemptionRequestForQueueAndNode
+      extends ArgumentMatcher<ContainerPreemptEvent> {
+    private final ApplicationAttemptId appAttId;
+    private final String queueName;
+    private final NodeId nodeId;
+
+    IsPreemptionRequestForQueueAndNode(ApplicationAttemptId appAttId,
+        String queueName, NodeId nodeId) {
+      this.appAttId = appAttId;
+      this.queueName = queueName;
+      this.nodeId = nodeId;
+    }
+
+    @Override
+    public boolean matches(Object o) {
+      // A matcher should answer "no match" for null or for any other kind
+      // of argument, not throw a ClassCastException.
+      if (!(o instanceof ContainerPreemptEvent)) {
+        return false;
+      }
+      ContainerPreemptEvent cpe = (ContainerPreemptEvent) o;
+
+      return appAttId.equals(cpe.getAppId())
+          && queueName.equals(cpe.getContainer().getQueueName())
+          && nodeId.equals(cpe.getContainer().getAllocatedNode());
+    }
+
+    @Override
+    public String toString() {
+      // Include every matched field so verification failures are
+      // self-explanatory
+      return appAttId + " in queue " + queueName + " on node " + nodeId;
+    }
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index f4fcfb9..13167b6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -62,6 +62,7 @@ import org.mockito.ArgumentMatcher;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
@@ -161,6 +162,11 @@ public class TestProportionalCapacityPreemptionPolicy {
mCS = mock(CapacityScheduler.class);
when(mCS.getResourceCalculator()).thenReturn(rc);
lm = mock(RMNodeLabelsManager.class);
+ try {
+ when(lm.isExclusiveNodeLabel(anyString())).thenReturn(true);
+ } catch (IOException e) {
+ // do nothing
+ }
when(mCS.getConfiguration()).thenReturn(conf);
rmContext = mock(RMContext.class);
when(mCS.getRMContext()).thenReturn(rmContext);
@@ -650,6 +656,26 @@ public class TestProportionalCapacityPreemptionPolicy {
}
@Test
+  public void testHierarchicalWithReserved() {
+    final int[][] qData = new int[][] {
+      //  /    A    B    C    D    E    F
+      { 200, 100,  50,  50, 100,  10,  90 },  // abs
+      { 200, 200, 200, 200, 200, 200, 200 },  // maxCap
+      { 200, 110,  60,  50,  90,  90,   0 },  // used
+      {  10,   0,   0,   0,  10,   0,  10 },  // pending
+      {  40,  25,  15,  10,  15,  15,   0 },  // reserved
+      {   4,   2,   1,   1,   2,   1,   1 },  // apps
+      {  -1,  -1,   1,   1,  -1,   1,   1 },  // req granularity
+      {   2,   2,   0,   0,   2,   0,   0 },  // subqueues
+    };
+    final ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+
+    // verify capacity taken from A1, not B1 despite B1 being far over
+    // its absolute guaranteed capacity
+    final IsPreemptionRequestFor fromAppA = new IsPreemptionRequestFor(appA);
+    verify(mDisp, times(10)).handle(argThat(fromAppA));
+  }
+
+ @Test
public void testZeroGuar() {
int[][] qData = new int[][] {
// / A B C D E F
@@ -938,12 +964,42 @@ public class TestProportionalCapacityPreemptionPolicy {
"queueA").get("");
assertEquals(0, tempQueueAPartition.untouchableExtra.getMemorySize());
long extraForQueueA =
- tempQueueAPartition.current.getMemorySize() - tempQueueAPartition.guaranteed
+ tempQueueAPartition.getUsed().getMemorySize() - tempQueueAPartition.getGuaranteed()
.getMemorySize();
assertEquals(extraForQueueA,
tempQueueAPartition.preemptableExtra.getMemorySize());
}
+  @Test
+  public void testHierarchicalLarge3LevelsWithReserved() {
+    int[][] qData = new int[][] {
+      //  /    A                   F               I
+      //            B    C                  G    H          J    K
+      //                    D    E
+      // (columns, left to right: / A B C D E F G H I J K)
+      { 400, 200,  60, 140, 100,  40, 100,  70,  30, 100,  10,  90 }, // abs
+      { 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400, 400 }, // maxCap
+      { 400, 210,  60, 150, 100,  50, 100,  50,  50,  90,  10,  80 }, // used
+      {  10,   0,   0,   0,   0,   0,   0,   0,   0,   0,   0,  10 }, // pending
+      {  50,  30,  20,  10,   5,   5,   0,   0,   0,  10,  10,   0 }, // reserved
+      //          appA       appB appC      appD appE      appF appG
+      {   7,   3,   1,   2,   1,   1,   2,   1,   1,   2,   1,   1 }, // apps
+      {  -1,  -1,   1,  -1,   1,   1,  -1,   1,   1,  -1,   1,   1 }, // req granularity
+      {   3,   2,   0,   2,   0,   0,   2,   0,   0,   2,   0,   0 }, // subqueues
+    };
+    ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
+    policy.editSchedule();
+
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(appC)));
+    assertEquals(10, policy.getQueuePartitions().get("queueE").get("")
+        .preemptableExtra.getMemorySize());
+    // E is 10 over its guarantee (used 50, abs 40), and its ancestor A is
+    // also 10 over (used 210, abs 200). Check that A has nothing untouchable
+    // and that its preemptable extra is exactly its own used - guaranteed.
+    TempQueuePerPartition tempQueueAPartition =
+        policy.getQueuePartitions().get("queueA").get("");
+    assertEquals(0, tempQueueAPartition.untouchableExtra.getMemorySize());
+    long extraForQueueA = tempQueueAPartition.getUsed().getMemorySize()
+        - tempQueueAPartition.getGuaranteed().getMemorySize();
+    assertEquals(extraForQueueA,
+        tempQueueAPartition.preemptableExtra.getMemorySize());
+  }
+
static class IsPreemptionRequestFor
extends ArgumentMatcher<ContainerPreemptEvent> {
private final ApplicationAttemptId appAttId;
@@ -1064,6 +1120,7 @@ public class TestProportionalCapacityPreemptionPolicy {
ParentQueue root = mockParentQueue(null, queues[0], pqs);
ResourceUsage resUsage = new ResourceUsage();
resUsage.setUsed(used[0]);
+ resUsage.setReserved(reserved[0]);
when(root.getQueueName()).thenReturn(CapacitySchedulerConfiguration.ROOT);
when(root.getAbsoluteUsedCapacity()).thenReturn(
Resources.divide(rc, tot, used[0], tot));
@@ -1089,6 +1146,7 @@ public class TestProportionalCapacityPreemptionPolicy {
q = mockParentQueue(p, queues[i], pqs);
ResourceUsage resUsagePerQueue = new ResourceUsage();
resUsagePerQueue.setUsed(used[i]);
+ resUsagePerQueue.setReserved(reserved[i]);
when(q.getQueueResourceUsage()).thenReturn(resUsagePerQueue);
} else {
q = mockLeafQueue(p, tot, i, abs, used, pending, reserved, apps, gran);
@@ -1165,6 +1223,7 @@ public class TestProportionalCapacityPreemptionPolicy {
ResourceUsage ru = new ResourceUsage();
ru.setPending(pending[i]);
ru.setUsed(used[i]);
+ ru.setReserved(reserved[i]);
when(lq.getQueueResourceUsage()).thenReturn(ru);
// consider moving where CapacityScheduler::comparator accessible
final NavigableSet<FiCaSchedulerApp> qApps = new TreeSet<FiCaSchedulerApp>(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
index 837c95f..e31a889 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
@@ -18,231 +18,23 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.NodeId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.event.Dispatcher;
-import org.apache.hadoop.yarn.event.EventHandler;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor;
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.TreeSet;
-import static org.mockito.Matchers.any;
import static org.mockito.Matchers.argThat;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Matchers.isA;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
- private static final Log LOG =
- LogFactory.getLog(TestProportionalCapacityPreemptionPolicyForNodePartitions.class);
- static final String ROOT = CapacitySchedulerConfiguration.ROOT;
- private Map<String, CSQueue> nameToCSQueues = null;
- private Map<String, Resource> partitionToResource = null;
- private Map<NodeId, SchedulerNode> nodeIdToSchedulerNodes = null;
- private RMNodeLabelsManager nlm = null;
- private RMContext rmContext = null;
-
- private ResourceCalculator rc = new DefaultResourceCalculator();
- private Clock mClock = null;
- private CapacitySchedulerConfiguration conf = null;
- private CapacityScheduler cs = null;
- private EventHandler<SchedulerEvent> mDisp = null;
- private ProportionalCapacityPreemptionPolicy policy = null;
- private Resource clusterResource = null;
-
- @SuppressWarnings("unchecked")
+public class TestProportionalCapacityPreemptionPolicyForNodePartitions
+ extends ProportionalCapacityPreemptionPolicyMockFramework {
@Before
public void setup() {
- org.apache.log4j.Logger.getRootLogger().setLevel(
- org.apache.log4j.Level.DEBUG);
-
- conf = new CapacitySchedulerConfiguration(new Configuration(false));
- conf.setLong(
- CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL, 10000);
- conf.setLong(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
- 3000);
- // report "ideal" preempt
- conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
- (float) 1.0);
- conf.setFloat(
- CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
- (float) 1.0);
-
- mClock = mock(Clock.class);
- cs = mock(CapacityScheduler.class);
- when(cs.getResourceCalculator()).thenReturn(rc);
- when(cs.getPreemptionManager()).thenReturn(new PreemptionManager());
- when(cs.getConfiguration()).thenReturn(conf);
-
- nlm = mock(RMNodeLabelsManager.class);
- mDisp = mock(EventHandler.class);
-
- rmContext = mock(RMContext.class);
- when(rmContext.getNodeLabelManager()).thenReturn(nlm);
- Dispatcher disp = mock(Dispatcher.class);
- when(rmContext.getDispatcher()).thenReturn(disp);
- when(disp.getEventHandler()).thenReturn(mDisp);
- when(cs.getRMContext()).thenReturn(rmContext);
-
+ super.setup();
policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
- partitionToResource = new HashMap<>();
- nodeIdToSchedulerNodes = new HashMap<>();
- nameToCSQueues = new HashMap<>();
- }
-
- @Test
- public void testBuilder() throws Exception {
- /**
- * Test of test, make sure we build expected mock schedulable objects
- */
- String labelsConfig =
- "=200,true;" + // default partition
- "red=100,false;" + // partition=red
- "blue=200,true"; // partition=blue
- String nodesConfig =
- "n1=red;" + // n1 has partition=red
- "n2=blue;" + // n2 has partition=blue
- "n3="; // n3 doesn't have partition
- String queuesConfig =
- // guaranteed,max,used,pending
- "root(=[200 200 100 100],red=[100 100 100 100],blue=[200 200 200 200]);" + //root
- "-a(=[100 200 100 100],red=[0 0 0 0],blue=[200 200 200 200]);" + // a
- "--a1(=[50 100 50 100],red=[0 0 0 0],blue=[100 200 200 0]);" + // a1
- "--a2(=[50 200 50 0],red=[0 0 0 0],blue=[100 200 0 200]);" + // a2
- "-b(=[100 200 0 0],red=[100 100 100 100],blue=[0 0 0 0])";
- String appsConfig=
- //queueName\t(priority,resource,host,expression,#repeat,reserved)
- // app1 in a1, , 50 in n2 (reserved), 50 in n2 (allocated)
- "a1\t" // app1 in a1
- + "(1,1,n3,red,50,false);" + // 50 * default in n3
-
- "a1\t" // app2 in a1
- + "(2,1,n2,,50,true)(2,1,n2,,50,false)" // 50 * ignore-exclusivity (reserved),
- // 50 * ignore-exclusivity (allocated)
- + "(2,1,n2,blue,50,true)(2,1,n2,blue,50,true);" + // 50 in n2 (reserved),
- // 50 in n2 (allocated)
- "a2\t" // app3 in a2
- + "(1,1,n3,red,50,false);" + // 50 * default in n3
-
- "b\t" // app4 in b
- + "(1,1,n1,red,100,false);";
-
- buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
-
- // Check queues:
- // root
- checkAbsCapacities(cs.getQueue("root"), "", 1f, 1f, 0.5f);
- checkPendingResource(cs.getQueue("root"), "", 100);
- checkAbsCapacities(cs.getQueue("root"), "red", 1f, 1f, 1f);
- checkPendingResource(cs.getQueue("root"), "red", 100);
- checkAbsCapacities(cs.getQueue("root"), "blue", 1f, 1f, 1f);
- checkPendingResource(cs.getQueue("root"), "blue", 200);
-
- // a
- checkAbsCapacities(cs.getQueue("a"), "", 0.5f, 1f, 0.5f);
- checkPendingResource(cs.getQueue("a"), "", 100);
- checkAbsCapacities(cs.getQueue("a"), "red", 0f, 0f, 0f);
- checkPendingResource(cs.getQueue("a"), "red", 0);
- checkAbsCapacities(cs.getQueue("a"), "blue", 1f, 1f, 1f);
- checkPendingResource(cs.getQueue("a"), "blue", 200);
-
- // a1
- checkAbsCapacities(cs.getQueue("a1"), "", 0.25f, 0.5f, 0.25f);
- checkPendingResource(cs.getQueue("a1"), "", 100);
- checkAbsCapacities(cs.getQueue("a1"), "red", 0f, 0f, 0f);
- checkPendingResource(cs.getQueue("a1"), "red", 0);
- checkAbsCapacities(cs.getQueue("a1"), "blue", 0.5f, 1f, 1f);
- checkPendingResource(cs.getQueue("a1"), "blue", 0);
-
- // a2
- checkAbsCapacities(cs.getQueue("a2"), "", 0.25f, 1f, 0.25f);
- checkPendingResource(cs.getQueue("a2"), "", 0);
- checkAbsCapacities(cs.getQueue("a2"), "red", 0f, 0f, 0f);
- checkPendingResource(cs.getQueue("a2"), "red", 0);
- checkAbsCapacities(cs.getQueue("a2"), "blue", 0.5f, 1f, 0f);
- checkPendingResource(cs.getQueue("a2"), "blue", 200);
-
- // b1
- checkAbsCapacities(cs.getQueue("b"), "", 0.5f, 1f, 0f);
- checkPendingResource(cs.getQueue("b"), "", 0);
- checkAbsCapacities(cs.getQueue("b"), "red", 1f, 1f, 1f);
- checkPendingResource(cs.getQueue("b"), "red", 100);
- checkAbsCapacities(cs.getQueue("b"), "blue", 0f, 0f, 0f);
- checkPendingResource(cs.getQueue("b"), "blue", 0);
-
- // Check ignored partitioned containers in queue
- Assert.assertEquals(100, ((LeafQueue) cs.getQueue("a1"))
- .getIgnoreExclusivityRMContainers().get("blue").size());
-
- // Check applications
- Assert.assertEquals(2, ((LeafQueue)cs.getQueue("a1")).getApplications().size());
- Assert.assertEquals(1, ((LeafQueue)cs.getQueue("a2")).getApplications().size());
- Assert.assertEquals(1, ((LeafQueue)cs.getQueue("b")).getApplications().size());
-
- // Check #containers
- FiCaSchedulerApp app1 = getApp("a1", 1);
- FiCaSchedulerApp app2 = getApp("a1", 2);
- FiCaSchedulerApp app3 = getApp("a2", 3);
- FiCaSchedulerApp app4 = getApp("b", 4);
-
- Assert.assertEquals(50, app1.getLiveContainers().size());
- checkContainerNodesInApp(app1, 50, "n3");
-
- Assert.assertEquals(50, app2.getLiveContainers().size());
- Assert.assertEquals(150, app2.getReservedContainers().size());
- checkContainerNodesInApp(app2, 200, "n2");
-
- Assert.assertEquals(50, app3.getLiveContainers().size());
- checkContainerNodesInApp(app3, 50, "n3");
-
- Assert.assertEquals(100, app4.getLiveContainers().size());
- checkContainerNodesInApp(app4, 100, "n1");
}
@Test
@@ -822,477 +614,4 @@ public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
verify(mDisp, never()).handle(
argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
}
-
- private ApplicationAttemptId getAppAttemptId(int id) {
- ApplicationId appId = ApplicationId.newInstance(0L, id);
- ApplicationAttemptId appAttemptId =
- ApplicationAttemptId.newInstance(appId, 1);
- return appAttemptId;
- }
-
- private void checkContainerNodesInApp(FiCaSchedulerApp app,
- int expectedContainersNumber, String host) {
- NodeId nodeId = NodeId.newInstance(host, 1);
- int num = 0;
- for (RMContainer c : app.getLiveContainers()) {
- if (c.getAllocatedNode().equals(nodeId)) {
- num++;
- }
- }
- for (RMContainer c : app.getReservedContainers()) {
- if (c.getAllocatedNode().equals(nodeId)) {
- num++;
- }
- }
- Assert.assertEquals(expectedContainersNumber, num);
- }
-
- private FiCaSchedulerApp getApp(String queueName, int appId) {
- for (FiCaSchedulerApp app : ((LeafQueue) cs.getQueue(queueName))
- .getApplications()) {
- if (app.getApplicationId().getId() == appId) {
- return app;
- }
- }
- return null;
- }
-
- private void checkAbsCapacities(CSQueue queue, String partition,
- float guaranteed, float max, float used) {
- QueueCapacities qc = queue.getQueueCapacities();
- Assert.assertEquals(guaranteed, qc.getAbsoluteCapacity(partition), 1e-3);
- Assert.assertEquals(max, qc.getAbsoluteMaximumCapacity(partition), 1e-3);
- Assert.assertEquals(used, qc.getAbsoluteUsedCapacity(partition), 1e-3);
- }
-
- private void checkPendingResource(CSQueue queue, String partition, int pending) {
- ResourceUsage ru = queue.getQueueResourceUsage();
- Assert.assertEquals(pending, ru.getPending(partition).getMemorySize());
- }
-
- private void buildEnv(String labelsConfig, String nodesConfig,
- String queuesConfig, String appsConfig) throws IOException {
- buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, false);
- }
-
- private void buildEnv(String labelsConfig, String nodesConfig,
- String queuesConfig, String appsConfig,
- boolean useDominantResourceCalculator) throws IOException {
- if (useDominantResourceCalculator) {
- when(cs.getResourceCalculator()).thenReturn(
- new DominantResourceCalculator());
- }
- mockNodeLabelsManager(labelsConfig);
- mockSchedulerNodes(nodesConfig);
- for (NodeId nodeId : nodeIdToSchedulerNodes.keySet()) {
- when(cs.getSchedulerNode(nodeId)).thenReturn(
- nodeIdToSchedulerNodes.get(nodeId));
- }
- ParentQueue root = mockQueueHierarchy(queuesConfig);
- when(cs.getRootQueue()).thenReturn(root);
- when(cs.getClusterResource()).thenReturn(clusterResource);
- mockApplications(appsConfig);
-
- policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs,
- mClock);
- }
-
- private void mockContainers(String containersConfig, ApplicationAttemptId attemptId,
- String queueName, List<RMContainer> reservedContainers,
- List<RMContainer> liveContainers) {
- int containerId = 1;
- int start = containersConfig.indexOf("=") + 1;
- int end = -1;
-
- while (start < containersConfig.length()) {
- while (start < containersConfig.length()
- && containersConfig.charAt(start) != '(') {
- start++;
- }
- if (start >= containersConfig.length()) {
- throw new IllegalArgumentException(
- "Error containers specification, line=" + containersConfig);
- }
- end = start + 1;
- while (end < containersConfig.length()
- && containersConfig.charAt(end) != ')') {
- end++;
- }
- if (end >= containersConfig.length()) {
- throw new IllegalArgumentException(
- "Error containers specification, line=" + containersConfig);
- }
-
- // now we found start/end, get container values
- String[] values = containersConfig.substring(start + 1, end).split(",");
- if (values.length != 6) {
- throw new IllegalArgumentException("Format to define container is:"
- + "(priority,resource,host,expression,repeat,reserved)");
- }
- Priority pri = Priority.newInstance(Integer.parseInt(values[0]));
- Resource res = parseResourceFromString(values[1]);
- NodeId host = NodeId.newInstance(values[2], 1);
- String exp = values[3];
- int repeat = Integer.parseInt(values[4]);
- boolean reserved = Boolean.parseBoolean(values[5]);
-
- for (int i = 0; i < repeat; i++) {
- Container c = mock(Container.class);
- when(c.getResource()).thenReturn(res);
- when(c.getPriority()).thenReturn(pri);
- RMContainerImpl rmc = mock(RMContainerImpl.class);
- when(rmc.getAllocatedNode()).thenReturn(host);
- when(rmc.getNodeLabelExpression()).thenReturn(exp);
- when(rmc.getAllocatedResource()).thenReturn(res);
- when(rmc.getContainer()).thenReturn(c);
- when(rmc.getApplicationAttemptId()).thenReturn(attemptId);
- final ContainerId cId = ContainerId.newContainerId(attemptId, containerId);
- when(rmc.getContainerId()).thenReturn(
- cId);
- doAnswer(new Answer<Integer>() {
- @Override
- public Integer answer(InvocationOnMock invocation) throws Throwable {
- return cId.compareTo(((RMContainer) invocation.getArguments()[0])
- .getContainerId());
- }
- }).when(rmc).compareTo(any(RMContainer.class));
-
- if (containerId == 1) {
- when(rmc.isAMContainer()).thenReturn(true);
- }
-
- if (reserved) {
- reservedContainers.add(rmc);
- } else {
- liveContainers.add(rmc);
- }
-
- // If this is a non-exclusive allocation
- String partition = null;
- if (exp.isEmpty()
- && !(partition = nodeIdToSchedulerNodes.get(host).getPartition())
- .isEmpty()) {
- LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName);
- Map<String, TreeSet<RMContainer>> ignoreExclusivityContainers =
- queue.getIgnoreExclusivityRMContainers();
- if (!ignoreExclusivityContainers.containsKey(partition)) {
- ignoreExclusivityContainers.put(partition,
- new TreeSet<RMContainer>());
- }
- ignoreExclusivityContainers.get(partition).add(rmc);
- }
- LOG.debug("add container to app=" + attemptId + " res=" + res
- + " node=" + host + " nodeLabelExpression=" + exp + " partition="
- + partition);
-
- containerId++;
- }
-
- start = end + 1;
- }
- }
-
- /**
- * Format is:
- * <pre>
- * queueName\t // app1
- * (priority,resource,host,expression,#repeat,reserved)
- * (priority,resource,host,expression,#repeat,reserved);
- * queueName\t // app2
- * </pre>
- */
- private void mockApplications(String appsConfig) {
- int id = 1;
- for (String a : appsConfig.split(";")) {
- String[] strs = a.split("\t");
- String queueName = strs[0];
-
- // get containers
- List<RMContainer> liveContainers = new ArrayList<RMContainer>();
- List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
- ApplicationId appId = ApplicationId.newInstance(0L, id);
- ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
-
- mockContainers(strs[1], appAttemptId, queueName, reservedContainers,
- liveContainers);
-
- FiCaSchedulerApp app = mock(FiCaSchedulerApp.class);
- when(app.getLiveContainers()).thenReturn(liveContainers);
- when(app.getReservedContainers()).thenReturn(reservedContainers);
- when(app.getApplicationAttemptId()).thenReturn(appAttemptId);
- when(app.getApplicationId()).thenReturn(appId);
- when(app.getPriority()).thenReturn(Priority.newInstance(0));
-
- // add to LeafQueue
- LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName);
- queue.getApplications().add(app);
-
- id++;
- }
- }
-
- /**
- * Format is:
- * host1=partition;
- * host2=partition;
- */
- private void mockSchedulerNodes(String schedulerNodesConfigStr)
- throws IOException {
- String[] nodesConfigStrArray = schedulerNodesConfigStr.split(";");
- for (String p : nodesConfigStrArray) {
- NodeId nodeId = NodeId.newInstance(p.substring(0, p.indexOf("=")), 1);
- String partition = p.substring(p.indexOf("=") + 1, p.length());
-
- SchedulerNode sn = mock(SchedulerNode.class);
- when(sn.getNodeID()).thenReturn(nodeId);
- when(sn.getPartition()).thenReturn(partition);
- nodeIdToSchedulerNodes.put(nodeId, sn);
-
- LOG.debug("add scheduler node, id=" + nodeId + ", partition=" + partition);
- }
- }
-
- /**
- * Format is:
- * <pre>
- * partition0=total_resource,exclusivity;
- * partition1=total_resource,exclusivity;
- * ...
- * </pre>
- */
- private void mockNodeLabelsManager(String nodeLabelsConfigStr) throws IOException {
- String[] partitionConfigArr = nodeLabelsConfigStr.split(";");
- clusterResource = Resources.createResource(0);
- for (String p : partitionConfigArr) {
- String partitionName = p.substring(0, p.indexOf("="));
- Resource res = parseResourceFromString(p.substring(p.indexOf("=") + 1,
- p.indexOf(",")));
- boolean exclusivity =
- Boolean.parseBoolean(p.substring(p.indexOf(",") + 1, p.length()));
- when(nlm.getResourceByLabel(eq(partitionName), any(Resource.class)))
- .thenReturn(res);
- when(nlm.isExclusiveNodeLabel(eq(partitionName))).thenReturn(exclusivity);
-
- // add to partition to resource
- partitionToResource.put(partitionName, res);
- LOG.debug("add partition=" + partitionName + " totalRes=" + res
- + " exclusivity=" + exclusivity);
- Resources.addTo(clusterResource, res);
- }
-
- when(nlm.getClusterNodeLabelNames()).thenReturn(
- partitionToResource.keySet());
- }
-
- private Resource parseResourceFromString(String p) {
- String[] resource = p.split(":");
- Resource res = Resources.createResource(0);
- if (resource.length == 1) {
- res = Resources.createResource(Integer.parseInt(resource[0]));
- } else {
- res = Resources.createResource(Integer.parseInt(resource[0]),
- Integer.parseInt(resource[1]));
- }
- return res;
- }
-
- /**
- * Format is:
- * <pre>
- * root (<partition-name-1>=[guaranteed max used pending],<partition-name-2>=..);
- * -A(...);
- * --A1(...);
- * --A2(...);
- * -B...
- * </pre>
- * ";" splits queues, and there should no empty lines, no extra spaces
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- private ParentQueue mockQueueHierarchy(String queueExprs) {
- String[] queueExprArray = queueExprs.split(";");
- ParentQueue rootQueue = null;
- for (int idx = 0; idx < queueExprArray.length; idx++) {
- String q = queueExprArray[idx];
- CSQueue queue;
-
- // Initialize queue
- if (isParent(queueExprArray, idx)) {
- ParentQueue parentQueue = mock(ParentQueue.class);
- queue = parentQueue;
- List<CSQueue> children = new ArrayList<CSQueue>();
- when(parentQueue.getChildQueues()).thenReturn(children);
- } else {
- LeafQueue leafQueue = mock(LeafQueue.class);
- final TreeSet<FiCaSchedulerApp> apps = new TreeSet<>(
- new Comparator<FiCaSchedulerApp>() {
- @Override
- public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
- return a1.getApplicationId().compareTo(a2.getApplicationId());
- }
- });
- when(leafQueue.getApplications()).thenReturn(apps);
- OrderingPolicy<FiCaSchedulerApp> so = mock(OrderingPolicy.class);
- when(so.getPreemptionIterator()).thenAnswer(new Answer() {
- public Object answer(InvocationOnMock invocation) {
- return apps.descendingIterator();
- }
- });
- when(leafQueue.getOrderingPolicy()).thenReturn(so);
-
- Map<String, TreeSet<RMContainer>> ignorePartitionContainers =
- new HashMap<>();
- when(leafQueue.getIgnoreExclusivityRMContainers()).thenReturn(
- ignorePartitionContainers);
- queue = leafQueue;
- }
-
- setupQueue(queue, q, queueExprArray, idx);
- if (queue.getQueueName().equals(ROOT)) {
- rootQueue = (ParentQueue) queue;
- }
- }
- return rootQueue;
- }
-
- private void setupQueue(CSQueue queue, String q, String[] queueExprArray,
- int idx) {
- LOG.debug("*** Setup queue, source=" + q);
- String queuePath = null;
-
- int myLevel = getLevel(q);
- if (0 == myLevel) {
- // It's root
- when(queue.getQueueName()).thenReturn(ROOT);
- queuePath = ROOT;
- }
-
- String queueName = getQueueName(q);
- when(queue.getQueueName()).thenReturn(queueName);
-
- // Setup parent queue, and add myself to parentQueue.children-list
- ParentQueue parentQueue = getParentQueue(queueExprArray, idx, myLevel);
- if (null != parentQueue) {
- when(queue.getParent()).thenReturn(parentQueue);
- parentQueue.getChildQueues().add(queue);
-
- // Setup my path
- queuePath = parentQueue.getQueuePath() + "." + queueName;
- }
- when(queue.getQueuePath()).thenReturn(queuePath);
-
- QueueCapacities qc = new QueueCapacities(0 == myLevel);
- ResourceUsage ru = new ResourceUsage();
-
- when(queue.getQueueCapacities()).thenReturn(qc);
- when(queue.getQueueResourceUsage()).thenReturn(ru);
-
- LOG.debug("Setup queue, name=" + queue.getQueueName() + " path="
- + queue.getQueuePath());
- LOG.debug("Parent=" + (parentQueue == null ? "null" : parentQueue
- .getQueueName()));
-
- // Setup other fields like used resource, guaranteed resource, etc.
- String capacitySettingStr = q.substring(q.indexOf("(") + 1, q.indexOf(")"));
- for (String s : capacitySettingStr.split(",")) {
- String partitionName = s.substring(0, s.indexOf("="));
- String[] values = s.substring(s.indexOf("[") + 1, s.indexOf("]")).split(" ");
- // Add a small epsilon to capacities to avoid truncate when doing
- // Resources.multiply
- float epsilon = 1e-6f;
- Resource totResoucePerPartition = partitionToResource.get(partitionName);
- float absGuaranteed = Resources.divide(rc, totResoucePerPartition,
- parseResourceFromString(values[0].trim()), totResoucePerPartition)
- + epsilon;
- float absMax = Resources.divide(rc, totResoucePerPartition,
- parseResourceFromString(values[1].trim()), totResoucePerPartition)
- + epsilon;
- float absUsed = Resources.divide(rc, totResoucePerPartition,
- parseResourceFromString(values[2].trim()), totResoucePerPartition)
- + epsilon;
- Resource pending = parseResourceFromString(values[3].trim());
- qc.setAbsoluteCapacity(partitionName, absGuaranteed);
- qc.setAbsoluteMaximumCapacity(partitionName, absMax);
- qc.setAbsoluteUsedCapacity(partitionName, absUsed);
- ru.setPending(partitionName, pending);
- if (!isParent(queueExprArray, idx)) {
- LeafQueue lq = (LeafQueue) queue;
- when(lq.getTotalPendingResourcesConsideringUserLimit(isA(Resource.class),
- isA(String.class))).thenReturn(pending);
- }
- ru.setUsed(partitionName, parseResourceFromString(values[2].trim()));
- LOG.debug("Setup queue=" + queueName + " partition=" + partitionName
- + " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax
- + ",abs_used" + absUsed + ",pending_resource=" + pending + "]");
- }
-
- // Setup preemption disabled
- when(queue.getPreemptionDisabled()).thenReturn(
- conf.getPreemptionDisabled(queuePath, false));
-
- nameToCSQueues.put(queueName, queue);
- when(cs.getQueue(eq(queueName))).thenReturn(queue);
- }
-
- /**
- * Level of a queue is how many "-" at beginning, root's level is 0
- */
- private int getLevel(String q) {
- int level = 0; // level = how many "-" at beginning
- while (level < q.length() && q.charAt(level) == '-') {
- level++;
- }
- return level;
- }
-
- private String getQueueName(String q) {
- int idx = 0;
- // find first != '-' char
- while (idx < q.length() && q.charAt(idx) == '-') {
- idx++;
- }
- if (idx == q.length()) {
- throw new IllegalArgumentException("illegal input:" + q);
- }
- // name = after '-' and before '('
- String name = q.substring(idx, q.indexOf('('));
- if (name.isEmpty()) {
- throw new IllegalArgumentException("queue name shouldn't be empty:" + q);
- }
- if (name.contains(".")) {
- throw new IllegalArgumentException("queue name shouldn't contain '.':"
- + name);
- }
- return name;
- }
-
- private ParentQueue getParentQueue(String[] queueExprArray, int idx, int myLevel) {
- idx--;
- while (idx >= 0) {
- int level = getLevel(queueExprArray[idx]);
- if (level < myLevel) {
- String parentQueuName = getQueueName(queueExprArray[idx]);
- return (ParentQueue) nameToCSQueues.get(parentQueuName);
- }
- idx--;
- }
-
- return null;
- }
-
- /**
- * Get if a queue is ParentQueue
- */
- private boolean isParent(String[] queues, int idx) {
- int myLevel = getLevel(queues[idx]);
- idx++;
- while (idx < queues.length && getLevel(queues[idx]) == myLevel) {
- idx++;
- }
- if (idx >= queues.length || getLevel(queues[idx]) < myLevel) {
- // It's a LeafQueue
- return false;
- } else {
- return true;
- }
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForReservedContainers.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForReservedContainers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForReservedContainers.java
new file mode 100644
index 0000000..38b2e78
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForReservedContainers.java
@@ -0,0 +1,430 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class TestProportionalCapacityPreemptionPolicyForReservedContainers
+ extends ProportionalCapacityPreemptionPolicyMockFramework {
+ @Before
+ public void setup() {
+ super.setup();
+ conf.setBoolean(
+ CapacitySchedulerConfiguration.PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS,
+ true);
+ policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
+ }
+
+ @Test
+ public void testPreemptionForSimpleReservedContainer() throws IOException {
+ /**
+ * The simplest test of reserved container, Queue structure is:
+ *
+ * <pre>
+ * root
+ * / \
+ * a b
+ * </pre>
+ * Guaranteed resource of a/b are 50:50
+ * Total cluster resource = 100
+ * - A has 90 containers on two nodes, n1 has 45, n2 has 45, size of each
+ * container is 1.
+ * - B has am container at n1, and reserves 1 container with size = 9 at n1,
+ * so B needs to preempt 9 containers from A at n1 instead of randomly
+ * preempt from n1 and n2.
+ */
+ String labelsConfig =
+ "=100,true;";
+ String nodesConfig = // n1 / n2 has no label
+ "n1= res=50;" +
+ "n2= res=50";
+ String queuesConfig =
+ // guaranteed,max,used,pending,reserved
+ "root(=[100 100 100 9 9]);" + //root
+ "-a(=[50 100 90 0]);" + // a
+ "-b(=[50 100 10 9 9])"; // b
+ String appsConfig=
+ //queueName\t(priority,resource,host,expression,#repeat,reserved)
+ "a\t" // app1 in a
+ + "(1,1,n1,,45,false)" // 45 in n1
+ + "(1,1,n2,,45,false);" + // 45 in n2
+ "b\t" // app2 in b
+ + "(1,1,n1,,1,false)" // AM container in n1
+ + "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ policy.editSchedule();
+
+ // Total 5 preempted from app1 at n1, don't preempt container from other
+ // app/node
+ verify(mDisp, times(5)).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(1))));
+ verify(mDisp, times(5)).handle(
+ argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+ NodeId.newInstance("n1", 1))));
+ verify(mDisp, times(0)).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(2))));
+ }
+
+ @Test
+ public void testUseReservedAndFifoSelectorTogether() throws IOException {
+ /**
+ * Queue structure is:
+ *
+ * <pre>
+ * root
+ * / \
+ * a b
+ * </pre>
+ * Guaranteed resource of a/b are 30:70
+ * Total cluster resource = 100
+ * - A has 45 containers on two nodes, n1 has 10, n2 has 35, size of each
+ * container is 1.
+ * - B has 5 containers at n2, and reserves 1 container with size = 50 at n1,
+ * B also has 20 pending resources.
+ * so B needs to preempt:
+ * - 10 containers from n1 (for reserved)
+ * - 5 containers from n2 for pending resources
+ */
+ String labelsConfig =
+ "=100,true;";
+ String nodesConfig = // n1 / n2 has no label
+ "n1= res=50;" +
+ "n2= res=50";
+ String queuesConfig =
+ // guaranteed,max,used,pending,reserved
+ "root(=[100 100 100 70 10]);" + //root (NOTE(review): reserved=10 differs from b's 50)
+ "-a(=[30 100 45 0]);" + // a
+ "-b(=[70 100 55 70 50])"; // b
+ String appsConfig=
+ //queueName\t(priority,resource,host,expression,#repeat,reserved)
+ "a\t" // app1 in a
+ + "(1,1,n2,,35,false)" // 35 in n2
+ + "(1,1,n1,,10,false);" + // 10 in n1
+ "b\t" // app2 in b
+ + "(1,1,n2,,5,false)" // 5 in n2
+ + "(1,50,n1,,1,true)"; // 1 container with size=50 reserved at n1
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ policy.editSchedule();
+
+ verify(mDisp, times(15)).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(1))));
+ verify(mDisp, times(10)).handle(
+ argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+ NodeId.newInstance("n1", 1))));
+ verify(mDisp, times(5)).handle(
+ argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+ NodeId.newInstance("n2", 1))));
+ verify(mDisp, times(0)).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(2))));
+ }
+
+ @Test
+ public void testReservedSelectorSkipsAMContainer() throws IOException {
+ /**
+ * Queue structure is:
+ *
+ * <pre>
+ * root
+ * / \
+ * a b
+ * </pre>
+ * Guaranteed resource of a/b are 30:70
+ * Total cluster resource = 100
+ * - A has 45 containers on two nodes, n1 has 10, n2 has 35, size of each
+ * container is 1.
+ * - B has 5 containers at n2, and reserves 1 container with size = 50 at n1,
+ * B also has 20 pending resources.
+ *
+ * Ideally B needs to preempt:
+ * - 10 containers from n1 (for reserved)
+ * - 5 containers from n2 for pending resources
+ *
+ * However, since one AM container is located at n1 (from queueA), we cannot
+ * preempt 10 containers from n1 for reserved container. Instead, we will
+ * preempt 15 containers from n2, since containers from queueA launched in n2
+ * are later than containers from queueA launched in n1 (FIFO order of containers)
+ */
+ String labelsConfig =
+ "=100,true;";
+ String nodesConfig = // n1 / n2 has no label
+ "n1= res=50;" +
+ "n2= res=50";
+ String queuesConfig =
+ // guaranteed,max,used,pending,reserved
+ "root(=[100 100 100 70 10]);" + //root (NOTE(review): reserved=10 differs from b's 50)
+ "-a(=[30 100 45 0]);" + // a
+ "-b(=[70 100 55 70 50])"; // b
+ String appsConfig=
+ //queueName\t(priority,resource,host,expression,#repeat,reserved)
+ "a\t" // app1 in a
+ + "(1,1,n1,,10,false)" // 10 in n1
+ + "(1,1,n2,,35,false);" +// 35 in n2
+ "b\t" // app2 in b
+ + "(1,1,n2,,5,false)" // 5 in n2
+ + "(1,50,n1,,1,true)"; // 1 container with size=50 reserved at n1
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ policy.editSchedule();
+
+ verify(mDisp, times(15)).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(1))));
+ verify(mDisp, times(0)).handle(
+ argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+ NodeId.newInstance("n1", 1))));
+ verify(mDisp, times(15)).handle(
+ argThat(new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+ NodeId.newInstance("n2", 1))));
+ verify(mDisp, times(0)).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(2))));
+ }
+
+ @Test
+ public void testPreemptionForReservedContainerRespectGuaranteedResource()
+ throws IOException {
+ /**
+ * Queue structure is:
+ *
+ * <pre>
+ * root
+ * / \
+ * a b
+ * </pre>
+ * Guaranteed resource of a/b are 85:15
+ * Total cluster resource = 100
+ * - A has 90 containers on two nodes, n1 has 45, n2 has 45, size of each
+ * container is 1.
+ * - B has am container at n1, and reserves 1 container with size = 9 at n1.
+ *
+ * If we preempt 9 containers from queue-A, queue-A will be below its
+ * guaranteed resource = 90 - 9 = 81 < 85.
+ *
+ * So no preemption will take place
+ */
+ String labelsConfig =
+ "=100,true;";
+ String nodesConfig = // n1 / n2 has no label
+ "n1= res=50;" +
+ "n2= res=50";
+ String queuesConfig =
+ // guaranteed,max,used,pending,reserved
+ "root(=[100 100 100 9 9]);" + //root
+ "-a(=[85 100 90 0]);" + // a
+ "-b(=[15 100 10 9 9])"; // b
+ String appsConfig=
+ //queueName\t(priority,resource,host,expression,#repeat,reserved)
+ "a\t" // app1 in a
+ + "(1,1,n1,,45,false)" // 45 in n1
+ + "(1,1,n2,,45,false);" + // 45 in n2
+ "b\t" // app2 in b
+ + "(1,1,n1,,1,false)" // AM container in n1
+ + "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ policy.editSchedule();
+
+ verify(mDisp, times(0)).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(1))));
+ verify(mDisp, times(0)).handle(argThat(
+ new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+ getAppAttemptId(2))));
+ }
+
+ @Test
+ public void testPreemptionForReservedContainerWhichHasAvailableResource()
+ throws IOException {
+ /**
+ * Queue structure is:
+ *
+ * <pre>
+ * root
+ * / \
+ * a b
+ * </pre>
+ *
+ * Guaranteed resource of a/b are 50:50
+ * Total cluster resource = 100
+ * - A has 90 containers on two nodes, n1 has 45, n2 has 45, size of each
+ * container is 1.
+ * - B only has 1 container with size = 9 reserved at n1 (no allocated one).
+ *
+ * So 4 containers should be preempted from A at n1:
+ * (available 5 + preempted 4) = 9
+ */
+ String labelsConfig =
+ "=100,true;";
+ String nodesConfig = // n1 / n2 has no label
+ "n1= res=50;" +
+ "n2= res=50";
+ String queuesConfig =
+ // guaranteed,max,used,pending,reserved
+ "root(=[100 100 99 9 9]);" + //root
+ "-a(=[50 100 90 0]);" + // a
+ "-b(=[50 100 9 9 9])"; // b
+ String appsConfig=
+ //queueName\t(priority,resource,host,expression,#repeat,reserved)
+ "a\t" // app1 in a
+ + "(1,1,n1,,45,false)" // 45 in n1
+ + "(1,1,n2,,45,false);" + // 45 in n2
+ "b\t" // app2 in b
+ + "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ policy.editSchedule();
+
+ // Total 4 preempted from app1 at n1, don't preempt container from other
+ // app/node
+ verify(mDisp, times(4)).handle(argThat(
+ new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+ NodeId.newInstance("n1", 1))));
+ verify(mDisp, times(0)).handle(argThat(
+ new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+ NodeId.newInstance("n2", 1))));
+ }
+
+ @Test
+ public void testPreemptionForReservedContainerWhichHasNondivisibleAvailableResource()
+ throws IOException {
+ /**
+ * Queue structure is:
+ *
+ * <pre>
+ * root
+ * / \
+ * a b
+ * </pre>
+ *
+ * Guaranteed resource of a/b are 50:50
+ * Total cluster resource = 100
+ * - A has 47 containers on two nodes, size of each container is 2,
+ * n1 has 24, n2 has 23
+ * - B reserves 1 container with size = 9 at n1,
+ *
+ * So 4 containers (total-resource = 8) should be preempted: n1 has 2 free,
+ * so 9 - 2 = 7 more is required, i.e. 3.5 containers, but we need to preempt
+ * an integer number of containers
+ */
+ String labelsConfig =
+ "=100,true;";
+ String nodesConfig = // n1 / n2 has no label
+ "n1= res=50;" +
+ "n2= res=50";
+ String queuesConfig =
+ // guaranteed,max,used,pending,reserved
+ "root(=[100 100 99 9 9]);" + //root
+ "-a(=[50 100 90 0]);" + // a (NOTE(review): apps below hold 94)
+ "-b(=[50 100 9 9 9])"; // b
+ String appsConfig=
+ //queueName\t(priority,resource,host,expression,#repeat,reserved)
+ "a\t" // app1 in a
+ + "(1,2,n1,,24,false)" // 48 in n1
+ + "(1,2,n2,,23,false);" + // 46 in n2
+ "b\t" // app2 in b
+ + "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ policy.editSchedule();
+
+ // Total 4 preempted from app1 at n1, don't preempt container from other
+ // app/node
+ verify(mDisp, times(4)).handle(argThat(
+ new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+ NodeId.newInstance("n1", 1))));
+ verify(mDisp, times(0)).handle(argThat(
+ new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+ NodeId.newInstance("n2", 1))));
+ }
+
+ @Test
+ public void testPreemptionForReservedContainerRespectAvailableResources()
+ throws IOException {
+ /**
+ * Queue structure is:
+ *
+ * <pre>
+ * root
+ * / \
+ * a b
+ * </pre>
+ *
+ * Guaranteed resource of a/b are 50:50
+ * Total cluster resource = 100, 4 nodes, 25 on each node
+ * - A has 10 containers on every node, size of container is 2
+ * - B reserves 1 container with size = 9 at n1,
+ *
+ * So even if we cannot allocate container for B now, no preemption should
+ * happen since there're plenty of available resources.
+ */
+ String labelsConfig =
+ "=100,true;";
+ String nodesConfig =
+ "n1= res=25;" +
+ "n2= res=25;" +
+ "n3= res=25;" +
+ "n4= res=25;";
+ String queuesConfig =
+ // guaranteed,max,used,pending,reserved
+ "root(=[100 100 89 9 9]);" + //root
+ "-a(=[50 100 80 0]);" + // a
+ "-b(=[50 100 9 9 9])"; // b
+ String appsConfig=
+ //queueName\t(priority,resource,host,expression,#repeat,reserved)
+ "a\t" // app1 in a
+ + "(1,2,n1,,10,false)" // 10 in n1
+ + "(1,2,n2,,10,false)" // 10 in n2
+ + "(1,2,n3,,10,false)" // 10 in n3
+ + "(1,2,n4,,10,false);" + // 10 in n4
+ "b\t" // app2 in b
+ + "(1,9,n1,,1,true)"; // 1 container with size=9 reserved at n1
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+ policy.editSchedule();
+
+ // No preemption should happen
+ verify(mDisp, times(0)).handle(argThat(
+ new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+ NodeId.newInstance("n1", 1))));
+ verify(mDisp, times(0)).handle(argThat(
+ new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+ NodeId.newInstance("n2", 1))));
+ verify(mDisp, times(0)).handle(argThat(
+ new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+ NodeId.newInstance("n3", 1))));
+ verify(mDisp, times(0)).handle(argThat(
+ new IsPreemptionRequestForQueueAndNode(getAppAttemptId(1), "a",
+ NodeId.newInstance("n4", 1))));
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[4/4] hadoop git commit: YARN-4390. Do surgical preemption based on
reserved container in CapacityScheduler. Contributed by Wangda Tan
Posted by ep...@apache.org.
YARN-4390. Do surgical preemption based on reserved container in CapacityScheduler. Contributed by Wangda Tan
(cherry picked from commit bb62e0592566b2fcae7136b30972aad2d3ac55b0)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/40367c8d
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/40367c8d
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/40367c8d
Branch: refs/heads/branch-2.8
Commit: 40367c8da3a4798b004714c3fbd69d84c1dc84f5
Parents: bced9e2
Author: Jian He <ji...@apache.org>
Authored: Thu May 5 12:56:21 2016 -0700
Committer: Eric Payne <ep...@apache.org>
Committed: Fri Dec 9 20:33:54 2016 +0000
----------------------------------------------------------------------
.../monitor/SchedulingMonitor.java | 16 +-
.../CapacitySchedulerPreemptionUtils.java | 35 +-
.../capacity/FifoCandidatesSelector.java | 26 +-
.../capacity/PreemptableResourceCalculator.java | 139 ++--
.../ProportionalCapacityPreemptionPolicy.java | 96 ++-
.../ReservedContainerCandidatesSelector.java | 316 +++++++++
.../monitor/capacity/TempQueuePerPartition.java | 184 ++++-
.../rmcontainer/RMContainer.java | 2 +
.../rmcontainer/RMContainerImpl.java | 10 +
.../scheduler/AbstractYarnScheduler.java | 2 +-
.../scheduler/SchedulerApplicationAttempt.java | 4 +
.../scheduler/SchedulerNode.java | 21 +-
.../scheduler/capacity/CapacityScheduler.java | 9 +-
.../CapacitySchedulerConfiguration.java | 24 +-
.../scheduler/common/fica/FiCaSchedulerApp.java | 1 +
.../common/fica/FiCaSchedulerNode.java | 7 +-
.../scheduler/fair/FSAppAttempt.java | 1 +
.../scheduler/fair/FairScheduler.java | 3 +-
.../scheduler/fifo/FifoScheduler.java | 2 +-
...alCapacityPreemptionPolicyMockFramework.java | 687 +++++++++++++++++++
...estProportionalCapacityPreemptionPolicy.java | 61 +-
...pacityPreemptionPolicyForNodePartitions.java | 687 +------------------
...tyPreemptionPolicyForReservedContainers.java | 430 ++++++++++++
...alCapacityPreemptionPolicyMockFramework.java | 247 +++++++
.../TestSchedulerApplicationAttempt.java | 3 +-
.../CapacitySchedulerPreemptionTestBase.java | 149 ++++
.../capacity/TestCapacityScheduler.java | 2 +-
.../TestCapacitySchedulerLazyPreemption.java | 639 +++++++++++++++++
.../TestCapacitySchedulerPreemption.java | 683 ------------------
...TestCapacitySchedulerSurgicalPreemption.java | 246 +++++++
.../scheduler/fair/TestFairScheduler.java | 2 +-
.../scheduler/fifo/TestFifoScheduler.java | 2 +-
32 files changed, 3193 insertions(+), 1543 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
index 55ec858..03e180d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
@@ -21,6 +21,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
@@ -84,10 +85,17 @@ public class SchedulingMonitor extends AbstractService {
@Override
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
- //invoke the preemption policy at a regular pace
- //the policy will generate preemption or kill events
- //managed by the dispatcher
- invokePolicy();
+ try {
+ //invoke the preemption policy at a regular pace
+ //the policy will generate preemption or kill events
+ //managed by the dispatcher
+ invokePolicy();
+ } catch (YarnRuntimeException e) {
+ LOG.error("YarnRuntimeException raised while executing preemption"
+ + " checker, skip this run..., exception=", e);
+ }
+
+ // Wait before next run
try {
Thread.sleep(monitorInterval);
} catch (InterruptedException e) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
index a71f108..42d8730 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
@@ -21,9 +21,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
@@ -40,9 +42,9 @@ public class CapacitySchedulerPreemptionUtils {
// Only add resToObtainByPartition when actuallyToBePreempted resource >= 0
if (Resources.greaterThan(context.getResourceCalculator(),
- clusterResource, qT.actuallyToBePreempted, Resources.none())) {
+ clusterResource, qT.getActuallyToBePreempted(), Resources.none())) {
resToObtainByPartition.put(qT.partition,
- Resources.clone(qT.actuallyToBePreempted));
+ Resources.clone(qT.getActuallyToBePreempted()));
}
}
@@ -62,4 +64,33 @@ public class CapacitySchedulerPreemptionUtils {
}
return containers.contains(container);
}
+
+ public static void deductPreemptableResourcesBasedSelectedCandidates(
+ CapacitySchedulerPreemptionContext context,
+ Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates) {
+ for (Set<RMContainer> containers : selectedCandidates.values()) {
+ for (RMContainer c : containers) {
+ SchedulerNode schedulerNode = context.getScheduler().getSchedulerNode(
+ c.getAllocatedNode());
+ if (null == schedulerNode) {
+ continue;
+ }
+
+ String partition = schedulerNode.getPartition();
+ String queue = c.getQueueName();
+ TempQueuePerPartition tq = context.getQueueByPartition(queue,
+ partition);
+
+ Resource res = c.getReservedResource();
+ if (null == res) {
+ res = c.getAllocatedResource();
+ }
+
+ if (null != res) {
+ tq.deductActuallyToBePreempted(context.getResourceCalculator(),
+ tq.totalPartitionResource, res);
+ }
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
index 499d0ff..a8c62fd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
@@ -36,7 +36,6 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Comparator;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -55,7 +54,7 @@ public class FifoCandidatesSelector
super(preemptionContext);
preemptableAmountCalculator = new PreemptableResourceCalculator(
- preemptionContext);
+ preemptionContext, false);
}
@Override
@@ -66,8 +65,13 @@ public class FifoCandidatesSelector
preemptableAmountCalculator.computeIdealAllocation(clusterResource,
totalPreemptionAllowed);
- Map<ApplicationAttemptId, Set<RMContainer>> preemptMap =
- new HashMap<>();
+ // Previous selectors (with higher priority) could have already
+ // selected containers. We need to deduct preemptable resources
+ // based on already selected candidates.
+ CapacitySchedulerPreemptionUtils
+ .deductPreemptableResourcesBasedSelectedCandidates(preemptionContext,
+ selectedCandidates);
+
List<RMContainer> skippedAMContainerlist = new ArrayList<>();
// Loop all leaf queues
@@ -109,7 +113,7 @@ public class FifoCandidatesSelector
continue;
}
boolean preempted = tryPreemptContainerAndDeductResToObtain(
- resToObtainByPartition, c, clusterResource, preemptMap,
+ resToObtainByPartition, c, clusterResource, selectedCandidates,
totalPreemptionAllowed);
if (!preempted) {
continue;
@@ -132,7 +136,7 @@ public class FifoCandidatesSelector
}
preemptFrom(fc, clusterResource, resToObtainByPartition,
- skippedAMContainerlist, skippedAMSize, preemptMap,
+ skippedAMContainerlist, skippedAMSize, selectedCandidates,
totalPreemptionAllowed);
}
@@ -144,13 +148,13 @@ public class FifoCandidatesSelector
leafQueue.getAbsoluteCapacity()),
leafQueue.getMaxAMResourcePerQueuePercent());
- preemptAMContainers(clusterResource, preemptMap, skippedAMContainerlist,
+ preemptAMContainers(clusterResource, selectedCandidates, skippedAMContainerlist,
resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue,
totalPreemptionAllowed);
}
}
- return preemptMap;
+ return selectedCandidates;
}
/**
@@ -236,9 +240,9 @@ public class FifoCandidatesSelector
resourceToObtainByPartitions.remove(nodePartition);
}
if (LOG.isDebugEnabled()) {
- LOG.debug("Marked container=" + rmContainer.getContainerId()
- + " in partition=" + nodePartition
- + " to be preemption candidates");
+ LOG.debug(this.getClass().getName() + " Marked container=" + rmContainer
+ .getContainerId() + " from partition=" + nodePartition + " queue="
+ + rmContainer.getQueueName() + " to be preemption candidates");
}
// Add to preemptMap
addToPreemptMap(preemptMap, attemptId, rmContainer);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
index 2217210..103b419 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
@@ -37,7 +37,7 @@ import java.util.Set;
/**
* Calculate how much resources need to be preempted for each queue,
- * will be used by {@link FifoCandidatesSelector}
+ * will be used by {@link PreemptionCandidatesSelector}
*/
public class PreemptableResourceCalculator {
private static final Log LOG =
@@ -45,6 +45,7 @@ public class PreemptableResourceCalculator {
private final CapacitySchedulerPreemptionContext context;
private final ResourceCalculator rc;
+ private boolean isReservedPreemptionCandidatesSelector;
static class TQComparator implements Comparator<TempQueuePerPartition> {
private ResourceCalculator rc;
@@ -71,18 +72,31 @@ public class PreemptableResourceCalculator {
// capacity and therefore considered last for resources.
private double getIdealPctOfGuaranteed(TempQueuePerPartition q) {
double pctOver = Integer.MAX_VALUE;
- if (q != null && Resources.greaterThan(
- rc, clusterRes, q.guaranteed, Resources.none())) {
- pctOver =
- Resources.divide(rc, clusterRes, q.idealAssigned, q.guaranteed);
+ if (q != null && Resources.greaterThan(rc, clusterRes,
+ q.getGuaranteed(),
+ Resources.none())) {
+ pctOver = Resources.divide(rc, clusterRes, q.idealAssigned,
+ q.getGuaranteed());
}
return (pctOver);
}
}
- public PreemptableResourceCalculator(CapacitySchedulerPreemptionContext preemptionContext) {
+ /**
+ * PreemptableResourceCalculator constructor
+ *
+ * @param preemptionContext preemption policy context
+ * @param isReservedPreemptionCandidatesSelector true when the caller is
+ * ReservedContainerCandidatesSelector, false otherwise; please refer to
+ * TempQueuePerPartition#offer for details.
+ */
+ public PreemptableResourceCalculator(
+ CapacitySchedulerPreemptionContext preemptionContext,
+ boolean isReservedPreemptionCandidatesSelector) {
context = preemptionContext;
rc = preemptionContext.getResourceCalculator();
+ this.isReservedPreemptionCandidatesSelector =
+ isReservedPreemptionCandidatesSelector;
}
/**
@@ -101,11 +115,11 @@ public class PreemptableResourceCalculator {
}
} else {
for (TempQueuePerPartition q : queues) {
- Resources.addTo(activeCap, q.guaranteed);
+ Resources.addTo(activeCap, q.getGuaranteed());
}
for (TempQueuePerPartition q : queues) {
q.normalizedGuarantee = Resources.divide(rc, clusterResource,
- q.guaranteed, activeCap);
+ q.getGuaranteed(), activeCap);
}
}
}
@@ -114,7 +128,8 @@ public class PreemptableResourceCalculator {
// return the list of all queues that have the same idealAssigned
// percentage of guaranteed.
protected Collection<TempQueuePerPartition> getMostUnderservedQueues(
- PriorityQueue<TempQueuePerPartition> orderedByNeed, TQComparator tqComparator) {
+ PriorityQueue<TempQueuePerPartition> orderedByNeed,
+ TQComparator tqComparator) {
ArrayList<TempQueuePerPartition> underserved = new ArrayList<>();
while (!orderedByNeed.isEmpty()) {
TempQueuePerPartition q1 = orderedByNeed.remove();
@@ -155,15 +170,19 @@ public class PreemptableResourceCalculator {
tqComparator);
for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext();) {
TempQueuePerPartition q = i.next();
- if (Resources.greaterThan(rc, tot_guarant, q.current, q.guaranteed)) {
- q.idealAssigned = Resources.add(q.guaranteed, q.untouchableExtra);
+ Resource used = q.getUsed();
+
+ if (Resources.greaterThan(rc, tot_guarant, used,
+ q.getGuaranteed())) {
+ q.idealAssigned = Resources.add(
+ q.getGuaranteed(), q.untouchableExtra);
} else {
- q.idealAssigned = Resources.clone(q.current);
+ q.idealAssigned = Resources.clone(used);
}
Resources.subtractFrom(unassigned, q.idealAssigned);
- // If idealAssigned < (current + pending), q needs more resources, so
+ // If idealAssigned < (used + pending), q needs more resources, so
// add it to the list of underserved queues, ordered by need.
- Resource curPlusPend = Resources.add(q.current, q.pending);
+ Resource curPlusPend = Resources.add(q.getUsed(), q.pending);
if (Resources.lessThan(rc, tot_guarant, q.idealAssigned, curPlusPend)) {
orderedByNeed.add(q);
}
@@ -190,7 +209,8 @@ public class PreemptableResourceCalculator {
TempQueuePerPartition sub = i.next();
Resource wQavail = Resources.multiplyAndNormalizeUp(rc,
unassigned, sub.normalizedGuarantee, Resource.newInstance(1, 1));
- Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
+ Resource wQidle = sub.offer(wQavail, rc, tot_guarant,
+ isReservedPreemptionCandidatesSelector);
Resource wQdone = Resources.subtract(wQavail, wQidle);
if (Resources.greaterThan(rc, tot_guarant,
@@ -234,8 +254,8 @@ public class PreemptableResourceCalculator {
Set<TempQueuePerPartition> zeroGuarQueues = new HashSet<>();
for (TempQueuePerPartition q : qAlloc) {
- if (Resources
- .greaterThan(rc, tot_guarant, q.guaranteed, Resources.none())) {
+ if (Resources.greaterThan(rc, tot_guarant,
+ q.getGuaranteed(), Resources.none())) {
nonZeroGuarQueues.add(q);
} else {
zeroGuarQueues.add(q);
@@ -258,19 +278,22 @@ public class PreemptableResourceCalculator {
// how much preemption is required overall
Resource totPreemptionNeeded = Resource.newInstance(0, 0);
for (TempQueuePerPartition t:queues) {
- if (Resources.greaterThan(rc, tot_guarant, t.current, t.idealAssigned)) {
- Resources.addTo(totPreemptionNeeded,
- Resources.subtract(t.current, t.idealAssigned));
+ if (Resources.greaterThan(rc, tot_guarant,
+ t.getUsed(), t.idealAssigned)) {
+ Resources.addTo(totPreemptionNeeded, Resources
+ .subtract(t.getUsed(), t.idealAssigned));
}
}
- // if we need to preempt more than is allowed, compute a factor (0<f<1)
- // that is used to scale down how much we ask back from each queue
+ /**
+ * if we need to preempt more than is allowed, compute a factor (0<f<1)
+ * that is used to scale down how much we ask back from each queue
+ */
float scalingFactor = 1.0F;
- if (Resources.greaterThan(rc, tot_guarant,
- totPreemptionNeeded, totalPreemptionAllowed)) {
- scalingFactor = Resources.divide(rc, tot_guarant,
- totalPreemptionAllowed, totPreemptionNeeded);
+ if (Resources.greaterThan(rc,
+ tot_guarant, totPreemptionNeeded, totalPreemptionAllowed)) {
+ scalingFactor = Resources.divide(rc, tot_guarant, totalPreemptionAllowed,
+ totPreemptionNeeded);
}
// assign to each queue the amount of actual preemption based on local
@@ -278,12 +301,6 @@ public class PreemptableResourceCalculator {
for (TempQueuePerPartition t : queues) {
t.assignPreemption(scalingFactor, rc, tot_guarant);
}
- if (LOG.isDebugEnabled()) {
- for (TempQueuePerPartition t : queues) {
- LOG.debug(t);
- }
- }
-
}
/**
@@ -329,12 +346,31 @@ public class PreemptableResourceCalculator {
for (TempQueuePerPartition qT : context.getQueuePartitions(queueName)) {
// we act only if we are violating balance by more than
// maxIgnoredOverCapacity
- if (Resources.greaterThan(rc, clusterResource, qT.current,
- Resources.multiply(qT.guaranteed, 1.0 + context.getMaxIgnoreOverCapacity()))) {
- // we introduce a dampening factor naturalTerminationFactor that
- // accounts for natural termination of containers
- Resource resToObtain = Resources.multiply(qT.toBePreempted,
- context.getNaturalTerminationFactor());
+ if (Resources.greaterThan(rc, clusterResource,
+ qT.getUsed(), Resources
+ .multiply(qT.getGuaranteed(),
+ 1.0 + context.getMaxIgnoreOverCapacity()))) {
+ /*
+ * We introduce a dampening factor naturalTerminationFactor that
+ * accounts for natural termination of containers.
+ *
+ * This is added to control the pace of preemption. For example, if the
+ * preemption policy calculates that a queue *should* have 20 GB preempted
+ * and natural_termination_factor is set to 0.1, the preemption policy
+ * will select 20 GB * 0.1 = 2 GB of containers to be preempted.
+ *
+ * However, this doesn't work for YARN-4390:
+ * for example, if 20 GB must be preempted from a queue for *one single*
+ * large container, preempting only 10% of that resource isn't useful.
+ * So, to keep it simple, only apply natural_termination_factor when the
+ * selector is not the reserved-container preemption candidates selector.
+ */
+ Resource resToObtain = qT.toBePreempted;
+ if (!isReservedPreemptionCandidatesSelector) {
+ resToObtain = Resources.multiply(qT.toBePreempted,
+ context.getNaturalTerminationFactor());
+ }
+
// Only add resToObtain when it >= 0
if (Resources.greaterThan(rc, clusterResource, resToObtain,
Resources.none())) {
@@ -343,22 +379,39 @@ public class PreemptableResourceCalculator {
+ " resource-to-obtain=" + resToObtain);
}
}
- qT.actuallyToBePreempted = Resources.clone(resToObtain);
+ qT.setActuallyToBePreempted(Resources.clone(resToObtain));
} else {
- qT.actuallyToBePreempted = Resources.none();
+ qT.setActuallyToBePreempted(Resources.none());
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(qT);
}
}
}
}
+ private void updatePreemptableExtras(TempQueuePerPartition cur) {
+ if (cur.children == null || cur.children.isEmpty()) {
+ cur.updatePreemptableExtras(rc);
+ } else {
+ for (TempQueuePerPartition child : cur.children) {
+ updatePreemptableExtras(child);
+ }
+ cur.updatePreemptableExtras(rc);
+ }
+ }
+
public void computeIdealAllocation(Resource clusterResource,
Resource totalPreemptionAllowed) {
for (String partition : context.getAllPartitions()) {
- TempQueuePerPartition tRoot =
- context.getQueueByPartition(CapacitySchedulerConfiguration.ROOT, partition);
+ TempQueuePerPartition tRoot = context.getQueueByPartition(
+ CapacitySchedulerConfiguration.ROOT, partition);
+ updatePreemptableExtras(tRoot);
+
// compute the ideal distribution of resources among queues
// updates cloned queues state accordingly
- tRoot.idealAssigned = tRoot.guaranteed;
+ tRoot.idealAssigned = tRoot.getGuaranteed();
recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 5de64b1..bf19f5a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -34,7 +34,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResour
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptableQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
@@ -45,6 +45,7 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.io.IOException;
+import java.text.MessageFormat;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -173,6 +174,15 @@ public class ProportionalCapacityPreemptionPolicy
rc = scheduler.getResourceCalculator();
nlm = scheduler.getRMContext().getNodeLabelManager();
+ // Do we need to specially consider reserved containers?
+ boolean selectCandidatesForResevedContainers = csConfig.getBoolean(
+ CapacitySchedulerConfiguration.PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS,
+ CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS);
+ if (selectCandidatesForResevedContainers) {
+ candidatesSelectionPolicies.add(
+ new ReservedContainerCandidatesSelector(this));
+ }
+
// initialize candidates preemption selection policies
candidatesSelectionPolicies.add(
new FifoCandidatesSelector(this));
@@ -185,9 +195,15 @@ public class ProportionalCapacityPreemptionPolicy
@Override
public void editSchedule() {
+ long startTs = clock.getTime();
+
CSQueue root = scheduler.getRootQueue();
Resource clusterResources = Resources.clone(scheduler.getClusterResource());
containerBasedPreemptOrKill(root, clusterResources);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Total time used=" + (clock.getTime() - startTs) + " ms.");
+ }
}
@SuppressWarnings("unchecked")
@@ -295,8 +311,8 @@ public class ProportionalCapacityPreemptionPolicy
queueToPartitions.clear();
for (String partitionToLookAt : allPartitions) {
- cloneQueues(root,
- nlm.getResourceByLabel(partitionToLookAt, clusterResources),
+ cloneQueues(root, Resources
+ .clone(nlm.getResourceByLabel(partitionToLookAt, clusterResources)),
partitionToLookAt);
}
}
@@ -311,9 +327,15 @@ public class ProportionalCapacityPreemptionPolicy
// based on ideal allocation select containers to be preemptionCandidates from each
// queue and each application
- Map<ApplicationAttemptId, Set<RMContainer>> toPreempt = null;
+ Map<ApplicationAttemptId, Set<RMContainer>> toPreempt =
+ new HashMap<>();
for (PreemptionCandidatesSelector selector :
candidatesSelectionPolicies) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(MessageFormat
+ .format("Trying to use {0} to select preemption candidates",
+ selector.getClass().getName()));
+ }
toPreempt = selector.selectCandidates(toPreempt,
clusterResources, totalPreemptionAllowed);
}
@@ -377,14 +399,15 @@ public class ProportionalCapacityPreemptionPolicy
float absMaxCap = qc.getAbsoluteMaximumCapacity(partitionToLookAt);
boolean preemptionDisabled = curQueue.getPreemptionDisabled();
- Resource current = curQueue.getQueueResourceUsage().getUsed(
- partitionToLookAt);
- Resource guaranteed = Resources.multiply(partitionResource, absCap);
- Resource maxCapacity = Resources.multiply(partitionResource, absMaxCap);
+ Resource current = Resources.clone(
+ curQueue.getQueueResourceUsage().getUsed(partitionToLookAt));
Resource killable = Resources.none();
+
+ Resource reserved = Resources.clone(
+ curQueue.getQueueResourceUsage().getReserved(partitionToLookAt));
if (null != preemptableQueues.get(queueName)) {
- killable = preemptableQueues.get(queueName)
- .getKillableResource(partitionToLookAt);
+ killable = Resources.clone(preemptableQueues.get(queueName)
+ .getKillableResource(partitionToLookAt));
}
// when partition is a non-exclusive partition, the actual maxCapacity
@@ -392,53 +415,24 @@ public class ProportionalCapacityPreemptionPolicy
try {
if (!scheduler.getRMContext().getNodeLabelManager()
.isExclusiveNodeLabel(partitionToLookAt)) {
- maxCapacity =
- Resources.max(rc, partitionResource, maxCapacity, current);
+ absMaxCap = 1.0f;
}
} catch (IOException e) {
// This may cause by partition removed when running capacity monitor,
// just ignore the error, this will be corrected when doing next check.
}
- Resource extra = Resource.newInstance(0, 0);
- if (Resources.greaterThan(rc, partitionResource, current, guaranteed)) {
- extra = Resources.subtract(current, guaranteed);
- }
- if (curQueue instanceof LeafQueue) {
- LeafQueue l = (LeafQueue) curQueue;
- Resource pending =
- l.getTotalPendingResourcesConsideringUserLimit(
- partitionResource, partitionToLookAt);
- ret = new TempQueuePerPartition(queueName, current, pending, guaranteed,
- maxCapacity, preemptionDisabled, partitionToLookAt, killable);
- if (preemptionDisabled) {
- ret.untouchableExtra = extra;
- } else {
- ret.preemptableExtra = extra;
- }
- ret.setLeafQueue(l);
- } else {
- Resource pending = Resource.newInstance(0, 0);
- ret =
- new TempQueuePerPartition(curQueue.getQueueName(), current, pending,
- guaranteed, maxCapacity, false, partitionToLookAt, killable);
- Resource childrensPreemptable = Resource.newInstance(0, 0);
+ ret = new TempQueuePerPartition(queueName, current, preemptionDisabled,
+ partitionToLookAt, killable, absCap, absMaxCap, partitionResource,
+ reserved, curQueue);
+
+ if (curQueue instanceof ParentQueue) {
+ // Recursively add children
for (CSQueue c : curQueue.getChildQueues()) {
- TempQueuePerPartition subq =
- cloneQueues(c, partitionResource, partitionToLookAt);
- Resources.addTo(childrensPreemptable, subq.preemptableExtra);
+ TempQueuePerPartition subq = cloneQueues(c, partitionResource,
+ partitionToLookAt);
ret.addChild(subq);
}
- // untouchableExtra = max(extra - childrenPreemptable, 0)
- if (Resources.greaterThanOrEqual(
- rc, partitionResource, childrensPreemptable, extra)) {
- ret.untouchableExtra = Resource.newInstance(0, 0);
- } else {
- ret.untouchableExtra =
- Resources.subtract(extra, childrensPreemptable);
- }
- ret.preemptableExtra = Resources.min(
- rc, partitionResource, childrensPreemptable, extra);
}
}
addTempQueuePartition(ret);
@@ -481,7 +475,8 @@ public class ProportionalCapacityPreemptionPolicy
String partition) {
Map<String, TempQueuePerPartition> partitionToQueues;
if (null == (partitionToQueues = queueToPartitions.get(queueName))) {
- return null;
+ throw new YarnRuntimeException("This shouldn't happen, cannot find "
+ + "TempQueuePerPartition for queueName=" + queueName);
}
return partitionToQueues.get(partition);
}
@@ -492,7 +487,8 @@ public class ProportionalCapacityPreemptionPolicy
@Override
public Collection<TempQueuePerPartition> getQueuePartitions(String queueName) {
if (!queueToPartitions.containsKey(queueName)) {
- return null;
+ throw new YarnRuntimeException("This shouldn't happen, cannot find "
+ + "TempQueuePerPartition collection for queueName=" + queueName);
}
return queueToPartitions.get(queueName).values();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java
new file mode 100644
index 0000000..3900a35
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java
@@ -0,0 +1,316 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+public class ReservedContainerCandidatesSelector
+ extends PreemptionCandidatesSelector {
+ private static final Log LOG =
+ LogFactory.getLog(ReservedContainerCandidatesSelector.class);
+
+ private PreemptableResourceCalculator preemptableAmountCalculator;
+
+ /**
+ * A temporary data structure to remember what to preempt on a node
+ */
+ private static class NodeForPreemption {
+ private float preemptionCost;
+ private FiCaSchedulerNode schedulerNode;
+ private List<RMContainer> selectedContainers;
+
+ public NodeForPreemption(float preemptionCost,
+ FiCaSchedulerNode schedulerNode, List<RMContainer> selectedContainers) {
+ this.preemptionCost = preemptionCost;
+ this.schedulerNode = schedulerNode;
+ this.selectedContainers = selectedContainers;
+ }
+ }
+
+ ReservedContainerCandidatesSelector(
+ CapacitySchedulerPreemptionContext preemptionContext) {
+ super(preemptionContext);
+ preemptableAmountCalculator = new PreemptableResourceCalculator(
+ preemptionContext, true);
+ }
+
+ @Override
+ public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
+ Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+ Resource clusterResource,
+ Resource totalPreemptedResourceAllowed) {
+ // Calculate how much resources we need to preempt
+ preemptableAmountCalculator.computeIdealAllocation(clusterResource,
+ totalPreemptedResourceAllowed);
+
+ // Get queue to preemptable resource by partition
+ Map<String, Map<String, Resource>> queueToPreemptableResourceByPartition =
+ new HashMap<>();
+ for (String leafQueue : preemptionContext.getLeafQueueNames()) {
+ queueToPreemptableResourceByPartition.put(leafQueue,
+ CapacitySchedulerPreemptionUtils
+ .getResToObtainByPartitionForLeafQueue(preemptionContext,
+ leafQueue, clusterResource));
+ }
+
+ // Get list of nodes for preemption, ordered by preemption cost
+ List<NodeForPreemption> nodesForPreemption = getNodesForPreemption(
+ clusterResource, queueToPreemptableResourceByPartition,
+ selectedCandidates, totalPreemptedResourceAllowed);
+
+ for (NodeForPreemption nfp : nodesForPreemption) {
+ RMContainer reservedContainer = nfp.schedulerNode.getReservedContainer();
+ if (null == reservedContainer) {
+ continue;
+ }
+
+ NodeForPreemption preemptionResult = getPreemptionCandidatesOnNode(
+ nfp.schedulerNode, clusterResource,
+ queueToPreemptableResourceByPartition, selectedCandidates,
+ totalPreemptedResourceAllowed, false);
+ if (null != preemptionResult) {
+ for (RMContainer c : preemptionResult.selectedContainers) {
+ ApplicationAttemptId appId = c.getApplicationAttemptId();
+ Set<RMContainer> containers = selectedCandidates.get(appId);
+ if (null == containers) {
+ containers = new HashSet<>();
+ selectedCandidates.put(appId, containers);
+ }
+
+ containers.add(c);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(this.getClass().getName() + " Marked container=" + c
+ .getContainerId() + " from queue=" + c.getQueueName()
+ + " to be preemption candidates");
+ }
+ }
+ }
+ }
+
+ return selectedCandidates;
+ }
+
+ private Resource getPreemptableResource(String queueName,
+ String partitionName,
+ Map<String, Map<String, Resource>> queueToPreemptableResourceByPartition) {
+ Map<String, Resource> partitionToPreemptable =
+ queueToPreemptableResourceByPartition.get(queueName);
+ if (null == partitionToPreemptable) {
+ return null;
+ }
+
+ Resource preemptable = partitionToPreemptable.get(partitionName);
+ return preemptable;
+ }
+
+ private boolean tryToPreemptFromQueue(Resource cluster, String queueName,
+ String partitionName,
+ Map<String, Map<String, Resource>> queueToPreemptableResourceByPartition,
+ Resource required, Resource totalPreemptionAllowed, boolean readOnly) {
+ Resource preemptable = getPreemptableResource(queueName, partitionName,
+ queueToPreemptableResourceByPartition);
+ if (null == preemptable) {
+ return false;
+ }
+
+ if (!Resources.fitsIn(rc, cluster, required, preemptable)) {
+ return false;
+ }
+
+ if (!Resources.fitsIn(rc, cluster, required, totalPreemptionAllowed)) {
+ return false;
+ }
+
+ if (!readOnly) {
+ Resources.subtractFrom(preemptable, required);
+ Resources.subtractFrom(totalPreemptionAllowed, required);
+ }
+ return true;
+ }
+
+
+
+ /**
+ * Check whether containers on the given node can be preempted for its reserved container
+ * @param node
+ * @param cluster
+ * @param queueToPreemptableResourceByPartition it's a map of
+ * <queueName, <partition, preemptable-resource>>
+ * @param readOnly if true, do not deduct the selected containers' resources
+ * from the preemptable / total-preemption-allowed amounts
+ * @return NodeForPreemption if it's possible to preempt containers on the node
+ * to satisfy reserved resource
+ */
+ private NodeForPreemption getPreemptionCandidatesOnNode(
+ FiCaSchedulerNode node, Resource cluster,
+ Map<String, Map<String, Resource>> queueToPreemptableResourceByPartition,
+ Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+ Resource totalPreemptionAllowed, boolean readOnly) {
+ RMContainer reservedContainer = node.getReservedContainer();
+ Resource available = Resources.clone(node.getAvailableResource());
+ Resource totalSelected = Resources.createResource(0);
+ List<RMContainer> sortedRunningContainers =
+ node.getCopiedListOfRunningContainers();
+ List<RMContainer> selectedContainers = new ArrayList<>();
+ Map<ContainerId, RMContainer> killableContainers =
+ node.getKillableContainers();
+
+ // Sort running container by launch time, we preferred to preempt recent
+ // launched preempt container
+ Collections.sort(sortedRunningContainers, new Comparator<RMContainer>() {
+ @Override public int compare(RMContainer o1, RMContainer o2) {
+ return -1 * o1.getContainerId().compareTo(o2.getContainerId());
+ }
+ });
+
+ // First check: can we preempt containers to allocate the
+ // reservedContainer?
+ boolean canAllocateReservedContainer = false;
+
+ // At least, we can get available + killable resources from this node
+ Resource cur = Resources.add(available, node.getTotalKillableResources());
+ String partition = node.getPartition();
+
+ // Avoid preempt any container if required <= available + killable
+ if (Resources.fitsIn(rc, cluster, reservedContainer.getReservedResource(),
+ cur)) {
+ return null;
+ }
+
+ // Extra cost of am container preemption
+ float amPreemptionCost = 0f;
+
+ for (RMContainer c : sortedRunningContainers) {
+ String containerQueueName = c.getQueueName();
+
+ // Skip container if it is already marked killable
+ if (killableContainers.containsKey(c.getContainerId())) {
+ continue;
+ }
+
+ // An alternative approach is add a "penalty cost" if AM container is
+ // selected. Here for safety, avoid preempt AM container in any cases
+ if (c.isAMContainer()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Skip selecting AM container on host=" + node.getNodeID()
+ + " AM container=" + c.getContainerId());
+ }
+ continue;
+ }
+
+ // Can we preempt container c?
+ // Check if we have quota to preempt this container
+ boolean canPreempt = tryToPreemptFromQueue(cluster, containerQueueName,
+ partition, queueToPreemptableResourceByPartition,
+ c.getAllocatedResource(), totalPreemptionAllowed, readOnly);
+
+ // If we can, add to selected container, and change resource accordingly.
+ if (canPreempt) {
+ if (!CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(c,
+ selectedCandidates)) {
+ if (!readOnly) {
+ selectedContainers.add(c);
+ }
+ Resources.addTo(totalSelected, c.getAllocatedResource());
+ }
+ Resources.addTo(cur, c.getAllocatedResource());
+ if (Resources.fitsIn(rc, cluster,
+ reservedContainer.getReservedResource(), cur)) {
+ canAllocateReservedContainer = true;
+ break;
+ }
+ }
+ }
+
+ if (!canAllocateReservedContainer) {
+ if (!readOnly) {
+ // Revert queue preemption quotas
+ for (RMContainer c : selectedContainers) {
+ Resource res = getPreemptableResource(c.getQueueName(), partition,
+ queueToPreemptableResourceByPartition);
+ if (null == res) {
+ // This shouldn't happen in normal cases, one possible cause is
+ // container moved to different queue while executing preemption logic.
+ // Ignore such failures.
+ continue;
+ }
+ Resources.addTo(res, c.getAllocatedResource());
+ }
+ }
+ return null;
+ }
+
+ float ratio = Resources.ratio(rc, totalSelected,
+ reservedContainer.getReservedResource());
+
+ // Compute preemption score
+ NodeForPreemption nfp = new NodeForPreemption(ratio + amPreemptionCost,
+ node, selectedContainers);
+
+ return nfp;
+ }
+
+ /**
+  * Evaluate, without consuming any preemption quota, every node that holds a
+  * reserved container, and return those on which preempting containers would
+  * satisfy the reservation, ordered from the lowest to the highest
+  * preemption cost.
+  */
+ private List<NodeForPreemption> getNodesForPreemption(Resource cluster,
+     Map<String, Map<String, Resource>> queueToPreemptableResourceByPartition,
+     Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+     Resource totalPreemptionAllowed) {
+   List<NodeForPreemption> candidates = new ArrayList<>();
+
+   for (FiCaSchedulerNode schedulerNode : preemptionContext.getScheduler()
+       .getAllNodes().values()) {
+     // Only nodes holding a reservation are of interest
+     if (schedulerNode.getReservedContainer() == null) {
+       continue;
+     }
+     // readOnly = true: quotas are checked but not modified
+     NodeForPreemption candidate = getPreemptionCandidatesOnNode(
+         schedulerNode, cluster, queueToPreemptableResourceByPartition,
+         selectedCandidates, totalPreemptionAllowed, true);
+     // null: either preemption on this node cannot satisfy the reservation,
+     // or no preemption is needed at all
+     if (candidate != null) {
+       candidates.add(candidate);
+     }
+   }
+
+   Comparator<NodeForPreemption> byAscendingCost =
+       new Comparator<NodeForPreemption>() {
+         @Override
+         public int compare(NodeForPreemption left, NodeForPreemption right) {
+           return Float.compare(left.preemptionCost, right.preemptionCost);
+         }
+       };
+   Collections.sort(candidates, byAscendingCost);
+   return candidates;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
index 8b01a73..04ed135 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -32,20 +33,22 @@ import java.util.ArrayList;
public class TempQueuePerPartition {
// Following fields are copied from scheduler
final String queueName;
- final Resource current;
- final Resource pending;
- final Resource guaranteed;
- final Resource maxCapacity;
- final Resource killable;
final String partition;
+ final Resource pending;
+
+ private final Resource current;
+ private final Resource killable;
+ private final Resource reserved;
+ private final float absCapacity;
+ private final float absMaxCapacity;
+ final Resource totalPartitionResource;
// Following fields are setted and used by candidate selection policies
Resource idealAssigned;
Resource toBePreempted;
Resource untouchableExtra;
Resource preemptableExtra;
- // For logging purpose
- Resource actuallyToBePreempted;
+ private Resource actuallyToBePreempted;
double normalizedGuarantee;
@@ -53,14 +56,22 @@ public class TempQueuePerPartition {
LeafQueue leafQueue;
boolean preemptionDisabled;
- TempQueuePerPartition(String queueName, Resource current, Resource pending,
- Resource guaranteed, Resource maxCapacity, boolean preemptionDisabled,
- String partition, Resource killable) {
+ TempQueuePerPartition(String queueName, Resource current,
+ boolean preemptionDisabled, String partition, Resource killable,
+ float absCapacity, float absMaxCapacity, Resource totalPartitionResource,
+ Resource reserved, CSQueue queue) {
this.queueName = queueName;
this.current = current;
- this.pending = pending;
- this.guaranteed = guaranteed;
- this.maxCapacity = maxCapacity;
+
+ if (queue instanceof LeafQueue) {
+ LeafQueue l = (LeafQueue) queue;
+ pending = l.getTotalPendingResourcesConsideringUserLimit(
+ totalPartitionResource, partition);
+ leafQueue = l;
+ } else {
+ pending = Resources.createResource(0);
+ }
+
this.idealAssigned = Resource.newInstance(0, 0);
this.actuallyToBePreempted = Resource.newInstance(0, 0);
this.toBePreempted = Resource.newInstance(0, 0);
@@ -71,6 +82,10 @@ public class TempQueuePerPartition {
this.preemptionDisabled = preemptionDisabled;
this.partition = partition;
this.killable = killable;
+ this.absCapacity = absCapacity;
+ this.absMaxCapacity = absMaxCapacity;
+ this.totalPartitionResource = totalPartitionResource;
+ this.reserved = reserved;
}
public void setLeafQueue(LeafQueue l) {
@@ -92,31 +107,101 @@ public class TempQueuePerPartition {
return children;
}
+ public Resource getUsed() {
+ return current;
+ }
+
+ public Resource getUsedDeductReservd() {
+ return Resources.subtract(current, reserved);
+ }
+
// This function "accepts" all the resources it can (pending) and return
// the unused ones
Resource offer(Resource avail, ResourceCalculator rc,
- Resource clusterResource) {
+ Resource clusterResource, boolean considersReservedResource) {
Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax(
- Resources.subtract(maxCapacity, idealAssigned),
+ Resources.subtract(getMax(), idealAssigned),
Resource.newInstance(0, 0));
// remain = avail - min(avail, (max - assigned), (current + pending - assigned))
- Resource accepted =
- Resources.min(rc, clusterResource,
- absMaxCapIdealAssignedDelta,
- Resources.min(rc, clusterResource, avail, Resources.subtract(
- Resources.add(current, pending), idealAssigned)));
+ Resource accepted = Resources.min(rc, clusterResource,
+ absMaxCapIdealAssignedDelta, Resources.min(rc, clusterResource, avail,
+ Resources
+ /*
+ * When we're using FifoPreemptionSelector
+ * (considerReservedResource = false).
+ *
+ * We should deduct reserved resource to avoid excessive preemption:
+ *
+ * For example, if an under-utilized queue has used = reserved = 20.
+ * Preemption policy will try to preempt 20 containers
+ * (which is not satisfied) from different hosts.
+ *
+ * In FifoPreemptionSelector, there's no guarantee that preempted
+ * resource can be used by pending request, so policy will preempt
+ * resources repeatly.
+ */
+ .subtract(Resources.add(
+ (considersReservedResource ? getUsed() :
+ getUsedDeductReservd()),
+ pending), idealAssigned)));
Resource remain = Resources.subtract(avail, accepted);
Resources.addTo(idealAssigned, accepted);
return remain;
}
+ public Resource getGuaranteed() {
+ return Resources.multiply(totalPartitionResource, absCapacity);
+ }
+
+ public Resource getMax() {
+ return Resources.multiply(totalPartitionResource, absMaxCapacity);
+ }
+
+ public void updatePreemptableExtras(ResourceCalculator rc) {
+ // Reset untouchableExtra and preemptableExtra
+ untouchableExtra = Resources.none();
+ preemptableExtra = Resources.none();
+
+ Resource extra = Resources.subtract(getUsed(),
+ getGuaranteed());
+ if (Resources.lessThan(rc, totalPartitionResource, extra,
+ Resources.none())) {
+ extra = Resources.none();
+ }
+
+ if (null == children || children.isEmpty()) {
+ // If it is a leaf queue
+ if (preemptionDisabled) {
+ untouchableExtra = extra;
+ } else {
+ preemptableExtra = extra;
+ }
+ } else {
+ // If it is a parent queue
+ Resource childrensPreemptable = Resource.newInstance(0, 0);
+ for (TempQueuePerPartition child : children) {
+ Resources.addTo(childrensPreemptable, child.preemptableExtra);
+ }
+ // untouchableExtra = max(extra - childrenPreemptable, 0)
+ if (Resources.greaterThanOrEqual(rc, totalPartitionResource,
+ childrensPreemptable, extra)) {
+ untouchableExtra = Resource.newInstance(0, 0);
+ } else {
+ untouchableExtra = Resources.subtract(extra, childrensPreemptable);
+ }
+ preemptableExtra = Resources.min(rc, totalPartitionResource,
+ childrensPreemptable, extra);
+ }
+ }
+
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append(" NAME: " + queueName)
.append(" CUR: ").append(current)
.append(" PEN: ").append(pending)
- .append(" GAR: ").append(guaranteed)
+ .append(" RESERVED: ").append(reserved)
+ .append(" GAR: ").append(getGuaranteed())
.append(" NORM: ").append(normalizedGuarantee)
.append(" IDEAL_ASSIGNED: ").append(idealAssigned)
.append(" IDEAL_PREEMPT: ").append(toBePreempted)
@@ -130,29 +215,60 @@ public class TempQueuePerPartition {
public void assignPreemption(float scalingFactor, ResourceCalculator rc,
Resource clusterResource) {
- if (Resources.greaterThan(rc, clusterResource,
- Resources.subtract(current, killable), idealAssigned)) {
- toBePreempted = Resources.multiply(Resources
- .subtract(Resources.subtract(current, killable), idealAssigned),
- scalingFactor);
+ Resource usedDeductKillable = Resources.subtract(
+ getUsed(), killable);
+ Resource totalResource = Resources.add(getUsed(), pending);
+
+ // The minimum resource that we need to keep for a queue is:
+ // max(idealAssigned, min(used + pending, guaranteed)).
+ //
+ // Doing this because when we calculate ideal allocation doesn't consider
+ // reserved resource, ideal-allocation calculated could be less than
+ // guaranteed and total. We should avoid preempt from a queue if it is already
+ // <= its guaranteed resource.
+ Resource minimumQueueResource = Resources.max(rc, clusterResource,
+ Resources.min(rc, clusterResource, totalResource, getGuaranteed()),
+ idealAssigned);
+
+ if (Resources.greaterThan(rc, clusterResource, usedDeductKillable,
+ minimumQueueResource)) {
+ toBePreempted = Resources.multiply(
+ Resources.subtract(usedDeductKillable, minimumQueueResource), scalingFactor);
} else {
- toBePreempted = Resource.newInstance(0, 0);
+ toBePreempted = Resources.none();
+ }
+ }
+
+ public Resource getActuallyToBePreempted() {
+ return actuallyToBePreempted;
+ }
+
+ public void setActuallyToBePreempted(Resource res) {
+ this.actuallyToBePreempted = res;
+ }
+
+ public void deductActuallyToBePreempted(ResourceCalculator rc,
+ Resource cluster, Resource toBeDeduct) {
+ if (Resources.greaterThan(rc, cluster, actuallyToBePreempted, toBeDeduct)) {
+ Resources.subtractFrom(actuallyToBePreempted, toBeDeduct);
}
+ actuallyToBePreempted = Resources.max(rc, cluster, actuallyToBePreempted,
+ Resources.none());
}
void appendLogString(StringBuilder sb) {
sb.append(queueName).append(", ")
- .append(current.getMemory()).append(", ")
+ .append(current.getMemorySize()).append(", ")
.append(current.getVirtualCores()).append(", ")
- .append(pending.getMemory()).append(", ")
+ .append(pending.getMemorySize()).append(", ")
.append(pending.getVirtualCores()).append(", ")
- .append(guaranteed.getMemory()).append(", ")
- .append(guaranteed.getVirtualCores()).append(", ")
- .append(idealAssigned.getMemory()).append(", ")
+ .append(getGuaranteed().getMemorySize()).append(", ")
+ .append(getGuaranteed().getVirtualCores()).append(", ")
+ .append(idealAssigned.getMemorySize()).append(", ")
.append(idealAssigned.getVirtualCores()).append(", ")
- .append(toBePreempted.getMemory()).append(", ")
+ .append(toBePreempted.getMemorySize()).append(", ")
.append(toBePreempted.getVirtualCores() ).append(", ")
- .append(actuallyToBePreempted.getMemory()).append(", ")
+ .append(actuallyToBePreempted.getMemorySize()).append(", ")
.append(actuallyToBePreempted.getVirtualCores());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
index dfe0886..f37923f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainer.java
@@ -89,4 +89,6 @@ public interface RMContainer extends EventHandler<RMContainerEvent> {
boolean hasIncreaseReservation();
void cancelIncreaseReservation();
+
+ String getQueueName();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 2d0126b..44589aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -181,6 +181,7 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
// Only used for container resource increase and decrease. This is the
// resource to rollback to should container resource increase token expires.
private Resource lastConfirmedResource;
+ private volatile String queueName;
public RMContainerImpl(Container container,
ApplicationAttemptId appAttemptId, NodeId nodeId, String user,
@@ -817,4 +818,13 @@ public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
public void cancelIncreaseReservation() {
hasIncreaseReservation = false;
}
+
+ public void setQueueName(String queueName) {
+ this.queueName = queueName;
+ }
+
+ @Override
+ public String getQueueName() {
+ return queueName;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 864c3a8..47c83bc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -687,7 +687,7 @@ public abstract class AbstractYarnScheduler
updateMaximumAllocation(node, false);
// update resource to node
- node.setTotalResource(newResource);
+ node.updateTotalResource(newResource);
nodes.put(nm.getNodeID(), (N)node);
updateMaximumAllocation(node, true);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 17c60d9..13519a2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -403,6 +403,7 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
node.getNodeID(), appSchedulingInfo.getUser(), rmContext);
attemptResourceUsage.incReserved(node.getPartition(),
container.getResource());
+ ((RMContainerImpl)rmContainer).setQueueName(this.getQueueName());
// Reset the re-reservation count
resetReReservations(priority);
@@ -750,14 +751,17 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
public synchronized void move(Queue newQueue) {
QueueMetrics oldMetrics = queue.getMetrics();
QueueMetrics newMetrics = newQueue.getMetrics();
+ String newQueueName = newQueue.getQueueName();
String user = getUser();
for (RMContainer liveContainer : liveContainers.values()) {
Resource resource = liveContainer.getContainer().getResource();
+ ((RMContainerImpl)liveContainer).setQueueName(newQueueName);
oldMetrics.releaseResources(user, 1, resource);
newMetrics.allocateResources(user, 1, resource, false);
}
for (Map<NodeId, RMContainer> map : reservedContainers.values()) {
for (RMContainer reservedContainer : map.values()) {
+ ((RMContainerImpl)reservedContainer).setQueueName(newQueueName);
Resource resource = reservedContainer.getReservedResource();
oldMetrics.unreserveResource(user, resource);
newMetrics.reserveResource(user, resource);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index 81ddd11..8a3fd8b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -98,12 +98,12 @@ public abstract class SchedulerNode {
* Set total resources on the node.
* @param resource total resources on the node.
*/
- public synchronized void setTotalResource(Resource resource){
+ public synchronized void updateTotalResource(Resource resource){
this.totalResourceCapability = resource;
this.availableResource = Resources.subtract(totalResourceCapability,
this.usedResource);
}
-
+
/**
* Get the ID of the node which contains both its hostname and port.
*
@@ -176,7 +176,7 @@ public abstract class SchedulerNode {
+ " containers, " + getUsedResource() + " used and "
+ getAvailableResource() + " available after allocation");
}
-
+
/**
* The Scheduler increased container
*/
@@ -184,7 +184,7 @@ public abstract class SchedulerNode {
Resource deltaResource) {
changeContainerResource(containerId, deltaResource, true);
}
-
+
/**
* The Scheduler decreased container
*/
@@ -227,7 +227,8 @@ public abstract class SchedulerNode {
return false;
}
- protected synchronized void updateResource(Container container) {
+ protected synchronized void updateResourceForReleasedContainer(
+ Container container) {
addAvailableResource(container.getResource());
--numContainers;
}
@@ -246,7 +247,7 @@ public abstract class SchedulerNode {
/* remove the containers from the nodemanger */
if (null != launchedContainers.remove(container.getId())) {
- updateResource(container);
+ updateResourceForReleasedContainer(container);
}
LOG.info("Released container " + container.getId() + " of capacity "
@@ -303,7 +304,7 @@ public abstract class SchedulerNode {
return numContainers;
}
- public synchronized List<RMContainer> getRunningContainers() {
+ public synchronized List<RMContainer> getCopiedListOfRunningContainers() {
return new ArrayList<RMContainer>(launchedContainers.values());
}
@@ -312,7 +313,7 @@ public abstract class SchedulerNode {
}
protected synchronized void
- setReservedContainer(RMContainer reservedContainer) {
+ setReservedContainer(RMContainer reservedContainer) {
this.reservedContainer = reservedContainer;
}
@@ -330,7 +331,7 @@ public abstract class SchedulerNode {
public void updateLabels(Set<String> labels) {
this.labels = labels;
}
-
+
/**
* Get partition of which the node belongs to, if node-labels of this node is
* empty or null, it belongs to NO_LABEL partition. And since we only support
@@ -338,7 +339,7 @@ public abstract class SchedulerNode {
*/
public String getPartition() {
if (this.labels == null || this.labels.isEmpty()) {
- return RMNodeLabelsManager.NO_LABEL;
+ return RMNodeLabelsManager.NO_LABEL;
} else {
return this.labels.iterator().next();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 5153018..ca4a2fd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -35,7 +35,6 @@ import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -1150,7 +1149,7 @@ public class CapacityScheduler extends
String oldPartition = node.getPartition();
// Update resources of these containers
- for (RMContainer rmContainer : node.getRunningContainers()) {
+ for (RMContainer rmContainer : node.getCopiedListOfRunningContainers()) {
FiCaSchedulerApp application =
getApplicationAttempt(rmContainer.getApplicationAttemptId());
if (null != application) {
@@ -1506,7 +1505,7 @@ public class CapacityScheduler extends
}
// Remove running containers
- List<RMContainer> runningContainers = node.getRunningContainers();
+ List<RMContainer> runningContainers = node.getCopiedListOfRunningContainers();
for (RMContainer container : runningContainers) {
super.completedContainer(container,
SchedulerUtils.createAbnormalContainerStatus(
@@ -1623,9 +1622,9 @@ public class CapacityScheduler extends
public FiCaSchedulerNode getNode(NodeId nodeId) {
return nodes.get(nodeId);
}
-
+
@Lock(Lock.NoLock.class)
- Map<NodeId, FiCaSchedulerNode> getAllNodes() {
+ public Map<NodeId, FiCaSchedulerNode> getAllNodes() {
return nodes;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index ab3aa7d..6db5074 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -1042,21 +1042,24 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
return getBoolean(LAZY_PREEMPTION_ENALBED, DEFAULT_LAZY_PREEMPTION_ENABLED);
}
+ private static final String PREEMPTION_CONFIG_PREFIX =
+ "yarn.resourcemanager.monitor.capacity.preemption.";
+
/** If true, run the policy but do not affect the cluster with preemption and
* kill events. */
public static final String PREEMPTION_OBSERVE_ONLY =
- "yarn.resourcemanager.monitor.capacity.preemption.observe_only";
+ PREEMPTION_CONFIG_PREFIX + "observe_only";
public static final boolean DEFAULT_PREEMPTION_OBSERVE_ONLY = false;
/** Time in milliseconds between invocations of this policy */
public static final String PREEMPTION_MONITORING_INTERVAL =
- "yarn.resourcemanager.monitor.capacity.preemption.monitoring_interval";
+ PREEMPTION_CONFIG_PREFIX + "monitoring_interval";
public static final long DEFAULT_PREEMPTION_MONITORING_INTERVAL = 3000L;
/** Time in milliseconds between requesting a preemption from an application
* and killing the container. */
public static final String PREEMPTION_WAIT_TIME_BEFORE_KILL =
- "yarn.resourcemanager.monitor.capacity.preemption.max_wait_before_kill";
+ PREEMPTION_CONFIG_PREFIX + "max_wait_before_kill";
public static final long DEFAULT_PREEMPTION_WAIT_TIME_BEFORE_KILL = 15000L;
/** Maximum percentage of resources preemptionCandidates in a single round. By
@@ -1064,7 +1067,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
* reclaimed from the cluster. After computing the total desired preemption,
* the policy scales it back within this limit. */
public static final String TOTAL_PREEMPTION_PER_ROUND =
- "yarn.resourcemanager.monitor.capacity.preemption.total_preemption_per_round";
+ PREEMPTION_CONFIG_PREFIX + "total_preemption_per_round";
public static final float DEFAULT_TOTAL_PREEMPTION_PER_ROUND = 0.1f;
/** Maximum amount of resources above the target capacity ignored for
@@ -1073,7 +1076,7 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
* High values would slow the time to capacity and (absent natural
* completions) it might prevent convergence to guaranteed capacity. */
public static final String PREEMPTION_MAX_IGNORED_OVER_CAPACITY =
- "yarn.resourcemanager.monitor.capacity.preemption.max_ignored_over_capacity";
+ PREEMPTION_CONFIG_PREFIX + "max_ignored_over_capacity";
public static final float DEFAULT_PREEMPTION_MAX_IGNORED_OVER_CAPACITY = 0.1f;
/**
* Given a computed preemption target, account for containers naturally
@@ -1083,7 +1086,16 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
* will reclaim almost 95% of resources within 5 * {@link
* #PREEMPTION_WAIT_TIME_BEFORE_KILL}, even absent natural termination. */
public static final String PREEMPTION_NATURAL_TERMINATION_FACTOR =
- "yarn.resourcemanager.monitor.capacity.preemption.natural_termination_factor";
+ PREEMPTION_CONFIG_PREFIX + "natural_termination_factor";
public static final float DEFAULT_PREEMPTION_NATURAL_TERMINATION_FACTOR =
0.2f;
+
+ /**
+ * When selecting which containers to preempt, first try to preempt
+ * containers to make room for reserved containers. The resulting property
+ * key is PREEMPTION_CONFIG_PREFIX + "select_based_on_reserved_containers"
+ * (i.e. yarn.resourcemanager.monitor.capacity.preemption.select_based_on_reserved_containers).
+ * Disabled by default.
+ */
+ public static final String PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS =
+ PREEMPTION_CONFIG_PREFIX + "select_based_on_reserved_containers";
+ /** Default for {@link #PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS}: false. */
+ public static final boolean DEFAULT_PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS =
+ false;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 595b3fe..38995ff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -201,6 +201,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
new RMContainerImpl(container, this.getApplicationAttemptId(),
node.getNodeID(), appSchedulingInfo.getUser(), this.rmContext,
request.getNodeLabelExpression());
+ ((RMContainerImpl)rmContainer).setQueueName(this.getQueueName());
updateAMContainerDiagnostics(AMState.ASSIGNED, null);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
index 1d0e78a..f90a53c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerNode.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
@@ -142,9 +143,9 @@ public class FiCaSchedulerNode extends SchedulerNode {
}
@Override
- protected synchronized void updateResource(
+ protected synchronized void updateResourceForReleasedContainer(
Container container) {
- super.updateResource(container);
+ super.updateResourceForReleasedContainer(container);
if (killableContainers.containsKey(container.getId())) {
Resources.subtractFrom(totalKillableResources, container.getResource());
killableContainers.remove(container.getId());
@@ -170,6 +171,6 @@ public class FiCaSchedulerNode extends SchedulerNode {
}
+ /**
+ * Returns a read-only view of the killable containers on this node.
+ * NOTE(review): Collections.unmodifiableMap returns a view backed by the
+ * live map, and this method's lock only covers creating the view; callers
+ * that iterate it should presumably hold this node's lock -- confirm.
+ */
public synchronized Map<ContainerId, RMContainer> getKillableContainers() {
- return killableContainers;
+ return Collections.unmodifiableMap(killableContainers);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 4b21f7f..24dcd06 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -379,6 +379,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
RMContainer rmContainer = new RMContainerImpl(container,
getApplicationAttemptId(), node.getNodeID(),
appSchedulingInfo.getUser(), rmContext);
+ ((RMContainerImpl)rmContainer).setQueueName(this.getQueueName());
// Add it to allContainers list.
newlyAllocatedContainers.add(rmContainer);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
index d6bc1fd..960339e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
@@ -916,7 +916,8 @@ public class FairScheduler extends
triggerUpdate();
// Remove running containers
- List<RMContainer> runningContainers = node.getRunningContainers();
+ List<RMContainer> runningContainers =
+ node.getCopiedListOfRunningContainers();
for (RMContainer container : runningContainers) {
super.completedContainer(container,
SchedulerUtils.createAbnormalContainerStatus(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index 409a043..664bd4e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -937,7 +937,7 @@ public class FifoScheduler extends
return;
}
// Kill running containers
- for(RMContainer container : node.getRunningContainers()) {
+ for(RMContainer container : node.getCopiedListOfRunningContainers()) {
super.completedContainer(container,
SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(),
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[2/4] hadoop git commit: YARN-4390. Do surgical preemption based on
reserved container in CapacityScheduler. Contributed by Wangda Tan
Posted by ep...@apache.org.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
new file mode 100644
index 0000000..07d1eef
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Sanity tests for {@link ProportionalCapacityPreemptionPolicyMockFramework}:
+ * verifies that buildEnv turns the compact label/node/queue/app config
+ * strings into the expected mock queues, applications, containers, reserved
+ * resources and node resources.
+ */
+public class TestProportionalCapacityPreemptionPolicyMockFramework
+ extends ProportionalCapacityPreemptionPolicyMockFramework {
+
+ @Test
+ public void testBuilder() throws Exception {
+ /**
+ * Test of the test framework itself: make sure buildEnv builds the
+ * expected mock schedulable objects (queues, apps, containers).
+ */
+ String labelsConfig =
+ "=200,true;" + // default partition
+ "red=100,false;" + // partition=red
+ "blue=200,true"; // partition=blue
+ String nodesConfig =
+ "n1=red;" + // n1 has partition=red
+ "n2=blue;" + // n2 has partition=blue
+ "n3="; // n3 doesn't have partition
+ String queuesConfig =
+ // guaranteed,max,used,pending
+ "root(=[200 200 100 100],red=[100 100 100 100],blue=[200 200 200 200]);" + //root
+ "-a(=[100 200 100 100],red=[0 0 0 0],blue=[200 200 200 200]);" + // a
+ "--a1(=[50 100 50 100],red=[0 0 0 0],blue=[100 200 200 0]);" + // a1
+ "--a2(=[50 200 50 0],red=[0 0 0 0],blue=[100 200 0 200]);" + // a2
+ "-b(=[100 200 0 0],red=[100 100 100 100],blue=[0 0 0 0])";
+ String appsConfig=
+ //queueName\t(priority,resource,host,expression,#repeat,reserved)
+ // apps are numbered in order: app1, app2 in a1; app3 in a2; app4 in b
+ "a1\t" // app1 in a1
+ + "(1,1,n3,red,50,false);" + // 50 * default in n3
+
+ "a1\t" // app2 in a1
+ + "(2,1,n2,,50,true)(2,1,n2,,50,false)" // 50 * ignore-exclusivity (reserved),
+ // 50 * ignore-exclusivity (allocated)
+ + "(2,1,n2,blue,50,true)(2,1,n2,blue,50,true);" + // 50 in n2 (reserved),
+ // another 50 in n2 (reserved)
+ "a2\t" // app3 in a2
+ + "(1,1,n3,red,50,false);" + // 50 * default in n3
+
+ "b\t" // app4 in b
+ + "(1,1,n1,red,100,false);";
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+
+ // Check queues:
+ // root
+ checkAbsCapacities(cs.getQueue("root"), "", 1f, 1f, 0.5f);
+ checkPendingResource(cs.getQueue("root"), "", 100);
+ checkAbsCapacities(cs.getQueue("root"), "red", 1f, 1f, 1f);
+ checkPendingResource(cs.getQueue("root"), "red", 100);
+ checkAbsCapacities(cs.getQueue("root"), "blue", 1f, 1f, 1f);
+ checkPendingResource(cs.getQueue("root"), "blue", 200);
+
+ // a
+ checkAbsCapacities(cs.getQueue("a"), "", 0.5f, 1f, 0.5f);
+ checkPendingResource(cs.getQueue("a"), "", 100);
+ checkAbsCapacities(cs.getQueue("a"), "red", 0f, 0f, 0f);
+ checkPendingResource(cs.getQueue("a"), "red", 0);
+ checkAbsCapacities(cs.getQueue("a"), "blue", 1f, 1f, 1f);
+ checkPendingResource(cs.getQueue("a"), "blue", 200);
+
+ // a1
+ checkAbsCapacities(cs.getQueue("a1"), "", 0.25f, 0.5f, 0.25f);
+ checkPendingResource(cs.getQueue("a1"), "", 100);
+ checkAbsCapacities(cs.getQueue("a1"), "red", 0f, 0f, 0f);
+ checkPendingResource(cs.getQueue("a1"), "red", 0);
+ checkAbsCapacities(cs.getQueue("a1"), "blue", 0.5f, 1f, 1f);
+ checkPendingResource(cs.getQueue("a1"), "blue", 0);
+
+ // a2
+ checkAbsCapacities(cs.getQueue("a2"), "", 0.25f, 1f, 0.25f);
+ checkPendingResource(cs.getQueue("a2"), "", 0);
+ checkAbsCapacities(cs.getQueue("a2"), "red", 0f, 0f, 0f);
+ checkPendingResource(cs.getQueue("a2"), "red", 0);
+ checkAbsCapacities(cs.getQueue("a2"), "blue", 0.5f, 1f, 0f);
+ checkPendingResource(cs.getQueue("a2"), "blue", 200);
+
+ // b
+ checkAbsCapacities(cs.getQueue("b"), "", 0.5f, 1f, 0f);
+ checkPendingResource(cs.getQueue("b"), "", 0);
+ checkAbsCapacities(cs.getQueue("b"), "red", 1f, 1f, 1f);
+ checkPendingResource(cs.getQueue("b"), "red", 100);
+ checkAbsCapacities(cs.getQueue("b"), "blue", 0f, 0f, 0f);
+ checkPendingResource(cs.getQueue("b"), "blue", 0);
+
+ // Check containers in a1 that ignore exclusivity of partition blue
+ Assert.assertEquals(100, ((LeafQueue) cs.getQueue("a1"))
+ .getIgnoreExclusivityRMContainers().get("blue").size());
+
+ // Check applications
+ Assert.assertEquals(2, ((LeafQueue)cs.getQueue("a1")).getApplications().size());
+ Assert.assertEquals(1, ((LeafQueue)cs.getQueue("a2")).getApplications().size());
+ Assert.assertEquals(1, ((LeafQueue)cs.getQueue("b")).getApplications().size());
+
+ // Check #containers
+ FiCaSchedulerApp app1 = getApp("a1", 1);
+ FiCaSchedulerApp app2 = getApp("a1", 2);
+ FiCaSchedulerApp app3 = getApp("a2", 3);
+ FiCaSchedulerApp app4 = getApp("b", 4);
+
+ Assert.assertEquals(50, app1.getLiveContainers().size());
+ checkContainerNodesInApp(app1, 50, "n3");
+
+ // app2: 50 allocated + 150 reserved, all on n2
+ Assert.assertEquals(50, app2.getLiveContainers().size());
+ Assert.assertEquals(150, app2.getReservedContainers().size());
+ checkContainerNodesInApp(app2, 200, "n2");
+
+ Assert.assertEquals(50, app3.getLiveContainers().size());
+ checkContainerNodesInApp(app3, 50, "n3");
+
+ Assert.assertEquals(100, app4.getLiveContainers().size());
+ checkContainerNodesInApp(app4, 100, "n1");
+ }
+
+ /**
+ * Verifies that the optional fifth value of each queue resource tuple is
+ * applied as the queue's reserved resource (queue b omits it and is
+ * checked as 0).
+ */
+ @Test
+ public void testBuilderWithReservedResource() throws Exception {
+ String labelsConfig =
+ "=200,true;" + // default partition
+ "red=100,false;" + // partition=red
+ "blue=200,true"; // partition=blue
+ String nodesConfig =
+ "n1=red;" + // n1 has partition=red
+ "n2=blue;" + // n2 has partition=blue
+ "n3="; // n3 doesn't have partition
+ String queuesConfig =
+ // guaranteed,max,used,pending[,reserved]
+ "root(=[200 200 100 100 100],red=[100 100 100 100 90],blue=[200 200 200 200 80]);" + //root
+ "-a(=[100 200 100 100 50],red=[0 0 0 0 40],blue=[200 200 200 200 30]);" + // a
+ "--a1(=[50 100 50 100 40],red=[0 0 0 0 20],blue=[100 200 200 0]);" + // a1
+ "--a2(=[50 200 50 0 10],red=[0 0 0 0 20],blue=[100 200 0 200]);" + // a2
+ "-b(=[100 200 0 0],red=[100 100 100 100],blue=[0 0 0 0])";
+ String appsConfig=
+ //queueName\t(priority,resource,host,expression,#repeat,reserved)
+ // apps are numbered in order: app1, app2 in a1; app3 in a2; app4 in b
+ "a1\t" // app1 in a1
+ + "(1,1,n3,red,50,false);" + // 50 * default in n3
+
+ "a1\t" // app2 in a1
+ + "(2,1,n2,,50,true)(2,1,n2,,50,false)" // 50 * ignore-exclusivity (reserved),
+ // 50 * ignore-exclusivity (allocated)
+ + "(2,1,n2,blue,50,true)(2,1,n2,blue,50,true);" + // 50 in n2 (reserved),
+ // another 50 in n2 (reserved)
+ "a2\t" // app3 in a2
+ + "(1,1,n3,red,50,false);" + // 50 * default in n3
+
+ "b\t" // app4 in b
+ + "(1,1,n1,red,100,false);";
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+
+ // Check queues:
+ // root
+ checkReservedResource(cs.getQueue("root"), "", 100);
+ checkReservedResource(cs.getQueue("root"), "red", 90);
+
+ // a
+ checkReservedResource(cs.getQueue("a"), "", 50);
+ checkReservedResource(cs.getQueue("a"), "red", 40);
+
+ // a1
+ checkReservedResource(cs.getQueue("a1"), "", 40);
+ checkReservedResource(cs.getQueue("a1"), "red", 20);
+
+ // b
+ checkReservedResource(cs.getQueue("b"), "", 0);
+ checkReservedResource(cs.getQueue("b"), "red", 0);
+ }
+
+ /**
+ * Verifies per-node "res=" overrides in nodesConfig, plus the running and
+ * reserved containers that buildEnv places on each node.
+ */
+ @Test
+ public void testBuilderWithSpecifiedNodeResources() throws Exception {
+ String labelsConfig =
+ "=200,true;" + // default partition
+ "red=100,false;" + // partition=red
+ "blue=200,true"; // partition=blue
+ String nodesConfig =
+ "n1=red res=100;" + // n1 has partition=red, total resource 100
+ "n2=blue;" + // n2 has partition=blue
+ "n3= res=30"; // n3 doesn't have partition, total resource 30
+ String queuesConfig =
+ // guaranteed,max,used,pending[,reserved]
+ "root(=[200 200 100 100 100],red=[100 100 100 100 90],blue=[200 200 200 200 80]);" + //root
+ "-a(=[100 200 100 100 50],red=[0 0 0 0 40],blue=[200 200 200 200 30]);" + // a
+ "--a1(=[50 100 50 100 40],red=[0 0 0 0 20],blue=[100 200 200 0]);" + // a1
+ "--a2(=[50 200 50 0 10],red=[0 0 0 0 20],blue=[100 200 0 200]);" + // a2
+ "-b(=[100 200 0 0],red=[100 100 100 100],blue=[0 0 0 0])";
+ String appsConfig=
+ //queueName\t(priority,resource,host,expression,#repeat,reserved)
+ // apps are numbered in order: app1, app2 in a1; app3 in a2; app4 in b
+ "a1\t" // app1 in a1
+ + "(1,1,n3,red,50,false);" + // 50 * default in n3
+
+ "a1\t" // app2 in a1
+ + "(2,1,n2,,50,true)(2,1,n2,,50,false)" // 50 * ignore-exclusivity (reserved),
+ // 50 * ignore-exclusivity (allocated)
+ + "(2,1,n2,blue,50,true)(2,1,n2,blue,50,true);" + // 50 in n2 (reserved),
+ // another 50 in n2 (reserved)
+ "a2\t" // app3 in a2
+ + "(1,1,n3,red,50,false);" + // 50 * default in n3
+
+ "b\t" // app4 in b
+ + "(1,1,n1,red,100,false);";
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+
+ // Check host resources
+ Assert.assertEquals(3, this.cs.getAllNodes().size());
+ SchedulerNode node1 = cs.getSchedulerNode(NodeId.newInstance("n1", 1));
+ Assert.assertEquals(100, node1.getTotalResource().getMemorySize());
+ Assert.assertEquals(100, node1.getCopiedListOfRunningContainers().size());
+ Assert.assertNull(node1.getReservedContainer());
+
+ SchedulerNode node2 = cs.getSchedulerNode(NodeId.newInstance("n2", 1));
+ Assert.assertEquals(0, node2.getTotalResource().getMemorySize());
+ Assert.assertEquals(50, node2.getCopiedListOfRunningContainers().size());
+ Assert.assertNotNull(node2.getReservedContainer());
+
+ // n3 hosts app1 (50) and app3 (50)
+ SchedulerNode node3 = cs.getSchedulerNode(NodeId.newInstance("n3", 1));
+ Assert.assertEquals(30, node3.getTotalResource().getMemorySize());
+ Assert.assertEquals(100, node3.getCopiedListOfRunningContainers().size());
+ Assert.assertNull(node3.getReservedContainer());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
index 88216f8..54166c0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.junit.After;
@@ -144,7 +145,7 @@ public class TestSchedulerApplicationAttempt {
private RMContainer createRMContainer(ApplicationAttemptId appAttId, int id,
Resource resource) {
ContainerId containerId = ContainerId.newContainerId(appAttId, id);
- RMContainer rmContainer = mock(RMContainer.class);
+ RMContainer rmContainer = mock(RMContainerImpl.class);
Container container = mock(Container.class);
when(container.getResource()).thenReturn(resource);
when(container.getNodeId()).thenReturn(nodeId);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
new file mode 100644
index 0000000..bd9f615
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.util.Clock;
+import org.junit.Assert;
+import org.junit.Before;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class CapacitySchedulerPreemptionTestBase {
+
+  final int GB = 1024;
+
+  Configuration conf;
+
+  RMNodeLabelsManager mgr;
+
+  Clock clock;
+
+  /**
+   * Creates a CapacityScheduler configuration with scheduling monitors and
+   * {@link ProportionalCapacityPreemptionPolicy} enabled. Preemption is tuned
+   * to take effect immediately: no wait before kill, up to 100% of resources
+   * per round, and a natural termination factor of 1.0. The 60s monitoring
+   * interval presumably keeps the background monitor from firing during a
+   * test, so tests drive the policy themselves (e.g. via
+   * {@link #getSchedulingEditPolicy(MockRM)}) -- TODO confirm.
+   *
+   * <p>Declared public because JUnit 4 only runs public {@code @Before}
+   * methods; a package-private setUp only worked while every subclass
+   * overrode it with a public method.
+   */
+  @Before
+  public void setUp() throws Exception {
+    conf = new YarnConfiguration();
+    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+        ResourceScheduler.class);
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
+    conf.setClass(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
+        ProportionalCapacityPreemptionPolicy.class, SchedulingEditPolicy.class);
+    conf = TestUtils.getConfigurationWithMultipleQueues(this.conf);
+
+    // Set preemption related configurations
+    conf.setInt(CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL,
+        0);
+    conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
+        1.0f);
+    conf.setFloat(
+        CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
+        1.0f);
+    conf.setLong(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
+        60000L);
+    mgr = new NullRMNodeLabelsManager();
+    mgr.init(this.conf);
+    clock = mock(Clock.class);
+    when(clock.getTime()).thenReturn(0L);
+  }
+
+  /**
+   * Returns the edit policy of the first {@link SchedulingMonitor} among
+   * {@code rm}'s active services, or {@code null} if there is none.
+   */
+  SchedulingEditPolicy getSchedulingEditPolicy(MockRM rm) {
+    ResourceManager.RMActiveServices activeServices = rm.getRMActiveService();
+    for (Service service : activeServices.getServices()) {
+      if (service instanceof SchedulingMonitor) {
+        return ((SchedulingMonitor) service).getSchedulingEditPolicy();
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Polls (10 x 100ms) until {@code app} has exactly {@code expected} live
+   * containers.
+   *
+   * @throws AssertionError if the count is not reached in time; the message
+   *         reports the last observed count
+   */
+  public void waitNumberOfLiveContainersFromApp(FiCaSchedulerApp app,
+      int expected) throws InterruptedException {
+    int waitNum = 0;
+    int actual = -1;
+
+    while (waitNum < 10) {
+      actual = app.getLiveContainers().size();
+      if (actual == expected) {
+        return;
+      }
+      Thread.sleep(100);
+      waitNum++;
+    }
+
+    Assert.fail("Expected " + expected + " live containers for "
+        + app.getApplicationAttemptId() + ", but found " + actual);
+  }
+
+  /**
+   * Polls (10 x 100ms) until {@code app} has exactly {@code expected}
+   * reserved containers.
+   *
+   * @throws AssertionError if the count is not reached in time; the message
+   *         reports the last observed count
+   */
+  public void waitNumberOfReservedContainersFromApp(FiCaSchedulerApp app,
+      int expected) throws InterruptedException {
+    int waitNum = 0;
+    int actual = -1;
+
+    while (waitNum < 10) {
+      actual = app.getReservedContainers().size();
+      if (actual == expected) {
+        return;
+      }
+      Thread.sleep(100);
+      waitNum++;
+    }
+
+    Assert.fail("Expected " + expected + " reserved containers for "
+        + app.getApplicationAttemptId() + ", but found " + actual);
+  }
+
+  /**
+   * Polls (500 x 10ms) until {@code node} runs exactly {@code expected}
+   * containers that belong to application attempt {@code appId}.
+   *
+   * @throws AssertionError if the count is not reached in time; the message
+   *         reports the last observed count
+   */
+  public void waitNumberOfLiveContainersOnNodeFromApp(FiCaSchedulerNode node,
+      ApplicationAttemptId appId, int expected) throws InterruptedException {
+    int waitNum = 0;
+    int actual = -1;
+
+    while (waitNum < 500) {
+      actual = 0;
+      // Iterate a copy so concurrent scheduler updates cannot break the loop
+      for (RMContainer c : node.getCopiedListOfRunningContainers()) {
+        if (c.getApplicationAttemptId().equals(appId)) {
+          actual++;
+        }
+      }
+      if (actual == expected) {
+        return;
+      }
+      Thread.sleep(10);
+      waitNum++;
+    }
+
+    Assert.fail("Expected " + expected + " live containers of " + appId
+        + " on node " + node.getNodeID() + ", but found " + actual);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 6c9faf7..925e7f8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -3371,7 +3371,7 @@ public class TestCapacityScheduler {
resourceManager
.getResourceScheduler()
.getSchedulerNode(resourceEvent.getNodeId())
- .setTotalResource(resourceEvent.getResourceOption().getResource());
+ .updateTotalResource(resourceEvent.getResourceOption().getResource());
}
}
});
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java
new file mode 100644
index 0000000..e2d21c5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java
@@ -0,0 +1,639 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMActiveServices;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestCapacitySchedulerLazyPreemption
+ extends CapacitySchedulerPreemptionTestBase {
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED,
+ true);
+ }
+
+ @Test (timeout = 60000)
+ public void testSimplePreemption() throws Exception {
+ /**
+ * Test case: Submit two applications (app1/app2) to different queues, queue
+ * structure:
+ *
+ * <pre>
+ * Root
+ * / | \
+ * a b c
+ * 10 20 70
+ * </pre>
+ *
+ * 1) Two nodes in the cluster, each of them has 4G.
+ *
+ * 2) app1 is submitted to queue-a first and asks for 7 * 1G containers, so
+ * there's no more resource available.
+ *
+ * 3) app2 is submitted to queue-c and asks for one 1G container (for AM)
+ *
+ * Now the cluster is fully used.
+ *
+ * 4) app2 asks for another 1G container, system will preempt one container
+ * from app1, and app2 will receive the preempted container
+ */
+ MockRM rm1 = new MockRM(conf);
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+
+ MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
+ MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+ RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+ // launch an app to queue-a, AM container should be launched in nm1
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ am1.allocate("*", 1 * GB, 7, new ArrayList<ContainerId>());
+
+ // Do allocation 3 times for node1/node2
+ for (int i = 0; i < 3; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+ }
+
+ // App1 should have 7 containers now, and no available resource for cluster
+ FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+ am1.getApplicationAttemptId());
+ Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+ // Submit app2 to queue-c and ask for a 1G container for AM
+ RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+ // NM1/NM2 have available resource = 0G
+ Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+ .getAvailableResource().getMemorySize());
+ Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
+ .getAvailableResource().getMemorySize());
+
+ // AM asks for a 1 * GB container
+ am2.allocate(Arrays.asList(ResourceRequest
+ .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+ Resources.createResource(1 * GB), 1)), null);
+
+ // Get the edit policy so editSchedule() can be driven manually
+ SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+ // Call edit schedule twice, and check that one container from app1 is
+ // marked as "killable"
+ editPolicy.editSchedule();
+ editPolicy.editSchedule();
+
+ PreemptionManager pm = cs.getPreemptionManager();
+ Map<ContainerId, RMContainer> killableContainers =
+ waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
+ Assert.assertEquals(1, killableContainers.size());
+ Assert.assertEquals(am1.getApplicationAttemptId(),
+ killableContainers.keySet().iterator().next().getApplicationAttemptId());
+
+ // Call CS.handle once to see if a container is preempted
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+ FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+ am2.getApplicationAttemptId());
+
+ // App1 has 6 containers, and app2 has 2 containers
+ Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
+ Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
+
+ rm1.close();
+ }
+
+ @Test (timeout = 60000)
+ public void testPreemptionConsidersNodeLocalityDelay()
+ throws Exception {
+ /**
+ * Test case: same as testSimplePreemption steps 1-3.
+ *
+ * Step 4: app2 asks for 1G container with locality specified, so it needs
+ * to wait for missed opportunities before getting scheduled.
+ * Check that the system waits for missed opportunities before killing the
+ * killable container.
+ */
+ MockRM rm1 = new MockRM(conf);
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
+ MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+ RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+ // launch an app to queue-a, AM container should be launched in nm1
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+ // Do allocation 3 times for node1/node2
+ for (int i = 0; i < 3; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+ }
+
+ // App1 should have 7 containers now, and no available resource for cluster
+ FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+ am1.getApplicationAttemptId());
+ Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+ // Submit app2 to queue-c and ask for a 1G container for AM
+ RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+ // NM1/NM2 have available resource = 0G
+ Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+ .getAvailableResource().getMemorySize());
+ Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
+ .getAvailableResource().getMemorySize());
+
+ // AM asks for a 1 * GB container with unknown host and unknown rack
+ am2.allocate(Arrays.asList(ResourceRequest
+ .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+ Resources.createResource(1 * GB), 1), ResourceRequest
+ .newInstance(Priority.newInstance(1), "unknownhost",
+ Resources.createResource(1 * GB), 1), ResourceRequest
+ .newInstance(Priority.newInstance(1), "/default-rack",
+ Resources.createResource(1 * GB), 1)), null);
+
+ // Get the edit policy so editSchedule() can be driven manually
+ SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+ // Call edit schedule twice, and check that one container from app1 is
+ // marked as "killable"
+ editPolicy.editSchedule();
+ editPolicy.editSchedule();
+
+ PreemptionManager pm = cs.getPreemptionManager();
+ Map<ContainerId, RMContainer> killableContainers =
+ waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
+ Assert.assertEquals(am1.getApplicationAttemptId(),
+ killableContainers.keySet().iterator().next().getApplicationAttemptId());
+
+ // Call CS.handle once to see if a container is preempted
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+ FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+ am2.getApplicationAttemptId());
+
+ // App1 has 7 containers, and app2 has 1 container (no container preempted)
+ Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+ Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+
+ // Do allocation again, one container will be preempted
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+ // App1 has 6 containers, and app2 has 2 containers (new container allocated)
+ Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
+ Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
+
+ rm1.close();
+ }
+
+ @Test (timeout = 60000)
+ public void testPreemptionConsidersHardNodeLocality()
+ throws Exception {
+ /**
+ * Test case: same as testSimplePreemption steps 1-3.
+ *
+ * Step 4: app2 asks for 1G container with hard locality specified, and
+ * the requested host does not exist.
+ * Confirm the system doesn't preempt any container (even if one is killable).
+ */
+ MockRM rm1 = new MockRM(conf);
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
+ MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+ RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+ // launch an app to queue-a, AM container should be launched in nm1
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+ // Do allocation 3 times for node1, then 3 times for node2
+ for (int i = 0; i < 3; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ }
+ for (int i = 0; i < 3; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+ }
+
+ // App1 should have 7 containers now, and no available resource for cluster
+ FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+ am1.getApplicationAttemptId());
+ Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+ // Submit app2 to queue-c and ask for a 1G container for AM
+ RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+ // NM1/NM2 have available resource = 0G
+ Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+ .getAvailableResource().getMemorySize());
+ Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
+ .getAvailableResource().getMemorySize());
+
+ // AM asks for a 1 * GB container for h3 with hard locality,
+ // h3 doesn't exist in the cluster
+ am2.allocate(Arrays.asList(ResourceRequest
+ .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+ Resources.createResource(1 * GB), 1, true), ResourceRequest
+ .newInstance(Priority.newInstance(1), "h3",
+ Resources.createResource(1 * GB), 1, false), ResourceRequest
+ .newInstance(Priority.newInstance(1), "/default-rack",
+ Resources.createResource(1 * GB), 1, false)), null);
+
+ // Get the edit policy so editSchedule() can be driven manually
+ SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+ // Call edit schedule twice, and check that one container from app1 is
+ // marked as "killable"
+ editPolicy.editSchedule();
+ editPolicy.editSchedule();
+
+ PreemptionManager pm = cs.getPreemptionManager();
+ Map<ContainerId, RMContainer> killableContainers =
+ waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
+ Assert.assertEquals(am1.getApplicationAttemptId(),
+ killableContainers.keySet().iterator().next().getApplicationAttemptId());
+
+ // Call CS.handle once to see if a container is preempted
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+ FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+ am2.getApplicationAttemptId());
+
+ // App1 has 7 containers, and app2 has 1 container (no container preempted)
+ Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+ Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+
+ // Do allocation again, nothing will be preempted
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+ // App1 has 7 containers, and app2 has 1 container (no container allocated)
+ Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+ Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+
+ rm1.close();
+ }
+
+ @Test (timeout = 60000)
+ public void testPreemptionPolicyShouldRespectAlreadyMarkedKillableContainers()
+ throws Exception {
+ /**
+ * Test case:
+ * <pre>
+ * Root
+ * / | \
+ * a b c
+ * 10 20 70
+ * </pre>
+ * Submit applications to two queues, one uses more than the other, so
+ * preemption will happen.
+ *
+ * Check:
+ * 1) Killable containers resources will be excluded from PCPP (no duplicated
+ * container added to killable list)
+ * 2) When more resources need to be preempted, new containers will be selected
+ * and killable containers will be considered
+ */
+ MockRM rm1 = new MockRM(conf);
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+ // launch an app to queue-a, AM container should be launched in nm1
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+ // Do allocation 6 times for node1
+ for (int i = 0; i < 6; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ }
+
+ // App1 should have 7 containers now, and no available resource for cluster
+ FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+ am1.getApplicationAttemptId());
+ Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+ // Submit app2 to queue-c and ask for a 1G container for AM
+ RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+ // NM1 has available resource = 0G
+ Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+ .getAvailableResource().getMemorySize());
+ am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>());
+
+ // Get the edit policy so editSchedule() can be driven manually
+ ProportionalCapacityPreemptionPolicy editPolicy =
+ (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+
+ // Call edit schedule twice, and check that one container from app1 is
+ // marked as "killable"
+ editPolicy.editSchedule();
+ editPolicy.editSchedule();
+
+ PreemptionManager pm = cs.getPreemptionManager();
+ waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
+
+ // Check killable containers and to-be-preempted containers in edit policy
+ Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+
+ // Run edit schedule again, confirm the status doesn't change
+ editPolicy.editSchedule();
+ Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+
+ // Save the current killable containers
+ Set<ContainerId> previousKillableContainers = new HashSet<>(
+ pm.getKillableContainersMap("a", RMNodeLabelsManager.NO_LABEL)
+ .keySet());
+
+ // Update request resource of c from 1 to 2, so we need to preempt
+ // one more container
+ am2.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>());
+
+ // Call editPolicy.editSchedule() once, we should have 1 container in to-preempt map
+ // and 1 container in killable map
+ editPolicy.editSchedule();
+ Assert.assertEquals(1, editPolicy.getToPreemptContainers().size());
+
+ // Call editPolicy.editSchedule() once more, we should have 2 containers in killable map
+ editPolicy.editSchedule();
+ Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+
+ // Check that previous killable containers are included in the new killable containers
+ Map<ContainerId, RMContainer> killableContainers =
+ waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
+ Assert.assertTrue(
+ Sets.difference(previousKillableContainers, killableContainers.keySet())
+ .isEmpty());
+ rm1.close();
+ }
+
+ @Ignore("Ignored for now: this could be a premature optimization")
+ @Test (timeout = 60000)
+ public void testPreemptionPolicyCleanupKillableContainersWhenNoPreemptionNeeded()
+ throws Exception {
+ /**
+ * Test case:
+ * <pre>
+ * Root
+ * / | \
+ * a b c
+ * 10 20 70
+ * </pre>
+ * Submit applications to two queues, one uses more than the other, so
+ * preemption will happen.
+ *
+ * Check:
+ * 1) Containers will be marked to killable
+ * 2) Cancel resource request
+ * 3) Killable containers will be cancelled from policy and scheduler
+ */
+ MockRM rm1 = new MockRM(conf);
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+ // launch an app to queue-a, AM container should be launched in nm1
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+ // Do allocation 6 times for node1
+ for (int i = 0; i < 6; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ }
+
+ // App1 should have 7 containers now, and no available resource for cluster
+ FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+ am1.getApplicationAttemptId());
+ Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+ // Submit app2 to queue-c and ask for a 1G container for AM
+ RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+ // NM1 has available resource = 0G
+ Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+ .getAvailableResource().getMemorySize());
+ am2.allocate("*", 3 * GB, 1, new ArrayList<ContainerId>());
+
+ // Get the edit policy so editSchedule() can be driven manually
+ ProportionalCapacityPreemptionPolicy editPolicy =
+ (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+
+ // Call edit schedule twice, and check that 3 containers from app1 are
+ // marked as "killable"
+ editPolicy.editSchedule();
+ editPolicy.editSchedule();
+
+ PreemptionManager pm = cs.getPreemptionManager();
+ waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 3);
+
+ // Change request from 3G to 2G, now we can preempt one less container. (3->2)
+ am2.allocate("*", 2 * GB, 1, new ArrayList<ContainerId>());
+ editPolicy.editSchedule();
+ Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+ waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
+
+ // Call editSchedule once more to make sure still nothing happens
+ editPolicy.editSchedule();
+ Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+ waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
+ rm1.close();
+ }
+
+ @Test (timeout = 60000)
+ public void testPreemptionConsidersUserLimit()
+ throws Exception {
+ /**
+ * Test case: Submit two applications (app1/app2) to different queues, queue
+ * structure:
+ *
+ * <pre>
+ * Root
+ * / | \
+ * a b c
+ * 10 20 70
+ * </pre>
+ *
+ * Queue-c's user-limit-factor = 0.1, so a single user cannot allocate more than 1 container in queue-c
+ *
+ * 1) Two nodes in the cluster, each of them has 4G.
+ *
+ * 2) app1 is submitted to queue-a first and asks for 7 * 1G containers, so
+ * there's no more resource available.
+ *
+ * 3) app2 is submitted to queue-c and asks for one 1G container (for AM)
+ *
+ * Now the cluster is fully used.
+ *
+ * 4) app2 asks for another 1G container; no container is preempted from app1,
+ * because app2's user has already reached queue-c's user limit
+ */
+ CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(conf);
+ csConf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".c", 0.1f);
+ MockRM rm1 = new MockRM(csConf);
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
+ MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+ RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+ // launch an app to queue-a, AM container should be launched in nm1
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+ // Do allocation 3 times for node1/node2
+ for (int i = 0; i < 3; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+ }
+
+ // App1 should have 7 containers now, and no available resource for cluster
+ FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+ am1.getApplicationAttemptId());
+ Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+ // Submit app2 to queue-c and ask for a 1G container for AM
+ RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+ // NM1/NM2 have available resource = 0G
+ Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+ .getAvailableResource().getMemorySize());
+ Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
+ .getAvailableResource().getMemorySize());
+
+ // AM asks for a 1 * GB container
+ am2.allocate(Arrays.asList(ResourceRequest
+ .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+ Resources.createResource(1 * GB), 1)), null);
+
+ // Get the edit policy so editSchedule() can be driven manually
+ SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+ // Call edit schedule twice, and check that no container from app1 is
+ // marked as "killable"
+ editPolicy.editSchedule();
+ editPolicy.editSchedule();
+
+ // No preemption happens
+ PreemptionManager pm = cs.getPreemptionManager();
+ Map<ContainerId, RMContainer> killableContainers =
+ waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 0);
+ Assert.assertEquals(0, killableContainers.size());
+
+ // Call CS.handle once to see if a container is preempted
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+ FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+ am2.getApplicationAttemptId());
+
+ // App1 has 7 containers, and app2 has 1 container (nothing preempted)
+ Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+ Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+
+ rm1.close();
+ }
+
+ /** Waits until the killable map has expectedSize entries, then asserts. */
+ private Map<ContainerId, RMContainer> waitKillableContainersSize(
+ PreemptionManager pm, String queueName, String partition,
+ int expectedSize) throws InterruptedException {
+ Map<ContainerId, RMContainer> killableContainers =
+ pm.getKillableContainersMap(queueName, partition);
+
+ int wait = 0;
+ // Wait for at most 5 sec (it should be super fast actually)
+ while (expectedSize != killableContainers.size() && wait < 500) {
+ killableContainers = pm.getKillableContainersMap(queueName, partition);
+ Thread.sleep(10);
+ wait++;
+ }
+
+ Assert.assertEquals(expectedSize, killableContainers.size());
+ return killableContainers;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java
deleted file mode 100644
index 001899d..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java
+++ /dev/null
@@ -1,683 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
-
-import com.google.common.collect.Sets;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.Service;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
-import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
-import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMActiveServices;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class TestCapacitySchedulerPreemption {
- private static final Log LOG = LogFactory.getLog(
- TestCapacitySchedulerPreemption.class);
-
- private final int GB = 1024;
-
- private Configuration conf;
-
- RMNodeLabelsManager mgr;
-
- Clock clock;
-
- @Before
- public void setUp() throws Exception {
- conf = new YarnConfiguration();
- conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
- ResourceScheduler.class);
- conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
- conf.setClass(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
- ProportionalCapacityPreemptionPolicy.class, SchedulingEditPolicy.class);
- conf = TestUtils.getConfigurationWithMultipleQueues(this.conf);
-
- // Set preemption related configurations
- conf.setInt(CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL,
- 0);
- conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED,
- true);
- conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
- 1.0f);
- conf.setFloat(
- CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
- 1.0f);
- mgr = new NullRMNodeLabelsManager();
- mgr.init(this.conf);
- clock = mock(Clock.class);
- when(clock.getTime()).thenReturn(0L);
- }
-
- private SchedulingEditPolicy getSchedulingEditPolicy(MockRM rm) {
- RMActiveServices activeServices = rm.getRMActiveService();
- SchedulingMonitor mon = null;
- for (Service service : activeServices.getServices()) {
- if (service instanceof SchedulingMonitor) {
- mon = (SchedulingMonitor) service;
- break;
- }
- }
-
- if (mon != null) {
- return mon.getSchedulingEditPolicy();
- }
- return null;
- }
-
- /**
-  * Test case: submit two applications (app1/app2) to different queues. Queue
-  * structure:
-  *
-  * <pre>
-  *             Root
-  *            /  |  \
-  *           a   b   c
-  *          10  20  70
-  * </pre>
-  *
-  * 1) Two nodes in the cluster, each of them has 4G.
-  *
-  * 2) app1 is submitted to queue-a first and asks for 7 * 1G containers; after
-  * three heartbeats from each node it holds 7 containers (AM included).
-  *
-  * 3) app2 is submitted to queue-c and gets one 1G container for its AM.
-  *
-  * Now the cluster is full.
-  *
-  * 4) app2 asks for another 1G container; the system preempts one container
-  * from app1, and app2 receives the preempted container.
-  */
- @Test (timeout = 60000)
- public void testSimplePreemption() throws Exception {
-   MockRM rm1 = new MockRM(conf);
-   rm1.getRMContext().setNodeLabelManager(mgr);
-   rm1.start();
-   // Close the RM in finally so a failed assertion does not leak a running
-   // RM into the tests that follow.
-   try {
-     MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
-     MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
-     CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-     RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-     RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
-
-     // launch an app to queue, AM container should be launched in nm1
-     RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
-     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-     am1.allocate("*", 1 * GB, 7, new ArrayList<ContainerId>());
-
-     // Do allocation 3 times for node1/node2
-     for (int i = 0; i < 3; i++) {
-       cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-       cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-     }
-
-     // App1 should have 7 containers now
-     FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
-         am1.getApplicationAttemptId());
-     Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
-     // Submit app2 to queue-c and ask for a 1G container for its AM
-     RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
-     MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
-     // NM1/NM2 have available resource = 0G
-     Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
-         .getAvailableResource().getMemory());
-     Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
-         .getAvailableResource().getMemory());
-
-     // AM asks for a 1 * GB container
-     am2.allocate(Arrays.asList(ResourceRequest
-         .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
-             Resources.createResource(1 * GB), 1)), null);
-
-     // Get edit policy and do one update
-     SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
-
-     // Call edit schedule twice, and check if one container from app1 is
-     // marked as "killable"
-     editPolicy.editSchedule();
-     editPolicy.editSchedule();
-
-     PreemptionManager pm = cs.getPreemptionManager();
-     Map<ContainerId, RMContainer> killableContainers =
-         waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
-     Assert.assertEquals(1, killableContainers.size());
-     // JUnit convention: expected value first, actual value second
-     Assert.assertEquals(am1.getApplicationAttemptId(),
-         killableContainers.keySet().iterator().next()
-             .getApplicationAttemptId());
-
-     // Call CS.handle once to see if container preempted
-     cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
-     FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
-         am2.getApplicationAttemptId());
-
-     // App1 has 6 containers, and app2 has 2 containers
-     Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
-     Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
-   } finally {
-     rm1.close();
-   }
- }
-
- @Test (timeout = 60000)
- public void testPreemptionConsidersNodeLocalityDelay()
- throws Exception {
- /**
- * Test case: same as testSimplePreemption steps 1-3.
- *
- * Step 4: app2 asks for 1G container with locality specified, so it needs
- * to wait for missed-opportunity before get scheduled.
- * Check if system waits missed-opportunity before finish killable container
- */
- MockRM rm1 = new MockRM(conf);
- rm1.getRMContext().setNodeLabelManager(mgr);
- rm1.start();
- MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
- MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
- CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
- RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
- RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
-
- // launch an app to queue, AM container should be launched in nm1
- RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
- MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
- am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
- // Do allocation 3 times for node1/node2
- for (int i = 0; i < 3; i++) {
- cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
- cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
- }
-
- // App1 should have 7 containers now, and no available resource for cluster
- FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
- am1.getApplicationAttemptId());
- Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
- // Submit app2 to queue-c and asks for a 1G container for AM
- RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
- MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
- // NM1/NM2 has available resource = 0G
- Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
- .getAvailableResource().getMemory());
- Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
- .getAvailableResource().getMemory());
-
- // AM asks for a 1 * GB container with unknown host and unknown rack
- am2.allocate(Arrays.asList(ResourceRequest
- .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
- Resources.createResource(1 * GB), 1), ResourceRequest
- .newInstance(Priority.newInstance(1), "unknownhost",
- Resources.createResource(1 * GB), 1), ResourceRequest
- .newInstance(Priority.newInstance(1), "/default-rack",
- Resources.createResource(1 * GB), 1)), null);
-
- // Get edit policy and do one update
- SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
-
- // Call edit schedule twice, and check if one container from app1 marked
- // to be "killable"
- editPolicy.editSchedule();
- editPolicy.editSchedule();
-
- PreemptionManager pm = cs.getPreemptionManager();
- Map<ContainerId, RMContainer> killableContainers =
- waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
- Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
- .getApplicationAttemptId(), am1.getApplicationAttemptId());
-
- // Call CS.handle once to see if container preempted
- cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
- FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
- am2.getApplicationAttemptId());
-
- // App1 has 7 containers, and app2 has 1 containers (no container preempted)
- Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
- Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-
- // Do allocation again, one container will be preempted
- cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
- // App1 has 6 containers, and app2 has 2 containers (new container allocated)
- Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
- Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
-
- rm1.close();
- }
-
- /**
-  * Test case: same as testSimplePreemption steps 1-3.
-  *
-  * Step 4: app2 asks for a 1G container with hard locality on a host (h3)
-  * that does not exist in the cluster. The policy still marks one container
-  * of app1 as killable, but confirm the scheduler never actually preempts it,
-  * because app2's request cannot be satisfied on any existing node.
-  */
- @Test (timeout = 60000)
- public void testPreemptionConsidersHardNodeLocality()
-     throws Exception {
-   MockRM rm1 = new MockRM(conf);
-   rm1.getRMContext().setNodeLabelManager(mgr);
-   rm1.start();
-   // Close the RM in finally so a failed assertion does not leak a running
-   // RM into the tests that follow.
-   try {
-     MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
-     MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
-     CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-     RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-     RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
-
-     // launch an app to queue, AM container should be launched in nm1
-     RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
-     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-     am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
-     // Do allocation 3 times for node1, then 3 times for node2
-     for (int i = 0; i < 3; i++) {
-       cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-     }
-     for (int i = 0; i < 3; i++) {
-       cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-     }
-
-     // App1 should have 7 containers now (AM + 6)
-     FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
-         am1.getApplicationAttemptId());
-     Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
-     // Submit app2 to queue-c and ask for a 1G container for its AM
-     RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
-     MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
-     // NM1/NM2 have available resource = 0G
-     Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
-         .getAvailableResource().getMemory());
-     Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
-         .getAvailableResource().getMemory());
-
-     // AM asks for a 1 * GB container for h3 with hard locality,
-     // h3 doesn't exist in the cluster
-     am2.allocate(Arrays.asList(
-         ResourceRequest.newInstance(Priority.newInstance(1),
-             ResourceRequest.ANY, Resources.createResource(1 * GB), 1, true),
-         ResourceRequest.newInstance(Priority.newInstance(1), "h3",
-             Resources.createResource(1 * GB), 1, false),
-         ResourceRequest.newInstance(Priority.newInstance(1), "/default-rack",
-             Resources.createResource(1 * GB), 1, false)), null);
-
-     // Get edit policy and do one update
-     SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
-
-     // Call edit schedule twice, and check if one container from app1 is
-     // marked as "killable"
-     editPolicy.editSchedule();
-     editPolicy.editSchedule();
-
-     PreemptionManager pm = cs.getPreemptionManager();
-     Map<ContainerId, RMContainer> killableContainers =
-         waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
-     // JUnit convention: expected value first, actual value second
-     Assert.assertEquals(am1.getApplicationAttemptId(),
-         killableContainers.keySet().iterator().next()
-             .getApplicationAttemptId());
-
-     // Call CS.handle once to see if container preempted
-     cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
-     FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
-         am2.getApplicationAttemptId());
-
-     // App1 has 7 containers, and app2 has 1 container (no container
-     // preempted)
-     Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-     Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-
-     // Do allocation again, nothing will be preempted
-     cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
-     // App1 has 7 containers, and app2 has 1 container (no container
-     // allocated)
-     Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-     Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-   } finally {
-     rm1.close();
-   }
- }
-
- /**
-  * Test case:
-  * <pre>
-  *             Root
-  *            /  |  \
-  *           a   b   c
-  *          10  20  70
-  * </pre>
-  * Submit applications to two queues, one uses more than the other, so
-  * preemption will happen.
-  *
-  * Check:
-  * 1) Killable containers' resources are excluded from PCPP (no duplicated
-  * container is added to the killable list).
-  * 2) When more resources need to be preempted, new containers are selected
-  * and already-killable containers are taken into account.
-  */
- @Test (timeout = 60000)
- public void testPreemptionPolicyShouldRespectAlreadyMarkedKillableContainers()
-     throws Exception {
-   MockRM rm1 = new MockRM(conf);
-   rm1.getRMContext().setNodeLabelManager(mgr);
-   rm1.start();
-   // The original test never closed the RM; close it in finally so it does
-   // not leak into the tests that follow, even when an assertion fails.
-   try {
-     MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
-     CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-     RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-
-     // launch an app to queue, AM container should be launched in nm1
-     RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
-     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-     am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
-     // Do allocation 6 times for node1
-     for (int i = 0; i < 6; i++) {
-       cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-     }
-
-     // App1 should have 7 containers now (AM + 6)
-     FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
-         am1.getApplicationAttemptId());
-     Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
-     // Submit app2 to queue-c and ask for a 1G container for its AM
-     RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
-     MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
-
-     // NM1 has available resource = 0G
-     Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
-         .getAvailableResource().getMemory());
-     am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>());
-
-     // Get edit policy and do one update
-     ProportionalCapacityPreemptionPolicy editPolicy =
-         (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
-
-     // Call edit schedule twice, and check if one container from app1 is
-     // marked as "killable"
-     editPolicy.editSchedule();
-     editPolicy.editSchedule();
-
-     PreemptionManager pm = cs.getPreemptionManager();
-     waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
-
-     // Check killable containers and to-be-preempted containers in edit policy
-     Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-
-     // Run edit schedule again, confirm the state doesn't change
-     editPolicy.editSchedule();
-     Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-
-     // Save the current killable containers
-     Set<ContainerId> previousKillableContainers = new HashSet<>(
-         pm.getKillableContainersMap("a", RMNodeLabelsManager.NO_LABEL)
-             .keySet());
-
-     // Update request resource of c from 1 to 2, so we need to preempt
-     // one more container
-     am2.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>());
-
-     // Call editPolicy.editSchedule() once; there should be 1 container in
-     // the to-preempt map and 1 container in the killable map
-     editPolicy.editSchedule();
-     Assert.assertEquals(1, editPolicy.getToPreemptContainers().size());
-
-     // Call editPolicy.editSchedule() once more; there should be 2
-     // containers in the killable map
-     editPolicy.editSchedule();
-     Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-
-     // Check that the previous killable containers are included in the new
-     // killable containers
-     Map<ContainerId, RMContainer> killableContainers =
-         waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
-     Assert.assertTrue(
-         Sets.difference(previousKillableContainers, killableContainers.keySet())
-             .isEmpty());
-   } finally {
-     rm1.close();
-   }
- }
-
- /*
- * Ignore this test now because it could be a premature optimization
- */
- @Ignore
- @Test (timeout = 60000)
- public void testPreemptionPolicyCleanupKillableContainersWhenNoPreemptionNeeded()
- throws Exception {
- /**
- * Test case:
- * <pre>
- * Root
- * / | \
- * a b c
- * 10 20 70
- * </pre>
- * Submit applications to two queues, one uses more than the other, so
- * preemption will happen.
- *
- * Check:
- * 1) Containers will be marked to killable
- * 2) Cancel resource request
- * 3) Killable containers will be cancelled from policy and scheduler
- */
- MockRM rm1 = new MockRM(conf);
- rm1.getRMContext().setNodeLabelManager(mgr);
- rm1.start();
- MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
- CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
- RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-
- // launch an app to queue, AM container should be launched in nm1
- RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
- MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
- am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
- // Do allocation 6 times for node1
- for (int i = 0; i < 6; i++) {
- cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
- }
-
- // App1 should have 7 containers now, and no available resource for cluster
- FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
- am1.getApplicationAttemptId());
- Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
- // Submit app2 to queue-c and asks for a 1G container for AM
- RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
- MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
-
- // NM1 has available resource = 0G
- Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
- .getAvailableResource().getMemory());
- am2.allocate("*", 3 * GB, 1, new ArrayList<ContainerId>());
-
- // Get edit policy and do one update
- ProportionalCapacityPreemptionPolicy editPolicy =
- (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
-
- // Call edit schedule twice, and check if 3 container from app1 marked
- // to be "killable"
- editPolicy.editSchedule();
- editPolicy.editSchedule();
-
- PreemptionManager pm = cs.getPreemptionManager();
- waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 3);
-
- // Change reqeust from 3G to 2G, now we can preempt one less container. (3->2)
- am2.allocate("*", 2 * GB, 1, new ArrayList<ContainerId>());
- editPolicy.editSchedule();
- Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
- waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
-
- // Call editSchedule once more to make sure still nothing happens
- editPolicy.editSchedule();
- Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
- waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
- }
-
- /**
-  * Test case: submit two applications (app1/app2) to different queues. Queue
-  * structure:
-  *
-  * <pre>
-  *             Root
-  *            /  |  \
-  *           a   b   c
-  *          10  20  70
-  * </pre>
-  *
-  * Queue-c's user-limit-factor = 0.1, so a single user cannot allocate more
-  * than 1 container in queue-c.
-  *
-  * 1) Two nodes in the cluster, each of them has 4G.
-  *
-  * 2) app1 is submitted to queue-a first and asks for 6 * 1G containers, so
-  * it holds 7 containers including its AM.
-  *
-  * 3) app2 is submitted to queue-c and gets one 1G container for its AM.
-  *
-  * Now the cluster is full.
-  *
-  * 4) app2 asks for another 1G container. Because of queue-c's user limit,
-  * app2 cannot use another container, so the system must not preempt any
-  * container from app1 and app2 keeps its single container.
-  */
- @Test (timeout = 60000)
- public void testPreemptionConsidersUserLimit()
-     throws Exception {
-   CapacitySchedulerConfiguration csConf =
-       new CapacitySchedulerConfiguration(conf);
-   csConf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".c", 0.1f);
-   MockRM rm1 = new MockRM(csConf);
-   rm1.getRMContext().setNodeLabelManager(mgr);
-   rm1.start();
-   // Close the RM in finally so a failed assertion does not leak a running
-   // RM into the tests that follow.
-   try {
-     MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
-     MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
-     CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
-     RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-     RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
-
-     // launch an app to queue, AM container should be launched in nm1
-     RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
-     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
-     am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
-     // Do allocation 3 times for node1/node2
-     for (int i = 0; i < 3; i++) {
-       cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
-       cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-     }
-
-     // App1 should have 7 containers now (AM + 6)
-     FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
-         am1.getApplicationAttemptId());
-     Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
-     // Submit app2 to queue-c and ask for a 1G container for its AM
-     RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
-     MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
-     // NM1/NM2 have available resource = 0G
-     Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
-         .getAvailableResource().getMemory());
-     Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
-         .getAvailableResource().getMemory());
-
-     // AM asks for a 1 * GB container
-     am2.allocate(Arrays.asList(ResourceRequest
-         .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
-             Resources.createResource(1 * GB), 1)), null);
-
-     // Get edit policy and do one update
-     SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
-
-     // Call edit schedule twice, and check that no container from app1 is
-     // marked as "killable"
-     editPolicy.editSchedule();
-     editPolicy.editSchedule();
-
-     // No preemption happens
-     PreemptionManager pm = cs.getPreemptionManager();
-     Map<ContainerId, RMContainer> killableContainers =
-         waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 0);
-     Assert.assertEquals(0, killableContainers.size());
-
-     // Call CS.handle once to see if container preempted
-     cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
-     FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
-         am2.getApplicationAttemptId());
-
-     // App1 has 7 containers, and app2 has 1 container (nothing preempted)
-     Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-     Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-   } finally {
-     rm1.close();
-   }
- }
-
- /**
-  * Polls the preemption manager until the killable-container map of the
-  * given queue/partition reaches the expected size, then asserts that size.
-  * Polls every 10 ms, at most 500 times (roughly 5 seconds).
-  *
-  * @param pm preemption manager to query
-  * @param queueName queue whose killable containers are inspected
-  * @param partition node-label partition of the queue
-  * @param expectedSize number of killable containers to wait for
-  * @return the last killable-container map fetched
-  * @throws InterruptedException if interrupted while sleeping between polls
-  */
- private Map<ContainerId, RMContainer> waitKillableContainersSize(
-     PreemptionManager pm, String queueName, String partition,
-     int expectedSize) throws InterruptedException {
-   Map<ContainerId, RMContainer> current =
-       pm.getKillableContainersMap(queueName, partition);
-
-   // It should converge almost immediately; the cap only bounds a failure.
-   for (int attempt = 0;
-       attempt < 500 && current.size() != expectedSize; attempt++) {
-     current = pm.getKillableContainersMap(queueName, partition);
-     Thread.sleep(10);
-   }
-
-   Assert.assertEquals(expectedSize, current.size());
-   return current;
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org