You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2016/12/09 20:39:40 UTC
[2/4] hadoop git commit: YARN-4390. Do surgical preemption based on
reserved container in CapacityScheduler. Contributed by Wangda Tan
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
new file mode 100644
index 0000000..07d1eef
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyMockFramework.java
@@ -0,0 +1,247 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestProportionalCapacityPreemptionPolicyMockFramework
+ extends ProportionalCapacityPreemptionPolicyMockFramework {
+
+ @Test
+ public void testBuilder() throws Exception {
+ /**
+ * Test of test, make sure we build expected mock schedulable objects
+ */
+ String labelsConfig =
+ "=200,true;" + // default partition
+ "red=100,false;" + // partition=red
+ "blue=200,true"; // partition=blue
+ String nodesConfig =
+ "n1=red;" + // n1 has partition=red
+ "n2=blue;" + // n2 has partition=blue
+ "n3="; // n3 doesn't have partition
+ String queuesConfig =
+ // guaranteed,max,used,pending
+ "root(=[200 200 100 100],red=[100 100 100 100],blue=[200 200 200 200]);" + //root
+ "-a(=[100 200 100 100],red=[0 0 0 0],blue=[200 200 200 200]);" + // a
+ "--a1(=[50 100 50 100],red=[0 0 0 0],blue=[100 200 200 0]);" + // a1
+ "--a2(=[50 200 50 0],red=[0 0 0 0],blue=[100 200 0 200]);" + // a2
+ "-b(=[100 200 0 0],red=[100 100 100 100],blue=[0 0 0 0])";
+ String appsConfig=
+ //queueName\t(priority,resource,host,expression,#repeat,reserved)
+ // app2 in a1: 50 in n2 (reserved), 50 in n2 (allocated)
+ "a1\t" // app1 in a1
+ + "(1,1,n3,red,50,false);" + // 50 * default in n3
+
+ "a1\t" // app2 in a1
+ + "(2,1,n2,,50,true)(2,1,n2,,50,false)" // 50 * ignore-exclusivity (reserved),
+ // 50 * ignore-exclusivity (allocated)
+ + "(2,1,n2,blue,50,true)(2,1,n2,blue,50,true);" + // 50 in n2 (reserved),
+ // 50 in n2 (allocated)
+ "a2\t" // app3 in a2
+ + "(1,1,n3,red,50,false);" + // 50 * default in n3
+
+ "b\t" // app4 in b
+ + "(1,1,n1,red,100,false);";
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+
+ // Check queues:
+ // root
+ checkAbsCapacities(cs.getQueue("root"), "", 1f, 1f, 0.5f);
+ checkPendingResource(cs.getQueue("root"), "", 100);
+ checkAbsCapacities(cs.getQueue("root"), "red", 1f, 1f, 1f);
+ checkPendingResource(cs.getQueue("root"), "red", 100);
+ checkAbsCapacities(cs.getQueue("root"), "blue", 1f, 1f, 1f);
+ checkPendingResource(cs.getQueue("root"), "blue", 200);
+
+ // a
+ checkAbsCapacities(cs.getQueue("a"), "", 0.5f, 1f, 0.5f);
+ checkPendingResource(cs.getQueue("a"), "", 100);
+ checkAbsCapacities(cs.getQueue("a"), "red", 0f, 0f, 0f);
+ checkPendingResource(cs.getQueue("a"), "red", 0);
+ checkAbsCapacities(cs.getQueue("a"), "blue", 1f, 1f, 1f);
+ checkPendingResource(cs.getQueue("a"), "blue", 200);
+
+ // a1
+ checkAbsCapacities(cs.getQueue("a1"), "", 0.25f, 0.5f, 0.25f);
+ checkPendingResource(cs.getQueue("a1"), "", 100);
+ checkAbsCapacities(cs.getQueue("a1"), "red", 0f, 0f, 0f);
+ checkPendingResource(cs.getQueue("a1"), "red", 0);
+ checkAbsCapacities(cs.getQueue("a1"), "blue", 0.5f, 1f, 1f);
+ checkPendingResource(cs.getQueue("a1"), "blue", 0);
+
+ // a2
+ checkAbsCapacities(cs.getQueue("a2"), "", 0.25f, 1f, 0.25f);
+ checkPendingResource(cs.getQueue("a2"), "", 0);
+ checkAbsCapacities(cs.getQueue("a2"), "red", 0f, 0f, 0f);
+ checkPendingResource(cs.getQueue("a2"), "red", 0);
+ checkAbsCapacities(cs.getQueue("a2"), "blue", 0.5f, 1f, 0f);
+ checkPendingResource(cs.getQueue("a2"), "blue", 200);
+
+ // b
+ checkAbsCapacities(cs.getQueue("b"), "", 0.5f, 1f, 0f);
+ checkPendingResource(cs.getQueue("b"), "", 0);
+ checkAbsCapacities(cs.getQueue("b"), "red", 1f, 1f, 1f);
+ checkPendingResource(cs.getQueue("b"), "red", 100);
+ checkAbsCapacities(cs.getQueue("b"), "blue", 0f, 0f, 0f);
+ checkPendingResource(cs.getQueue("b"), "blue", 0);
+
+ // Check ignored partitioned containers in queue
+ Assert.assertEquals(100, ((LeafQueue) cs.getQueue("a1"))
+ .getIgnoreExclusivityRMContainers().get("blue").size());
+
+ // Check applications
+ Assert.assertEquals(2, ((LeafQueue)cs.getQueue("a1")).getApplications().size());
+ Assert.assertEquals(1, ((LeafQueue)cs.getQueue("a2")).getApplications().size());
+ Assert.assertEquals(1, ((LeafQueue)cs.getQueue("b")).getApplications().size());
+
+ // Check #containers
+ FiCaSchedulerApp app1 = getApp("a1", 1);
+ FiCaSchedulerApp app2 = getApp("a1", 2);
+ FiCaSchedulerApp app3 = getApp("a2", 3);
+ FiCaSchedulerApp app4 = getApp("b", 4);
+
+ Assert.assertEquals(50, app1.getLiveContainers().size());
+ checkContainerNodesInApp(app1, 50, "n3");
+
+ Assert.assertEquals(50, app2.getLiveContainers().size());
+ Assert.assertEquals(150, app2.getReservedContainers().size());
+ checkContainerNodesInApp(app2, 200, "n2");
+
+ Assert.assertEquals(50, app3.getLiveContainers().size());
+ checkContainerNodesInApp(app3, 50, "n3");
+
+ Assert.assertEquals(100, app4.getLiveContainers().size());
+ checkContainerNodesInApp(app4, 100, "n1");
+ }
+
+ @Test
+ public void testBuilderWithReservedResource() throws Exception {
+ String labelsConfig =
+ "=200,true;" + // default partition
+ "red=100,false;" + // partition=red
+ "blue=200,true"; // partition=blue
+ String nodesConfig =
+ "n1=red;" + // n1 has partition=red
+ "n2=blue;" + // n2 has partition=blue
+ "n3="; // n3 doesn't have partition
+ String queuesConfig =
+ // guaranteed,max,used,pending
+ "root(=[200 200 100 100 100],red=[100 100 100 100 90],blue=[200 200 200 200 80]);" + //root
+ "-a(=[100 200 100 100 50],red=[0 0 0 0 40],blue=[200 200 200 200 30]);" + // a
+ "--a1(=[50 100 50 100 40],red=[0 0 0 0 20],blue=[100 200 200 0]);" + // a1
+ "--a2(=[50 200 50 0 10],red=[0 0 0 0 20],blue=[100 200 0 200]);" + // a2
+ "-b(=[100 200 0 0],red=[100 100 100 100],blue=[0 0 0 0])";
+ String appsConfig=
+ //queueName\t(priority,resource,host,expression,#repeat,reserved)
+ // app2 in a1: 50 in n2 (reserved), 50 in n2 (allocated)
+ "a1\t" // app1 in a1
+ + "(1,1,n3,red,50,false);" + // 50 * default in n3
+
+ "a1\t" // app2 in a1
+ + "(2,1,n2,,50,true)(2,1,n2,,50,false)" // 50 * ignore-exclusivity (reserved),
+ // 50 * ignore-exclusivity (allocated)
+ + "(2,1,n2,blue,50,true)(2,1,n2,blue,50,true);" + // 50 in n2 (reserved),
+ // 50 in n2 (allocated)
+ "a2\t" // app3 in a2
+ + "(1,1,n3,red,50,false);" + // 50 * default in n3
+
+ "b\t" // app4 in b
+ + "(1,1,n1,red,100,false);";
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+
+ // Check queues:
+ // root
+ checkReservedResource(cs.getQueue("root"), "", 100);
+ checkReservedResource(cs.getQueue("root"), "red", 90);
+
+ // a
+ checkReservedResource(cs.getQueue("a"), "", 50);
+ checkReservedResource(cs.getQueue("a"), "red", 40);
+
+ // a1
+ checkReservedResource(cs.getQueue("a1"), "", 40);
+ checkReservedResource(cs.getQueue("a1"), "red", 20);
+
+ // b
+ checkReservedResource(cs.getQueue("b"), "", 0);
+ checkReservedResource(cs.getQueue("b"), "red", 0);
+ }
+
+ @Test
+ public void testBuilderWithSpecifiedNodeResources() throws Exception {
+ String labelsConfig =
+ "=200,true;" + // default partition
+ "red=100,false;" + // partition=red
+ "blue=200,true"; // partition=blue
+ String nodesConfig =
+ "n1=red res=100;" + // n1 has partition=red
+ "n2=blue;" + // n2 has partition=blue
+ "n3= res=30"; // n3 doesn't have partition
+ String queuesConfig =
+ // guaranteed,max,used,pending
+ "root(=[200 200 100 100 100],red=[100 100 100 100 90],blue=[200 200 200 200 80]);" + //root
+ "-a(=[100 200 100 100 50],red=[0 0 0 0 40],blue=[200 200 200 200 30]);" + // a
+ "--a1(=[50 100 50 100 40],red=[0 0 0 0 20],blue=[100 200 200 0]);" + // a1
+ "--a2(=[50 200 50 0 10],red=[0 0 0 0 20],blue=[100 200 0 200]);" + // a2
+ "-b(=[100 200 0 0],red=[100 100 100 100],blue=[0 0 0 0])";
+ String appsConfig=
+ //queueName\t(priority,resource,host,expression,#repeat,reserved)
+ // app2 in a1: 50 in n2 (reserved), 50 in n2 (allocated)
+ "a1\t" // app1 in a1
+ + "(1,1,n3,red,50,false);" + // 50 * default in n3
+
+ "a1\t" // app2 in a1
+ + "(2,1,n2,,50,true)(2,1,n2,,50,false)" // 50 * ignore-exclusivity (reserved),
+ // 50 * ignore-exclusivity (allocated)
+ + "(2,1,n2,blue,50,true)(2,1,n2,blue,50,true);" + // 50 in n2 (reserved),
+ // 50 in n2 (allocated)
+ "a2\t" // app3 in a2
+ + "(1,1,n3,red,50,false);" + // 50 * default in n3
+
+ "b\t" // app4 in b
+ + "(1,1,n1,red,100,false);";
+
+ buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+
+ // Check host resources
+ Assert.assertEquals(3, this.cs.getAllNodes().size());
+ SchedulerNode node1 = cs.getSchedulerNode(NodeId.newInstance("n1", 1));
+ Assert.assertEquals(100, node1.getTotalResource().getMemorySize());
+ Assert.assertEquals(100, node1.getCopiedListOfRunningContainers().size());
+ Assert.assertNull(node1.getReservedContainer());
+
+ SchedulerNode node2 = cs.getSchedulerNode(NodeId.newInstance("n2", 1));
+ Assert.assertEquals(0, node2.getTotalResource().getMemorySize());
+ Assert.assertEquals(50, node2.getCopiedListOfRunningContainers().size());
+ Assert.assertNotNull(node2.getReservedContainer());
+
+ SchedulerNode node3 = cs.getSchedulerNode(NodeId.newInstance("n3", 1));
+ Assert.assertEquals(30, node3.getTotalResource().getMemorySize());
+ Assert.assertEquals(100, node3.getCopiedListOfRunningContainers().size());
+ Assert.assertNull(node3.getReservedContainer());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
index 88216f8..54166c0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
@@ -39,6 +39,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.junit.After;
@@ -144,7 +145,7 @@ public class TestSchedulerApplicationAttempt {
private RMContainer createRMContainer(ApplicationAttemptId appAttId, int id,
Resource resource) {
ContainerId containerId = ContainerId.newContainerId(appAttId, id);
- RMContainer rmContainer = mock(RMContainer.class);
+ RMContainer rmContainer = mock(RMContainerImpl.class);
Container container = mock(Container.class);
when(container.getResource()).thenReturn(resource);
when(container.getNodeId()).thenReturn(nodeId);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
new file mode 100644
index 0000000..bd9f615
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerPreemptionTestBase.java
@@ -0,0 +1,149 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.Application;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.util.Clock;
+import org.junit.Assert;
+import org.junit.Before;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class CapacitySchedulerPreemptionTestBase {
+
+ final int GB = 1024;
+
+ Configuration conf;
+
+ RMNodeLabelsManager mgr;
+
+ Clock clock;
+
+ @Before
+ void setUp() throws Exception {
+ conf = new YarnConfiguration();
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
+ conf.setClass(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
+ ProportionalCapacityPreemptionPolicy.class, SchedulingEditPolicy.class);
+ conf = TestUtils.getConfigurationWithMultipleQueues(this.conf);
+
+ // Set preemption related configurations
+ conf.setInt(CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL,
+ 0);
+ conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
+ 1.0f);
+ conf.setFloat(
+ CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
+ 1.0f);
+ conf.setLong(CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
+ 60000L);
+ mgr = new NullRMNodeLabelsManager();
+ mgr.init(this.conf);
+ clock = mock(Clock.class);
+ when(clock.getTime()).thenReturn(0L);
+ }
+
+ SchedulingEditPolicy getSchedulingEditPolicy(MockRM rm) {
+ ResourceManager.RMActiveServices activeServices = rm.getRMActiveService();
+ SchedulingMonitor mon = null;
+ for (Service service : activeServices.getServices()) {
+ if (service instanceof SchedulingMonitor) {
+ mon = (SchedulingMonitor) service;
+ break;
+ }
+ }
+
+ if (mon != null) {
+ return mon.getSchedulingEditPolicy();
+ }
+ return null;
+ }
+
+ public void waitNumberOfLiveContainersFromApp(FiCaSchedulerApp app,
+ int expected) throws InterruptedException {
+ int waitNum = 0;
+
+ while (waitNum < 10) {
+ System.out.println(app.getLiveContainers().size());
+ if (app.getLiveContainers().size() == expected) {
+ return;
+ }
+ Thread.sleep(100);
+ waitNum++;
+ }
+
+ Assert.fail();
+ }
+
+ public void waitNumberOfReservedContainersFromApp(FiCaSchedulerApp app,
+ int expected) throws InterruptedException {
+ int waitNum = 0;
+
+ while (waitNum < 10) {
+ System.out.println(app.getReservedContainers().size());
+ if (app.getReservedContainers().size() == expected) {
+ return;
+ }
+ Thread.sleep(100);
+ waitNum++;
+ }
+
+ Assert.fail();
+ }
+
+ public void waitNumberOfLiveContainersOnNodeFromApp(FiCaSchedulerNode node,
+ ApplicationAttemptId appId, int expected) throws InterruptedException {
+ int waitNum = 0;
+
+ while (waitNum < 500) {
+ int total = 0;
+ for (RMContainer c : node.getCopiedListOfRunningContainers()) {
+ if (c.getApplicationAttemptId().equals(appId)) {
+ total++;
+ }
+ }
+ if (total == expected) {
+ return;
+ }
+ Thread.sleep(10);
+ waitNum++;
+ }
+
+ Assert.fail();
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 6c9faf7..925e7f8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -3371,7 +3371,7 @@ public class TestCapacityScheduler {
resourceManager
.getResourceScheduler()
.getSchedulerNode(resourceEvent.getNodeId())
- .setTotalResource(resourceEvent.getResourceOption().getResource());
+ .updateTotalResource(resourceEvent.getResourceOption().getResource());
}
}
});
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java
new file mode 100644
index 0000000..e2d21c5
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerLazyPreemption.java
@@ -0,0 +1,639 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import com.google.common.collect.Sets;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
+import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
+import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMActiveServices;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+public class TestCapacitySchedulerLazyPreemption
+ extends CapacitySchedulerPreemptionTestBase {
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ super.setUp();
+ conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED,
+ true);
+ }
+
+ @Test (timeout = 60000)
+ public void testSimplePreemption() throws Exception {
+ /**
+ * Test case: Submit two application (app1/app2) to different queues, queue
+ * structure:
+ *
+ * <pre>
+ * Root
+ * / | \
+ * a b c
+ * 10 20 70
+ * </pre>
+ *
+ * 1) Two nodes in the cluster, each of them has 4G.
+ *
+ * 2) app1 submit to queue-a first, it asked 7 * 1G containers, so there's no
+ * more resource available.
+ *
+ * 3) app2 submit to queue-c, ask for one 1G container (for AM)
+ *
+ * Now the cluster is fulfilled.
+ *
+ * 4) app2 asks for another 1G container, system will preempt one container
+ * from app1, and app2 will receive the preempted container
+ */
+ MockRM rm1 = new MockRM(conf);
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+
+ MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
+ MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+ RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+ // launch an app to queue, AM container should be launched in nm1
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ am1.allocate("*", 1 * GB, 7, new ArrayList<ContainerId>());
+
+ // Do allocation 3 times for node1/node2
+ for (int i = 0; i < 3; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+ }
+
+ // App1 should have 7 containers now, and no available resource for cluster
+ FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+ am1.getApplicationAttemptId());
+ Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+ // Submit app2 to queue-c and asks for a 1G container for AM
+ RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+ // NM1/NM2 has available resource = 0G
+ Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+ .getAvailableResource().getMemorySize());
+ Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
+ .getAvailableResource().getMemorySize());
+
+ // AM asks for a 1 * GB container
+ am2.allocate(Arrays.asList(ResourceRequest
+ .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+ Resources.createResource(1 * GB), 1)), null);
+
+ // Get edit policy and do one update
+ SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+ // Call edit schedule twice, and check if one container from app1 marked
+ // to be "killable"
+ editPolicy.editSchedule();
+ editPolicy.editSchedule();
+
+ PreemptionManager pm = cs.getPreemptionManager();
+ Map<ContainerId, RMContainer> killableContainers =
+ waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
+ Assert.assertEquals(1, killableContainers.size());
+ Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
+ .getApplicationAttemptId(), am1.getApplicationAttemptId());
+
+ // Call CS.handle once to see if container preempted
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+ FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+ am2.getApplicationAttemptId());
+
+ // App1 has 6 containers, and app2 has 2 containers
+ Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
+ Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
+
+ rm1.close();
+ }
+
+ @Test (timeout = 60000)
+ public void testPreemptionConsidersNodeLocalityDelay()
+ throws Exception {
+ /**
+ * Test case: same as testSimplePreemption steps 1-3.
+ *
+ * Step 4: app2 asks for 1G container with locality specified, so it needs
+ * to wait for missed-opportunity before get scheduled.
+ * Check if system waits missed-opportunity before finish killable container
+ */
+ MockRM rm1 = new MockRM(conf);
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
+ MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+ RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+ // launch an app to queue, AM container should be launched in nm1
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+ // Do allocation 3 times for node1/node2
+ for (int i = 0; i < 3; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+ }
+
+ // App1 should have 7 containers now, and no available resource for cluster
+ FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+ am1.getApplicationAttemptId());
+ Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+ // Submit app2 to queue-c and asks for a 1G container for AM
+ RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+ // NM1/NM2 has available resource = 0G
+ Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+ .getAvailableResource().getMemorySize());
+ Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
+ .getAvailableResource().getMemorySize());
+
+ // AM asks for a 1 * GB container with unknown host and unknown rack
+ am2.allocate(Arrays.asList(ResourceRequest
+ .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+ Resources.createResource(1 * GB), 1), ResourceRequest
+ .newInstance(Priority.newInstance(1), "unknownhost",
+ Resources.createResource(1 * GB), 1), ResourceRequest
+ .newInstance(Priority.newInstance(1), "/default-rack",
+ Resources.createResource(1 * GB), 1)), null);
+
+ // Get edit policy and do one update
+ SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+ // Call edit schedule twice, and check if one container from app1 marked
+ // to be "killable"
+ editPolicy.editSchedule();
+ editPolicy.editSchedule();
+
+ PreemptionManager pm = cs.getPreemptionManager();
+ Map<ContainerId, RMContainer> killableContainers =
+ waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
+ Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
+ .getApplicationAttemptId(), am1.getApplicationAttemptId());
+
+ // Call CS.handle once to see if container preempted
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+ FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+ am2.getApplicationAttemptId());
+
+ // App1 has 7 containers, and app2 has 1 container (no container preempted)
+ Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+ Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+
+ // Do allocation again, one container will be preempted
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+ // App1 has 6 containers, and app2 has 2 containers (new container allocated)
+ Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
+ Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
+
+ rm1.close();
+ }
+
+ @Test (timeout = 60000)
+ public void testPreemptionConsidersHardNodeLocality()
+ throws Exception {
+ /**
+ * Test case: same as testSimplePreemption steps 1-3.
+ *
+ * Step 4: app2 asks for 1G container with hard locality specified, and
+ * the requested host does not exist.
+ * Confirm system doesn't preempt any container.
+ */
+ MockRM rm1 = new MockRM(conf);
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
+ MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+ RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+ // launch an app to queue, AM container should be launched in nm1
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+ // Do allocation 3 times for node1/node2
+ for (int i = 0; i < 3; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ }
+ for (int i = 0; i < 3; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+ }
+
+ // App1 should have 7 containers now, and no available resource for cluster
+ FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+ am1.getApplicationAttemptId());
+ Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+ // Submit app2 to queue-c and asks for a 1G container for AM
+ RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+ // NM1/NM2 has available resource = 0G
+ Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+ .getAvailableResource().getMemorySize());
+ Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
+ .getAvailableResource().getMemorySize());
+
+ // AM asks for a 1 * GB container for h3 with hard locality,
+ // h3 doesn't exist in the cluster
+ am2.allocate(Arrays.asList(ResourceRequest
+ .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+ Resources.createResource(1 * GB), 1, true), ResourceRequest
+ .newInstance(Priority.newInstance(1), "h3",
+ Resources.createResource(1 * GB), 1, false), ResourceRequest
+ .newInstance(Priority.newInstance(1), "/default-rack",
+ Resources.createResource(1 * GB), 1, false)), null);
+
+ // Get edit policy and do one update
+ SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+ // Call edit schedule twice, and check if one container from app1 marked
+ // to be "killable"
+ editPolicy.editSchedule();
+ editPolicy.editSchedule();
+
+ PreemptionManager pm = cs.getPreemptionManager();
+ Map<ContainerId, RMContainer> killableContainers =
+ waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
+ Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
+ .getApplicationAttemptId(), am1.getApplicationAttemptId());
+
+ // Call CS.handle once to see if container preempted
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+ FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+ am2.getApplicationAttemptId());
+
+ // App1 has 7 containers, and app2 has 1 container (no container preempted)
+ Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+ Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+
+ // Do allocation again, nothing will be preempted
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+ // App1 has 7 containers, and app2 has 1 container (no container allocated)
+ Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+ Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+
+ rm1.close();
+ }
+
+ @Test (timeout = 60000)
+ public void testPreemptionPolicyShouldRespectAlreadyMarkedKillableContainers()
+ throws Exception {
+ /**
+ * Test case:
+ * <pre>
+ * Root
+ * / | \
+ * a b c
+ * 10 20 70
+ * </pre>
+ * Submit applications to two queues, one uses more than the other, so
+ * preemption will happen.
+ *
+ * Check:
+ * 1) Killable containers resources will be excluded from PCPP (no duplicated
+ * container added to killable list)
+ * 2) When more resources need to be preempted, new containers will be selected
+ * and killable containers will be considered
+ */
+ MockRM rm1 = new MockRM(conf);
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+ // launch an app to queue, AM container should be launched in nm1
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+ // Do allocation 6 times for node1
+ for (int i = 0; i < 6; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ }
+
+ // App1 should have 7 containers now, and no available resource for cluster
+ FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+ am1.getApplicationAttemptId());
+ Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+ // Submit app2 to queue-c and asks for a 1G container for AM
+ RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+ // NM1 has available resource = 0G
+ Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+ .getAvailableResource().getMemorySize());
+ am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>());
+
+ // Get edit policy and do one update
+ ProportionalCapacityPreemptionPolicy editPolicy =
+ (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+
+ // Call edit schedule twice, and check if one container from app1 marked
+ // to be "killable"
+ editPolicy.editSchedule();
+ editPolicy.editSchedule();
+
+ PreemptionManager pm = cs.getPreemptionManager();
+ waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
+
+ // Check killable containers and to-be-preempted containers in edit policy
+ Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+
+ // Run edit schedule again, confirm status doesn't changed
+ editPolicy.editSchedule();
+ Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+
+ // Save current to kill containers
+ Set<ContainerId> previousKillableContainers = new HashSet<>(
+ pm.getKillableContainersMap("a", RMNodeLabelsManager.NO_LABEL)
+ .keySet());
+
+ // Update request resource of c from 1 to 2, so we need to preempt
+ // one more container
+ am2.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>());
+
+ // Call editPolicy.editSchedule() once, we should have 1 container in the to-preempt map
+ // and 1 container in the killable map
+ editPolicy.editSchedule();
+ Assert.assertEquals(1, editPolicy.getToPreemptContainers().size());
+
+ // Call editPolicy.editSchedule() once more, we should have 2 containers in the killable map
+ editPolicy.editSchedule();
+ Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+
+ // Check if previous killable containers included by new killable containers
+ Map<ContainerId, RMContainer> killableContainers =
+ waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
+ Assert.assertTrue(
+ Sets.difference(previousKillableContainers, killableContainers.keySet())
+ .isEmpty());
+ }
+
+ /*
+ * Ignore this test now because it could be a premature optimization
+ */
+ @Ignore
+ @Test (timeout = 60000)
+ public void testPreemptionPolicyCleanupKillableContainersWhenNoPreemptionNeeded()
+ throws Exception {
+ /**
+ * Test case:
+ * <pre>
+ * Root
+ * / | \
+ * a b c
+ * 10 20 70
+ * </pre>
+ * Submit applications to two queues, one uses more than the other, so
+ * preemption will happen.
+ *
+ * Check:
+ * 1) Containers will be marked to killable
+ * 2) Cancel resource request
+ * 3) Killable containers will be cancelled from policy and scheduler
+ */
+ MockRM rm1 = new MockRM(conf);
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+ // launch an app to queue, AM container should be launched in nm1
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+ // Do allocation 6 times for node1
+ for (int i = 0; i < 6; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ }
+
+ // App1 should have 7 containers now, and no available resource for cluster
+ FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+ am1.getApplicationAttemptId());
+ Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+ // Submit app2 to queue-c and asks for a 1G container for AM
+ RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+ // NM1 has available resource = 0G
+ Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+ .getAvailableResource().getMemorySize());
+ am2.allocate("*", 3 * GB, 1, new ArrayList<ContainerId>());
+
+ // Get edit policy and do one update
+ ProportionalCapacityPreemptionPolicy editPolicy =
+ (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
+
+ // Call edit schedule twice, and check if 3 container from app1 marked
+ // to be "killable"
+ editPolicy.editSchedule();
+ editPolicy.editSchedule();
+
+ PreemptionManager pm = cs.getPreemptionManager();
+ waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 3);
+
+ // Change request from 3G to 2G, now we need to preempt one less container. (3->2)
+ am2.allocate("*", 2 * GB, 1, new ArrayList<ContainerId>());
+ editPolicy.editSchedule();
+ Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+ waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
+
+ // Call editSchedule once more to make sure still nothing happens
+ editPolicy.editSchedule();
+ Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
+ waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
+ }
+
+ @Test (timeout = 60000)
+ public void testPreemptionConsidersUserLimit()
+ throws Exception {
+ /**
+ * Test case: Submit two application (app1/app2) to different queues, queue
+ * structure:
+ *
+ * <pre>
+ * Root
+ * / | \
+ * a b c
+ * 10 20 70
+ * </pre>
+ *
+ * Queue-c's user-limit-factor = 0.1, so a single user cannot allocate more than 1 container in queue-c
+ *
+ * 1) Two nodes in the cluster, each of them has 4G.
+ *
+ * 2) app1 submit to queue-a first, it asked 7 * 1G containers, so there's no
+ * more resource available.
+ *
+ * 3) app2 submit to queue-c, ask for one 1G container (for AM)
+ *
+ * Now the cluster is fulfilled.
+ *
+ * 4) app2 asks for another 1G container, system will preempt one container
+ * from app1, and app2 will receive the preempted container
+ */
+ CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(conf);
+ csConf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".c", 0.1f);
+ MockRM rm1 = new MockRM(csConf);
+ rm1.getRMContext().setNodeLabelManager(mgr);
+ rm1.start();
+ MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
+ MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
+ CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+ RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+ RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+ // launch an app to queue, AM container should be launched in nm1
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+ am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
+
+ // Do allocation 3 times for node1/node2
+ for (int i = 0; i < 3; i++) {
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+ }
+
+ // App1 should have 7 containers now, and no available resource for cluster
+ FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+ am1.getApplicationAttemptId());
+ Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+
+ // Submit app2 to queue-c and asks for a 1G container for AM
+ RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
+
+ // NM1/NM2 has available resource = 0G
+ Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
+ .getAvailableResource().getMemorySize());
+ Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
+ .getAvailableResource().getMemorySize());
+
+ // AM asks for a 1 * GB container
+ am2.allocate(Arrays.asList(ResourceRequest
+ .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
+ Resources.createResource(1 * GB), 1)), null);
+
+ // Get edit policy and do one update
+ SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
+
+ // Call edit schedule twice, and check if no container from app1 marked
+ // to be "killable"
+ editPolicy.editSchedule();
+ editPolicy.editSchedule();
+
+ // No preemption happens
+ PreemptionManager pm = cs.getPreemptionManager();
+ Map<ContainerId, RMContainer> killableContainers =
+ waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 0);
+ Assert.assertEquals(0, killableContainers.size());
+
+ // Call CS.handle once to see if container preempted
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+
+ FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+ am2.getApplicationAttemptId());
+
+ // App1 has 7 containers, and app2 has 1 container (nothing preempted)
+ Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
+ Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
+
+ rm1.close();
+ }
+
+ private Map<ContainerId, RMContainer> waitKillableContainersSize(
+ PreemptionManager pm, String queueName, String partition,
+ int expectedSize) throws InterruptedException {
+ Map<ContainerId, RMContainer> killableContainers =
+ pm.getKillableContainersMap(queueName, partition);
+
+ int wait = 0;
+ // Wait for at most 5 sec (it should be super fast actually)
+ while (expectedSize != killableContainers.size() && wait < 500) {
+ killableContainers = pm.getKillableContainersMap(queueName, partition);
+ Thread.sleep(10);
+ wait++;
+ }
+
+ Assert.assertEquals(expectedSize, killableContainers.size());
+ return killableContainers;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/40367c8d/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java
deleted file mode 100644
index 001899d..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPreemption.java
+++ /dev/null
@@ -1,683 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
-
-import com.google.common.collect.Sets;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.service.Service;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
-import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
-import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
-import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager.RMActiveServices;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy;
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
-import org.apache.hadoop.yarn.util.Clock;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.junit.Assert;
-import org.junit.Before;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashSet;
-import java.util.Map;
-import java.util.Set;
-
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-public class TestCapacitySchedulerPreemption {
- private static final Log LOG = LogFactory.getLog(
- TestCapacitySchedulerPreemption.class);
-
- private final int GB = 1024;
-
- private Configuration conf;
-
- RMNodeLabelsManager mgr;
-
- Clock clock;
-
- @Before
- public void setUp() throws Exception {
- conf = new YarnConfiguration();
- conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
- ResourceScheduler.class);
- conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
- conf.setClass(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
- ProportionalCapacityPreemptionPolicy.class, SchedulingEditPolicy.class);
- conf = TestUtils.getConfigurationWithMultipleQueues(this.conf);
-
- // Set preemption related configurations
- conf.setInt(CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL,
- 0);
- conf.setBoolean(CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED,
- true);
- conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
- 1.0f);
- conf.setFloat(
- CapacitySchedulerConfiguration.PREEMPTION_NATURAL_TERMINATION_FACTOR,
- 1.0f);
- mgr = new NullRMNodeLabelsManager();
- mgr.init(this.conf);
- clock = mock(Clock.class);
- when(clock.getTime()).thenReturn(0L);
- }
-
- private SchedulingEditPolicy getSchedulingEditPolicy(MockRM rm) {
- RMActiveServices activeServices = rm.getRMActiveService();
- SchedulingMonitor mon = null;
- for (Service service : activeServices.getServices()) {
- if (service instanceof SchedulingMonitor) {
- mon = (SchedulingMonitor) service;
- break;
- }
- }
-
- if (mon != null) {
- return mon.getSchedulingEditPolicy();
- }
- return null;
- }
-
- @Test (timeout = 60000)
- public void testSimplePreemption() throws Exception {
- /**
- * Test case: Submit two application (app1/app2) to different queues, queue
- * structure:
- *
- * <pre>
- * Root
- * / | \
- * a b c
- * 10 20 70
- * </pre>
- *
- * 1) Two nodes in the cluster, each of them has 4G.
- *
- * 2) app1 submit to queue-a first, it asked 7 * 1G containers, so there's no
- * more resource available.
- *
- * 3) app2 submit to queue-c, ask for one 1G container (for AM)
- *
- * Now the cluster is fulfilled.
- *
- * 4) app2 asks for another 1G container, system will preempt one container
- * from app1, and app2 will receive the preempted container
- */
- MockRM rm1 = new MockRM(conf);
- rm1.getRMContext().setNodeLabelManager(mgr);
- rm1.start();
-
- MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
- MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
- CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
- RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
- RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
-
- // launch an app to queue, AM container should be launched in nm1
- RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
- MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
- am1.allocate("*", 1 * GB, 7, new ArrayList<ContainerId>());
-
- // Do allocation 3 times for node1/node2
- for (int i = 0; i < 3; i++) {
- cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
- cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
- }
-
- // App1 should have 7 containers now, and no available resource for cluster
- FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
- am1.getApplicationAttemptId());
- Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
- // Submit app2 to queue-c and asks for a 1G container for AM
- RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
- MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
- // NM1/NM2 has available resource = 0G
- Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
- .getAvailableResource().getMemory());
- Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
- .getAvailableResource().getMemory());
-
- // AM asks for a 1 * GB container
- am2.allocate(Arrays.asList(ResourceRequest
- .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
- Resources.createResource(1 * GB), 1)), null);
-
- // Get edit policy and do one update
- SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
-
- // Call edit schedule twice, and check if one container from app1 marked
- // to be "killable"
- editPolicy.editSchedule();
- editPolicy.editSchedule();
-
- PreemptionManager pm = cs.getPreemptionManager();
- Map<ContainerId, RMContainer> killableContainers =
- waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
- Assert.assertEquals(1, killableContainers.size());
- Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
- .getApplicationAttemptId(), am1.getApplicationAttemptId());
-
- // Call CS.handle once to see if container preempted
- cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
- FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
- am2.getApplicationAttemptId());
-
- // App1 has 6 containers, and app2 has 2 containers
- Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
- Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
-
- rm1.close();
- }
-
- @Test (timeout = 60000)
- public void testPreemptionConsidersNodeLocalityDelay()
- throws Exception {
- /**
- * Test case: same as testSimplePreemption steps 1-3.
- *
- * Step 4: app2 asks for 1G container with locality specified, so it needs
- * to wait for missed-opportunity before get scheduled.
- * Check if system waits missed-opportunity before finish killable container
- */
- MockRM rm1 = new MockRM(conf);
- rm1.getRMContext().setNodeLabelManager(mgr);
- rm1.start();
- MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
- MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
- CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
- RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
- RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
-
- // launch an app to queue, AM container should be launched in nm1
- RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
- MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
- am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
- // Do allocation 3 times for node1/node2
- for (int i = 0; i < 3; i++) {
- cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
- cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
- }
-
- // App1 should have 7 containers now, and no available resource for cluster
- FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
- am1.getApplicationAttemptId());
- Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
- // Submit app2 to queue-c and asks for a 1G container for AM
- RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
- MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
- // NM1/NM2 has available resource = 0G
- Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
- .getAvailableResource().getMemory());
- Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
- .getAvailableResource().getMemory());
-
- // AM asks for a 1 * GB container with unknown host and unknown rack
- am2.allocate(Arrays.asList(ResourceRequest
- .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
- Resources.createResource(1 * GB), 1), ResourceRequest
- .newInstance(Priority.newInstance(1), "unknownhost",
- Resources.createResource(1 * GB), 1), ResourceRequest
- .newInstance(Priority.newInstance(1), "/default-rack",
- Resources.createResource(1 * GB), 1)), null);
-
- // Get edit policy and do one update
- SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
-
- // Call edit schedule twice, and check if one container from app1 marked
- // to be "killable"
- editPolicy.editSchedule();
- editPolicy.editSchedule();
-
- PreemptionManager pm = cs.getPreemptionManager();
- Map<ContainerId, RMContainer> killableContainers =
- waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
- Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
- .getApplicationAttemptId(), am1.getApplicationAttemptId());
-
- // Call CS.handle once to see if container preempted
- cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
- FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
- am2.getApplicationAttemptId());
-
- // App1 has 7 containers, and app2 has 1 containers (no container preempted)
- Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
- Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-
- // Do allocation again, one container will be preempted
- cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
- // App1 has 6 containers, and app2 has 2 containers (new container allocated)
- Assert.assertEquals(6, schedulerApp1.getLiveContainers().size());
- Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
-
- rm1.close();
- }
-
- @Test (timeout = 60000)
- public void testPreemptionConsidersHardNodeLocality()
- throws Exception {
- /**
- * Test case: same as testSimplePreemption steps 1-3.
- *
- * Step 4: app2 asks for 1G container with hard locality specified, and
- * asked host is not existed
- * Confirm system doesn't preempt any container.
- */
- MockRM rm1 = new MockRM(conf);
- rm1.getRMContext().setNodeLabelManager(mgr);
- rm1.start();
- MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
- MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
- CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
- RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
- RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
-
- // launch an app to queue, AM container should be launched in nm1
- RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
- MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
- am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
- // Do allocation 3 times for node1/node2
- for (int i = 0; i < 3; i++) {
- cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
- }
- for (int i = 0; i < 3; i++) {
- cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
- }
-
- // App1 should have 7 containers now, and no available resource for cluster
- FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
- am1.getApplicationAttemptId());
- Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
- // Submit app2 to queue-c and asks for a 1G container for AM
- RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
- MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
- // NM1/NM2 has available resource = 0G
- Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
- .getAvailableResource().getMemory());
- Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
- .getAvailableResource().getMemory());
-
- // AM asks for a 1 * GB container for h3 with hard locality,
- // h3 doesn't exist in the cluster
- am2.allocate(Arrays.asList(ResourceRequest
- .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
- Resources.createResource(1 * GB), 1, true), ResourceRequest
- .newInstance(Priority.newInstance(1), "h3",
- Resources.createResource(1 * GB), 1, false), ResourceRequest
- .newInstance(Priority.newInstance(1), "/default-rack",
- Resources.createResource(1 * GB), 1, false)), null);
-
- // Get edit policy and do one update
- SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
-
- // Call edit schedule twice, and check if one container from app1 marked
- // to be "killable"
- editPolicy.editSchedule();
- editPolicy.editSchedule();
-
- PreemptionManager pm = cs.getPreemptionManager();
- Map<ContainerId, RMContainer> killableContainers =
- waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
- Assert.assertEquals(killableContainers.entrySet().iterator().next().getKey()
- .getApplicationAttemptId(), am1.getApplicationAttemptId());
-
- // Call CS.handle once to see if container preempted
- cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
- FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
- am2.getApplicationAttemptId());
-
- // App1 has 7 containers, and app2 has 1 containers (no container preempted)
- Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
- Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-
- // Do allocation again, nothing will be preempted
- cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
- // App1 has 7 containers, and app2 has 1 containers (no container allocated)
- Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
- Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-
- rm1.close();
- }
-
- @Test (timeout = 60000)
- public void testPreemptionPolicyShouldRespectAlreadyMarkedKillableContainers()
- throws Exception {
- /**
- * Test case:
- * <pre>
- * Root
- * / | \
- * a b c
- * 10 20 70
- * </pre>
- * Submit applications to two queues, one uses more than the other, so
- * preemption will happen.
- *
- * Check:
- * 1) Killable containers resources will be excluded from PCPP (no duplicated
- * container added to killable list)
- * 2) When more resources need to be preempted, new containers will be selected
- * and killable containers will be considered
- */
- MockRM rm1 = new MockRM(conf);
- rm1.getRMContext().setNodeLabelManager(mgr);
- rm1.start();
- MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
- CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
- RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-
- // Launch an app to queue-a; the AM container should be launched on nm1
- RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
- MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
- am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
- // Do allocation 6 times for node1
- for (int i = 0; i < 6; i++) {
- cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
- }
-
- // App1 should have 7 containers now, and no available resource for cluster
- FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
- am1.getApplicationAttemptId());
- Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
- // Submit app2 to queue-c and ask for a 1G container for its AM
- RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
- MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
-
- // NM1 has available resource = 0G
- Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
- .getAvailableResource().getMemory());
- am2.allocate("*", 1 * GB, 1, new ArrayList<ContainerId>());
-
- // Get edit policy and do one update
- ProportionalCapacityPreemptionPolicy editPolicy =
- (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
-
- // Call edit schedule twice, and check if one container from app1 is marked
- // to be "killable"
- editPolicy.editSchedule();
- editPolicy.editSchedule();
-
- PreemptionManager pm = cs.getPreemptionManager();
- waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 1);
-
- // Check killable containers and to-be-preempted containers in edit policy
- Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-
- // Run edit schedule again, confirm status doesn't change
- editPolicy.editSchedule();
- Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-
- // Save the currently killable containers so we can verify later that they
- // stay killable after more preemption is requested
- Set<ContainerId> previousKillableContainers = new HashSet<>(
- pm.getKillableContainersMap("a", RMNodeLabelsManager.NO_LABEL)
- .keySet());
-
- // Update request resource of c from 1 to 2, so we need to preempt
- // one more container
- am2.allocate("*", 1 * GB, 2, new ArrayList<ContainerId>());
-
- // Call editPolicy.editSchedule() once, we should have 1 container in to-preempt map
- // and 1 container in killable map
- editPolicy.editSchedule();
- Assert.assertEquals(1, editPolicy.getToPreemptContainers().size());
-
- // Call editPolicy.editSchedule() once more, we should have 2 containers in
- // the killable map
- editPolicy.editSchedule();
- Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
-
- // Check that the previous killable containers are included in the new
- // killable containers (no container was un-marked)
- Map<ContainerId, RMContainer> killableContainers =
- waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
- Assert.assertTrue(
- Sets.difference(previousKillableContainers, killableContainers.keySet())
- .isEmpty());
-
- // Stop the RM to release its resources, consistent with the other tests
- // in this class (previously this test leaked the started MockRM)
- rm1.close();
- }
-
- /*
- * Ignore this test now because it could be a premature optimization
- */
- @Ignore
- @Test (timeout = 60000)
- public void testPreemptionPolicyCleanupKillableContainersWhenNoPreemptionNeeded()
- throws Exception {
- /**
- * Test case:
- * <pre>
- * Root
- * / | \
- * a b c
- * 10 20 70
- * </pre>
- * Submit applications to two queues, one uses more than the other, so
- * preemption will happen.
- *
- * Check:
- * 1) Containers will be marked to killable
- * 2) Cancel resource request
- * 3) Killable containers will be cancelled from policy and scheduler
- */
- MockRM rm1 = new MockRM(conf);
- rm1.getRMContext().setNodeLabelManager(mgr);
- rm1.start();
- MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
- CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
- RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
-
- // Launch an app to queue-a; the AM container should be launched on nm1
- RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
- MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
- am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
- // Do allocation 6 times for node1
- for (int i = 0; i < 6; i++) {
- cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
- }
-
- // App1 should have 7 containers now, and no available resource for cluster
- FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
- am1.getApplicationAttemptId());
- Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
- // Submit app2 to queue-c and ask for a 1G container for its AM
- RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
- MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
-
- // NM1 has available resource = 0G
- Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
- .getAvailableResource().getMemory());
- am2.allocate("*", 3 * GB, 1, new ArrayList<ContainerId>());
-
- // Get edit policy and do one update
- ProportionalCapacityPreemptionPolicy editPolicy =
- (ProportionalCapacityPreemptionPolicy) getSchedulingEditPolicy(rm1);
-
- // Call edit schedule twice, and check if 3 containers from app1 are marked
- // to be "killable"
- editPolicy.editSchedule();
- editPolicy.editSchedule();
-
- PreemptionManager pm = cs.getPreemptionManager();
- waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 3);
-
- // Change request from 3G to 2G, now we can preempt one less container. (3->2)
- am2.allocate("*", 2 * GB, 1, new ArrayList<ContainerId>());
- editPolicy.editSchedule();
- Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
- waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
-
- // Call editSchedule once more to make sure still nothing happens
- editPolicy.editSchedule();
- Assert.assertEquals(0, editPolicy.getToPreemptContainers().size());
- waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 2);
-
- // Stop the RM to release its resources, consistent with the other tests
- // in this class (previously this test leaked the started MockRM)
- rm1.close();
- }
-
- @Test (timeout = 60000)
- public void testPreemptionConsidersUserLimit()
- throws Exception {
- /**
- * Test case: Submit two applications (app1/app2) to different queues, queue
- * structure:
- *
- * <pre>
- * Root
- * / | \
- * a b c
- * 10 20 70
- * </pre>
- *
- * Queue-c's user-limit-factor = 0.1, so a single user cannot allocate >1
- * containers in queue-c
- *
- * 1) Two nodes in the cluster, each of them has 4G.
- *
- * 2) app1 submit to queue-a first, it asked 7 * 1G containers, so there's no
- * more resource available.
- *
- * 3) app2 submit to queue-c, ask for one 1G container (for AM)
- *
- * Now the cluster is fulfilled.
- *
- * 4) app2 asks for another 1G container; because the user already hit the
- * user limit in queue-c, the policy must NOT preempt anything from app1.
- */
- CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(conf);
- csConf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".c", 0.1f);
- MockRM rm1 = new MockRM(csConf);
- rm1.getRMContext().setNodeLabelManager(mgr);
- rm1.start();
- MockNM nm1 = rm1.registerNode("h1:1234", 4 * GB);
- MockNM nm2 = rm1.registerNode("h2:1234", 4 * GB);
- CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
- RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
- RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
-
- // Launch an app to queue-a; the AM container should be launched on nm1
- RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
- MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
-
- am1.allocate("*", 1 * GB, 6, new ArrayList<ContainerId>());
-
- // Do allocation 3 times for node1/node2
- for (int i = 0; i < 3; i++) {
- cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
- cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
- }
-
- // App1 should have 7 containers now, and no available resource for cluster
- FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
- am1.getApplicationAttemptId());
- Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
-
- // Submit app2 to queue-c and ask for a 1G container for its AM
- RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c");
- MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm2);
-
- // NM1/NM2 has available resource = 0G
- Assert.assertEquals(0 * GB, cs.getNode(nm1.getNodeId())
- .getAvailableResource().getMemory());
- Assert.assertEquals(0 * GB, cs.getNode(nm2.getNodeId())
- .getAvailableResource().getMemory());
-
- // AM asks for a 1 * GB container
- am2.allocate(Arrays.asList(ResourceRequest
- .newInstance(Priority.newInstance(1), ResourceRequest.ANY,
- Resources.createResource(1 * GB), 1)), null);
-
- // Get edit policy and do one update
- SchedulingEditPolicy editPolicy = getSchedulingEditPolicy(rm1);
-
- // Call edit schedule twice, and check that no container from app1 is marked
- // to be "killable" (user limit caps what queue-c's user may gain)
- editPolicy.editSchedule();
- editPolicy.editSchedule();
-
- // No preemption happens
- PreemptionManager pm = cs.getPreemptionManager();
- Map<ContainerId, RMContainer> killableContainers =
- waitKillableContainersSize(pm, "a", RMNodeLabelsManager.NO_LABEL, 0);
- Assert.assertEquals(0, killableContainers.size());
-
- // Call CS.handle once to see if container preempted
- cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
-
- FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
- am2.getApplicationAttemptId());
-
- // App1 has 7 containers, and app2 has 1 container (nothing preempted)
- Assert.assertEquals(7, schedulerApp1.getLiveContainers().size());
- Assert.assertEquals(1, schedulerApp2.getLiveContainers().size());
-
- rm1.close();
- }
-
- /**
- * Polls the {@link PreemptionManager} until the killable-container map for
- * the given queue/partition reaches the expected size, or roughly 5 seconds
- * have elapsed (500 polls x 10ms). Asserts the final size and returns the
- * map that was last observed.
- */
- private Map<ContainerId, RMContainer> waitKillableContainersSize(
- PreemptionManager pm, String queueName, String partition,
- int expectedSize) throws InterruptedException {
- Map<ContainerId, RMContainer> killableContainers =
- pm.getKillableContainersMap(queueName, partition);
-
- int wait = 0;
- // Wait for at most 5 sec (it should be super fast actually)
- while (expectedSize != killableContainers.size() && wait < 500) {
- killableContainers = pm.getKillableContainersMap(queueName, partition);
- Thread.sleep(10);
- wait++;
- }
-
- // Fail the calling test if the expected size was never reached
- Assert.assertEquals(expectedSize, killableContainers.size());
- return killableContainers;
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org