You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/04/25 02:04:05 UTC
[1/2] hadoop git commit: YARN-2498. Respect labels in preemption
policy of capacity scheduler for inter-queue preemption. Contributed by
Wangda Tan (cherry picked from commit
d497f6ea2be559aa31ed76f37ae949dbfabe2a51)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 932cff610 -> 9bf09b334
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bf09b33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
new file mode 100644
index 0000000..e13320c
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyForNodePartitions.java
@@ -0,0 +1,1211 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.MONITORING_INTERVAL;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.NATURAL_TERMINATION_FACTOR;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.TOTAL_PREEMPTION_PER_ROUND;
+import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.event.EventHandler;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+public class TestProportionalCapacityPreemptionPolicyForNodePartitions {
+  private static final Log LOG =
+      LogFactory.getLog(TestProportionalCapacityPreemptionPolicyForNodePartitions.class);
+  static final String ROOT = CapacitySchedulerConfiguration.ROOT;
+
+  // Lookup tables backing the mocked scheduler; fresh maps are created for
+  // every test in setup().
+  private Map<String, CSQueue> nameToCSQueues;
+  private Map<String, Resource> partitionToResource;
+  private Map<NodeId, SchedulerNode> nodeIdToSchedulerNodes;
+  private RMNodeLabelsManager nlm;
+  private RMContext rmContext;
+
+  // Scheduler/policy collaborators. Everything except rc is (re)assigned per
+  // test; clusterResource is handed to cs.getClusterResource() in buildEnv()
+  // and is presumably filled in by a mocking helper outside this view.
+  private ResourceCalculator rc = new DefaultResourceCalculator();
+  private Clock mClock;
+  private Configuration conf;
+  private CapacitySchedulerConfiguration csConf;
+  private CapacityScheduler cs;
+  private EventHandler<ContainerPreemptEvent> mDisp;
+  private ProportionalCapacityPreemptionPolicy policy;
+  private Resource clusterResource;
+
+  @SuppressWarnings("unchecked")
+  @Before
+  public void setup() {
+    org.apache.log4j.Logger.getRootLogger().setLevel(
+        org.apache.log4j.Level.DEBUG);
+
+    // Preemption-policy configuration. Both factors are 1.0 so that the
+    // policy reports the full "ideal" preemption in a single round.
+    conf = new Configuration(false);
+    conf.setLong(WAIT_TIME_BEFORE_KILL, 10000);
+    conf.setLong(MONITORING_INTERVAL, 3000);
+    conf.setFloat(TOTAL_PREEMPTION_PER_ROUND, 1.0f);
+    conf.setFloat(NATURAL_TERMINATION_FACTOR, 1.0f);
+    conf.set(YarnConfiguration.RM_SCHEDULER_MONITOR_POLICIES,
+        ProportionalCapacityPreemptionPolicy.class.getCanonicalName());
+    conf.setBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
+    // This test only applies to the CapacityScheduler (FairScheduler is not
+    // supported), so pin the scheduler class explicitly.
+    conf.set("yarn.resourcemanager.scheduler.class",
+        CapacityScheduler.class.getName());
+
+    // Stand-alone mocks.
+    mClock = mock(Clock.class);
+    mDisp = mock(EventHandler.class);
+    nlm = mock(RMNodeLabelsManager.class);
+
+    // RM context exposing the mocked node-labels manager.
+    rmContext = mock(RMContext.class);
+    when(rmContext.getNodeLabelManager()).thenReturn(nlm);
+
+    // Mocked CapacityScheduler wired to the calculator, config and context.
+    csConf = new CapacitySchedulerConfiguration();
+    cs = mock(CapacityScheduler.class);
+    when(cs.getResourceCalculator()).thenReturn(rc);
+    when(cs.getConfiguration()).thenReturn(csConf);
+    when(cs.getRMContext()).thenReturn(rmContext);
+
+    policy = new ProportionalCapacityPreemptionPolicy(conf, mDisp, cs, mClock);
+
+    nameToCSQueues = new HashMap<>();
+    partitionToResource = new HashMap<>();
+    nodeIdToSchedulerNodes = new HashMap<>();
+  }
+
+  @Test
+  public void testBuilder() throws Exception {
+    /**
+     * Test of the test harness: make sure buildEnv() produces the expected
+     * mocked queues (capacities and pending resources), applications and
+     * containers from the config strings.
+     */
+    String labelsConfig =
+        "=200,true;" + // default partition
+        "red=100,false;" + // partition=red
+        "blue=200,true"; // partition=blue
+    String nodesConfig =
+        "n1=red;" + // n1 has partition=red
+        "n2=blue;" + // n2 has partition=blue
+        "n3="; // n3 doesn't have partition
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[200 200 100 100],red=[100 100 100 100],blue=[200 200 200 200]);" + //root
+        "-a(=[100 200 100 100],red=[0 0 0 0],blue=[200 200 200 200]);" + // a
+        "--a1(=[50 100 50 100],red=[0 0 0 0],blue=[100 200 200 0]);" + // a1
+        "--a2(=[50 200 50 0],red=[0 0 0 0],blue=[100 200 0 200]);" + // a2
+        "-b(=[100 200 0 0],red=[100 100 100 100],blue=[0 0 0 0])"; // b
+    String appsConfig =
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        // An empty expression on a partitioned node marks an
+        // ignore-exclusivity container.
+        "a1\t" // app1 in a1
+        + "(1,1,n3,red,50,false);" + // 50 * expression=red, allocated in n3
+
+        "a1\t" // app2 in a1
+        + "(2,1,n2,,50,true)(2,1,n2,,50,false)" // 50 * ignore-exclusivity in n2 (reserved),
+                                                // 50 * ignore-exclusivity in n2 (allocated)
+        + "(2,1,n2,blue,50,true)(2,1,n2,blue,50,true);" + // 50 * blue in n2 (reserved),
+                                                          // 50 * blue in n2 (reserved)
+        "a2\t" // app3 in a2
+        + "(1,1,n3,red,50,false);" + // 50 * expression=red, allocated in n3
+
+        "b\t" // app4 in b
+        + "(1,1,n1,red,100,false);"; // 100 * red in n1 (allocated)
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+
+    // Check queues:
+    // root
+    checkAbsCapacities(cs.getQueue("root"), "", 1f, 1f, 0.5f);
+    checkPendingResource(cs.getQueue("root"), "", 100);
+    checkAbsCapacities(cs.getQueue("root"), "red", 1f, 1f, 1f);
+    checkPendingResource(cs.getQueue("root"), "red", 100);
+    checkAbsCapacities(cs.getQueue("root"), "blue", 1f, 1f, 1f);
+    checkPendingResource(cs.getQueue("root"), "blue", 200);
+
+    // a
+    checkAbsCapacities(cs.getQueue("a"), "", 0.5f, 1f, 0.5f);
+    checkPendingResource(cs.getQueue("a"), "", 100);
+    checkAbsCapacities(cs.getQueue("a"), "red", 0f, 0f, 0f);
+    checkPendingResource(cs.getQueue("a"), "red", 0);
+    checkAbsCapacities(cs.getQueue("a"), "blue", 1f, 1f, 1f);
+    checkPendingResource(cs.getQueue("a"), "blue", 200);
+
+    // a1
+    checkAbsCapacities(cs.getQueue("a1"), "", 0.25f, 0.5f, 0.25f);
+    checkPendingResource(cs.getQueue("a1"), "", 100);
+    checkAbsCapacities(cs.getQueue("a1"), "red", 0f, 0f, 0f);
+    checkPendingResource(cs.getQueue("a1"), "red", 0);
+    checkAbsCapacities(cs.getQueue("a1"), "blue", 0.5f, 1f, 1f);
+    checkPendingResource(cs.getQueue("a1"), "blue", 0);
+
+    // a2
+    checkAbsCapacities(cs.getQueue("a2"), "", 0.25f, 1f, 0.25f);
+    checkPendingResource(cs.getQueue("a2"), "", 0);
+    checkAbsCapacities(cs.getQueue("a2"), "red", 0f, 0f, 0f);
+    checkPendingResource(cs.getQueue("a2"), "red", 0);
+    checkAbsCapacities(cs.getQueue("a2"), "blue", 0.5f, 1f, 0f);
+    checkPendingResource(cs.getQueue("a2"), "blue", 200);
+
+    // b
+    checkAbsCapacities(cs.getQueue("b"), "", 0.5f, 1f, 0f);
+    checkPendingResource(cs.getQueue("b"), "", 0);
+    checkAbsCapacities(cs.getQueue("b"), "red", 1f, 1f, 1f);
+    checkPendingResource(cs.getQueue("b"), "red", 100);
+    checkAbsCapacities(cs.getQueue("b"), "blue", 0f, 0f, 0f);
+    checkPendingResource(cs.getQueue("b"), "blue", 0);
+
+    // Check ignore-exclusivity containers in queue: app2's 50 reserved +
+    // 50 allocated empty-expression containers on n2 (partition=blue)
+    Assert.assertEquals(100, ((LeafQueue) cs.getQueue("a1"))
+        .getIgnoreExclusivityRMContainers().get("blue").size());
+
+    // Check applications
+    Assert.assertEquals(2, ((LeafQueue)cs.getQueue("a1")).getApplications().size());
+    Assert.assertEquals(1, ((LeafQueue)cs.getQueue("a2")).getApplications().size());
+    Assert.assertEquals(1, ((LeafQueue)cs.getQueue("b")).getApplications().size());
+
+    // Check #containers
+    FiCaSchedulerApp app1 = getApp("a1", 1);
+    FiCaSchedulerApp app2 = getApp("a1", 2);
+    FiCaSchedulerApp app3 = getApp("a2", 3);
+    FiCaSchedulerApp app4 = getApp("b", 4);
+
+    Assert.assertEquals(50, app1.getLiveContainers().size());
+    checkContainerNodesInApp(app1, 50, "n3");
+
+    // app2: 50 allocated; 150 reserved (50 ignore-exclusivity + 2 * 50 blue);
+    // all 200 on n2
+    Assert.assertEquals(50, app2.getLiveContainers().size());
+    Assert.assertEquals(150, app2.getReservedContainers().size());
+    checkContainerNodesInApp(app2, 200, "n2");
+
+    Assert.assertEquals(50, app3.getLiveContainers().size());
+    checkContainerNodesInApp(app3, 50, "n3");
+
+    Assert.assertEquals(100, app4.getLiveContainers().size());
+    checkContainerNodesInApp(app4, 100, "n1");
+  }
+
+  @Test
+  public void testNodePartitionPreemptionRespectGuaranteedCapacity()
+      throws IOException {
+    /**
+     * The simplest test of node label, Queue structure is:
+     *
+     * <pre>
+     *     root
+     *    /    \
+     *   a      b
+     * </pre>
+     *
+     * Both a/b can access x, and guaranteed capacity of them is 50:50. Two
+     * nodes: n1 has 100 x, n2 has 100 NO_LABEL. 4 applications in the
+     * cluster, app1/app2 in a, and app3/app4 in b.
+     * app1 uses 80 x, app2 uses 20 NO_LABEL, app3 uses 20 x, app4 uses 80 NO_LABEL.
+     * Both a/b have 50 pending resource for x and NO_LABEL.
+     *
+     * After preemption, it should preempt 30 from app1, and 30 from app4.
+     */
+    String labelsConfig =
+        "=100,true;" + // default partition
+        "x=100,true"; // partition=x
+    String nodesConfig =
+        "n1=x;" + // n1 has partition=x
+        "n2="; // n2 is default partition
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100],x=[100 100 100 100]);" + //root
+        "-a(=[50 100 20 50],x=[50 100 80 50]);" + // a
+        "-b(=[50 100 80 50],x=[50 100 20 50])"; // b
+    String appsConfig =
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t" // app1 in a
+        + "(1,1,n1,x,80,false);" + // 80 * x in n1
+        "a\t" // app2 in a
+        + "(1,1,n2,,20,false);" + // 20 default in n2
+        "b\t" // app3 in b
+        + "(1,1,n1,x,20,false);" + // 20 * x in n1
+        "b\t" // app4 in b
+        + "(1,1,n2,,80,false)"; // 80 default in n2
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // 30 preempted from app1, 30 preempted from app4, and nothing preempted
+    // from app2/app3
+    verify(mDisp, times(30)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
+    verify(mDisp, times(30)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(4))));
+    verify(mDisp, never()).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
+    verify(mDisp, never()).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
+  }
+
+  @Test
+  public void testNodePartitionPreemptionRespectMaximumCapacity()
+      throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *        root
+     *      /  |  \
+     *     a   b   c
+     * </pre>
+     *
+     * All of a/b/c can access x, and guaranteed_capacity(x) of them is
+     * 80:10:10. max_capacity(x) is 80 for a, 100 for b and 30 for c.
+     *
+     * Two nodes, n1 has 100 x, n2 has 100 NO_LABEL.
+     *
+     * 2 apps in cluster: app1 in b and app2 in c.
+     *
+     * app1 uses 90x and app2 uses 10x; a neither uses nor asks for anything,
+     * while b and c each have 50x pending. c can only grow up to its max
+     * capacity of 30x (i.e. by 20x), so after preemption 20x is taken from
+     * app1 and nothing from app2.
+     */
+    String labelsConfig =
+        "=100,true;" + // default partition
+        "x=100,true"; // partition=x
+    String nodesConfig =
+        "n1=x;" + // n1 has partition=x
+        "n2="; // n2 is default partition
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100],x=[100 100 100 100]);" + //root
+        "-a(=[80 80 0 0],x=[80 80 0 0]);" + // a
+        "-b(=[10 100 0 0],x=[10 100 90 50]);" + // b
+        "-c(=[10 100 0 0],x=[10 30 10 50])"; //c
+    String appsConfig =
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "b\t" // app1 in b
+        + "(1,1,n1,x,90,false);" + // 90 * x in n1
+        "c\t" // app2 in c
+        + "(1,1,n1,x,10,false)"; // 10 * x in n1
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // 20 preempted from app1 (b), nothing preempted from app2 (c)
+    verify(mDisp, times(20)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
+    verify(mDisp, never()).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
+  }
+
+  @Test
+  public void testNodePartitionPreemptionOfIgnoreExclusivityAndRespectCapacity()
+      throws IOException {
+    /**
+     * <pre>
+     *     root
+     *    /    \
+     *   a      b
+     * </pre>
+     *
+     * Both a/b can access x, and guaranteed capacity of them is 50:50. Two
+     * nodes: n1 has 100 x, n2 has 100 NO_LABEL. 2 applications in the
+     * cluster, app1/app2 both in a:
+     * app1 holds 1 x container (its AM container) plus 20 containers on n1
+     * requested without a label expression (ignoreExclusivity);
+     * app2 holds 79 x containers (respectExclusivity).
+     *
+     * b has 100 pending resource of x, and a is 50 above its guarantee.
+     *
+     * After preemption, it should preempt 20 from app1 (presumably its
+     * ignore-exclusivity containers), and 30 from app2.
+     */
+    String labelsConfig =
+        "=100,true;" + // default partition
+        "x=100,false"; // partition=x
+    String nodesConfig =
+        "n1=x;" + // n1 has partition=x
+        "n2="; // n2 is default partition
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100],x=[100 100 100 100]);" + //root
+        "-a(=[50 100 0 0],x=[50 100 100 50]);" + // a
+        "-b(=[50 100 0 0],x=[50 100 0 100])"; // b
+    String appsConfig =
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t" // app1 in a
+        + "(1,1,n1,x,1,false)" // 1 * x in n1 (it's AM container)
+        + "(1,1,n1,,20,false);" + // 20 * no-expression in n1 (ignoreExclusivity)
+        "a\t" // app2 in a
+        + "(1,1,n1,x,79,false)"; // 79 * x in n1
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // 20 preempted from app1, 30 preempted from app2
+    verify(mDisp, times(20)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
+    verify(mDisp, times(30)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
+  }
+
+  @Test
+  public void testNodePartitionPreemptionOfSkippingAMContainer()
+      throws IOException {
+    /**
+     * <pre>
+     *     root
+     *    /    \
+     *   a      b
+     * </pre>
+     *
+     * Both a/b can access x, and guaranteed capacity(x) of them is 20:80. Two
+     * nodes: n1 has 100 x, n2 has 100 NO_LABEL. 5 applications in the
+     * cluster, app1/app2/app3/app4/app5 all in a, each using 20 x.
+     *
+     * b has 100 pending resource of x, so a has to give back 80x.
+     *
+     * After preemption, it should preempt 19 from each of app2-app5 (their
+     * AM containers are skipped) and the remaining 4 from app1.
+     */
+    String labelsConfig =
+        "=100,true;" + // default partition
+        "x=100,true"; // partition=x
+    String nodesConfig =
+        "n1=x;" + // n1 has partition=x
+        "n2="; // n2 is default partition
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100],x=[100 100 100 100]);" + //root
+        "-a(=[50 100 0 0],x=[20 100 100 50]);" + // a
+        "-b(=[50 100 0 0],x=[80 100 0 100])"; // b
+    String appsConfig =
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t" // app1 in a
+        + "(1,1,n1,x,20,false);" + // uses 20 resource
+        "a\t" // app2 in a
+        + "(1,1,n1,x,20,false);" + // uses 20 resource
+        "a\t" // app3 in a
+        + "(1,1,n1,x,20,false);" + // uses 20 resource
+        "a\t" // app4 in a
+        + "(1,1,n1,x,20,false);" + // uses 20 resource
+        "a\t" // app5 in a
+        + "(1,1,n1,x,20,false);"; // uses 20 resource
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // 4 from app1
+    verify(mDisp, times(4)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
+    // 19 from each of app2-app5
+    verify(mDisp, times(19)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
+    verify(mDisp, times(19)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
+    verify(mDisp, times(19)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(4))));
+    verify(mDisp, times(19)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(5))));
+  }
+
+  @Test
+  public void testNodePartitionPreemptionOfAMContainer()
+      throws IOException {
+    /**
+     * <pre>
+     *     root
+     *    /    \
+     *   a      b
+     * </pre>
+     *
+     * Both a/b can access x, and guaranteed capacity(x) of them is 3:97. Two
+     * nodes: n1 has 100 x, n2 has 100 NO_LABEL.
+     *
+     * app1/app2/app3/app4/app5 in a, each using 20 resources (x).
+     *
+     * b has 100 pending resource of x, so a has to give back 97x.
+     *
+     * After preemption, it should preempt 20 from each of app4/app5 and 19
+     * from each of app1-app3. App4/app5's AM containers will be preempted.
+     */
+    String labelsConfig =
+        "=100,true;" + // default partition
+        "x=100,true"; // partition=x
+    String nodesConfig =
+        "n1=x;" + // n1 has partition=x
+        "n2="; // n2 is default partition
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100],x=[100 100 100 100]);" + //root
+        "-a(=[50 100 0 0],x=[3 100 100 50]);" + // a
+        "-b(=[50 100 0 0],x=[97 100 0 100])"; // b
+    String appsConfig =
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t" // app1 in a
+        + "(1,1,n1,x,20,false);" + // uses 20 resource
+        "a\t" // app2 in a
+        + "(1,1,n1,x,20,false);" + // uses 20 resource
+        "a\t" // app3 in a
+        + "(1,1,n1,x,20,false);" + // uses 20 resource
+        "a\t" // app4 in a
+        + "(1,1,n1,x,20,false);" + // uses 20 resource
+        "a\t" // app5 in a
+        + "(1,1,n1,x,20,false);"; // uses 20 resource
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // 19 from each of app1-app3
+    verify(mDisp, times(19)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
+    verify(mDisp, times(19)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
+    verify(mDisp, times(19)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
+    // 20 from each of app4/app5, i.e. including their AM containers
+    verify(mDisp, times(20)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(4))));
+    verify(mDisp, times(20)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(5))));
+  }
+
+  @Test
+  public void testNodePartitionDisablePreemptionForSingleLevelQueue()
+      throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *        root
+     *      /  |  \
+     *     a   b   c
+     * </pre>
+     *
+     * All of a/b/c can access x, and guaranteed_capacity(x) of them is
+     * 40:20:40. a/b/c's max resource(x) is 100. Preemption is disabled for b.
+     *
+     * Two nodes, n1 has 100 x, n2 has 100 NO_LABEL.
+     *
+     * 3 apps in cluster: app1 in a (usage=50x), app2 in b (usage=30x), app3 in
+     * c (usage=20x). a and c each have 50x pending, b has none.
+     *
+     * c is 20x below its guarantee. b is above its guarantee but cannot be
+     * preempted, so only the 10x a holds above its guarantee is reclaimed:
+     * app1 will be preempted 10 containers and app2/app3 will not be
+     * preempted.
+     */
+    String labelsConfig =
+        "=100,true;" + // default partition
+        "x=100,true"; // partition=x
+    String nodesConfig =
+        "n1=x;" + // n1 has partition=x
+        "n2="; // n2 is default partition
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100],x=[100 100 100 100]);" + //root
+        "-a(=[80 80 0 0],x=[40 100 50 50]);" + // a
+        "-b(=[10 100 0 0],x=[20 100 30 0]);" + // b
+        "-c(=[10 100 0 0],x=[40 100 20 50])"; //c
+    String appsConfig =
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t" // app1 in a
+        + "(1,1,n1,x,50,false);" + // 50x in n1
+        "b\t" // app2 in b
+        + "(1,1,n1,x,30,false);" + // 30x in n1
+        "c\t" // app3 in c
+        + "(1,1,n1,x,20,false)"; // 20x in n1
+
+    csConf.setPreemptionDisabled("root.b", true);
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // 10 preempted from app1, nothing preempted from app2-app3
+    verify(mDisp, times(10)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
+    verify(mDisp, never()).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
+    verify(mDisp, never()).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
+  }
+
+  @Test
+  public void testNodePartitionNonAccessibleQueuesSharePartitionedResource()
+      throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *           root
+     *       _________
+     *      /  |   |  \
+     *     a   b   c   d
+     * </pre>
+     *
+     * a/b can access x, and their capacity(x) is 50:50. c/d cannot access x.
+     *
+     * a uses 0, wants 30
+     * b(app1) uses 30, wants 0
+     * c(app2) & d(app3) each use 35, each want 50
+     *
+     * After preemption, c/d will each be preempted 15 containers, because the
+     * idle resource = 100 - 30 (which is used by b) - 30 (which is asked by a)
+     * = 40 will be divided by c/d, so each of c/d gets 20.
+     *
+     * NOTE(review): c/d's queue-level usage of x is configured as 35, while
+     * each of app2/app3 only holds 30 ignore-exclusivity containers on n1.
+     */
+    String labelsConfig =
+        "=100,true;" + // default partition
+        "x=100,false"; // partition=x
+    String nodesConfig =
+        "n1=x;" + // n1 has partition=x
+        "n2="; // n2 is default partition
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100],x=[100 100 100 100]);" + //root
+        "-a(=[25 100 0 0],x=[50 100 0 30]);" + // a
+        "-b(=[25 100 0 0],x=[50 100 30 0]);" + // b
+        "-c(=[25 100 1 0],x=[0 0 35 50]);" + //c
+        "-d(=[25 100 1 0],x=[0 0 35 50])"; //d
+    String appsConfig =
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "b\t" // app1 in b
+        + "(1,1,n1,x,30,false);" + // 30x in n1
+        "c\t" // app2 in c
+        + "(1,1,n2,,1,false)" // AM container (in n2)
+        + "(1,1,n1,,30,false);" + // 30x in n1 (ignore exclusivity)
+        "d\t" // app3 in d
+        + "(1,1,n2,,1,false)" // AM container (in n2)
+        + "(1,1,n1,,30,false)"; // 30x in n1 (ignore exclusivity)
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // 15 will be preempted from each of app2/app3, nothing from app1
+    verify(mDisp, times(15)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
+    verify(mDisp, times(15)).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
+    verify(mDisp, never()).handle(
+        argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
+  }
+
+  @Test
+  public void testHierarchyPreemptionForMultiplePartitions()
+      throws IOException {
+    /**
+     * Queue structure:
+     *
+     * <pre>
+     *           root
+     *          /    \
+     *         a      b
+     *        / \    / \
+     *      a1   a2 b1  b2
+     * </pre>
+     *
+     * a and b can both access partitions x and y, and every level splits
+     * capacity 50:50, so each of a1/a2/b1/b2 is guaranteed 25x and 25y.
+     *
+     * Usage: a1 = 35x + 25y, a2 = 25x + 15y, b1 = 15x + 25y, b2 = 25x + 35y;
+     * every leaf has 20x and 20y pending.
+     *
+     * a2 is below its y guarantee and reclaims from b2; b1 is below its x
+     * guarantee and reclaims from a1. Expected: 10x preempted from a1 (app1),
+     * 10y preempted from b2 (app4), nothing from app2/app3.
+     */
+    String labels =
+        "=100,true;" + // default partition
+        "x=100,true;" + // partition=x
+        "y=100,true"; // partition=y
+    String nodes =
+        "n1=x;" + // n1 has partition=x
+        "n2=y;" + // n2 has partition=y
+        "n3="; // n3 is default partition
+    String queues =
+        // guaranteed,max,used,pending
+        "root(=[100 100 0 0],x=[100 100 100 100],y=[100 100 100 100]);" + //root
+        "-a(=[50 100 0 0],x=[50 100 60 40],y=[50 100 40 40]);" + // a
+        "--a1(=[25 100 0 0],x=[25 100 35 20],y=[25 100 25 20]);" + // a1
+        "--a2(=[25 100 0 0],x=[25 100 25 20],y=[25 100 15 20]);" + // a2
+        "-b(=[50 100 0 0],x=[50 100 40 40],y=[50 100 60 40]);" + // b
+        "--b1(=[25 100 0 0],x=[25 100 15 20],y=[25 100 25 20]);" + // b1
+        "--b2(=[25 100 0 0],x=[25 100 25 20],y=[25 100 35 20])"; // b2
+    String apps =
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a1\t" // app1 in a1
+        + "(1,1,n1,x,35,false)" // 35 of x
+        + "(1,1,n2,y,25,false);" + // 25 of y
+        "a2\t" // app2 in a2
+        + "(1,1,n1,x,25,false)" // 25 of x
+        + "(1,1,n2,y,15,false);" + // 15 of y
+        "b1\t" // app3 in b1
+        + "(1,1,n1,x,15,false)" // 15 of x
+        + "(1,1,n2,y,25,false);" + // 25 of y
+        "b2\t" // app4 in b2
+        + "(1,1,n1,x,25,false)" // 25 of x
+        + "(1,1,n2,y,35,false)"; // 35 of y
+
+    buildEnv(labels, nodes, queues, apps);
+    policy.editSchedule();
+
+    ApplicationAttemptId inA1 = getAppAttemptId(1);
+    ApplicationAttemptId inA2 = getAppAttemptId(2);
+    ApplicationAttemptId inB1 = getAppAttemptId(3);
+    ApplicationAttemptId inB2 = getAppAttemptId(4);
+
+    // Over-guarantee leaves lose 10 containers each ...
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(inA1)));
+    verify(mDisp, times(10)).handle(argThat(new IsPreemptionRequestFor(inB2)));
+    // ... while the under-guarantee leaves are left alone.
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(inA2)));
+    verify(mDisp, never()).handle(argThat(new IsPreemptionRequestFor(inB1)));
+  }
+
+  @Test
+  public void testHierarchyPreemptionForDifferenceAcessibility()
+      throws IOException {
+    /**
+     * Queue structure:
+     *
+     * <pre>
+     *           root
+     *          /    \
+     *         a      b
+     *        / \    / \
+     *      a1   a2 b1  b2
+     * </pre>
+     *
+     * a can only access x and b can only access y; inside a and inside b the
+     * children split capacity 50:50.
+     *
+     * a1 holds 100x while a2 asks for 100x; b1 holds 80y while b2 asks for
+     * 100y. Each child is entitled to 50 of its parent's partition, so a1 is
+     * expected to lose 50 containers and b1 to lose 30.
+     */
+    String labels =
+        "=100,true;" + // default partition
+        "x=100,true;" + // partition=x
+        "y=100,true"; // partition=y
+    String nodes =
+        "n1=x;" + // n1 has partition=x
+        "n2=y;" + // n2 has partition=y
+        "n3="; // n3 is default partition
+    String queues =
+        // guaranteed,max,used,pending
+        "root(=[100 100 0 0],x=[100 100 100 100],y=[100 100 100 100]);" + //root
+        "-a(=[50 100 0 0],x=[100 100 100 100]);" + // a
+        "--a1(=[25 100 0 0],x=[50 100 100 0]);" + // a1
+        "--a2(=[25 100 0 0],x=[50 100 0 100]);" + // a2
+        "-b(=[50 100 0 0],y=[100 100 80 100]);" + // b
+        "--b1(=[25 100 0 0],y=[50 100 80 0]);" + // b1
+        "--b2(=[25 100 0 0],y=[50 100 0 100])"; // b2
+    String apps =
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a1\t" // app1 in a1
+        + "(1,1,n1,x,100,false);" + // 100 of x
+        "b1\t" // app2 in b1
+        + "(1,1,n2,y,80,false)"; // 80 of y
+
+    buildEnv(labels, nodes, queues, apps);
+    policy.editSchedule();
+
+    ApplicationAttemptId inA1 = getAppAttemptId(1);
+    ApplicationAttemptId inB1 = getAppAttemptId(2);
+    verify(mDisp, times(50)).handle(argThat(new IsPreemptionRequestFor(inA1)));
+    verify(mDisp, times(30)).handle(argThat(new IsPreemptionRequestFor(inB1)));
+  }
+
+
+  /**
+   * Builds the id of attempt 1 of application {@code id}, using cluster
+   * timestamp 0.
+   */
+  private ApplicationAttemptId getAppAttemptId(int id) {
+    return ApplicationAttemptId.newInstance(
+        ApplicationId.newInstance(0L, id), 1);
+  }
+
+  /**
+   * Asserts how many of an application's live plus reserved containers are
+   * allocated on {@code host} (as NodeId with port 1, the port used when the
+   * mocked containers are built).
+   */
+  private void checkContainerNodesInApp(FiCaSchedulerApp app,
+      int expectedContainersNumber, String host) {
+    NodeId expectedNode = NodeId.newInstance(host, 1);
+    int onNode =
+        countContainersAllocatedOn(app.getLiveContainers(), expectedNode)
+            + countContainersAllocatedOn(app.getReservedContainers(), expectedNode);
+    Assert.assertEquals(expectedContainersNumber, onNode);
+  }
+
+  /** Counts the containers whose allocated node equals {@code node}. */
+  private static int countContainersAllocatedOn(
+      Iterable<? extends RMContainer> containers, NodeId node) {
+    int count = 0;
+    for (RMContainer container : containers) {
+      if (container.getAllocatedNode().equals(node)) {
+        count++;
+      }
+    }
+    return count;
+  }
+
+  /**
+   * Looks up an application by the numeric id part of its ApplicationId
+   * among the applications of the given leaf queue.
+   *
+   * @param queueName name of a queue that must resolve to a {@link LeafQueue}
+   * @param appId the id part of the application's {@link ApplicationId}
+   * @return the matching application; never {@code null}
+   * @throws AssertionError if the queue holds no application with that id
+   */
+  private FiCaSchedulerApp getApp(String queueName, int appId) {
+    for (FiCaSchedulerApp app : ((LeafQueue) cs.getQueue(queueName))
+        .getApplications()) {
+      if (app.getApplicationId().getId() == appId) {
+        return app;
+      }
+    }
+    // Fail here with context instead of returning null and letting the
+    // caller hit a NullPointerException on its first dereference.
+    throw new AssertionError(
+        "No application with id=" + appId + " in queue " + queueName);
+  }
+
+ private void checkAbsCapacities(CSQueue queue, String partition,
+ float guaranteed, float max, float used) {
+ QueueCapacities qc = queue.getQueueCapacities();
+ Assert.assertEquals(guaranteed, qc.getAbsoluteCapacity(partition), 1e-3);
+ Assert.assertEquals(max, qc.getAbsoluteMaximumCapacity(partition), 1e-3);
+ Assert.assertEquals(used, qc.getAbsoluteUsedCapacity(partition), 1e-3);
+ }
+
+  /** Asserts the pending memory a queue reports for one partition. */
+  private void checkPendingResource(CSQueue queue, String partition, int pending) {
+    Resource pendingForPartition =
+        queue.getQueueResourceUsage().getPending(partition);
+    Assert.assertEquals(pending, pendingForPartition.getMemory());
+  }
+
+  /**
+   * Builds the mocked scheduler environment from the four config strings
+   * (labels, nodes, queues, applications), stubs the scheduler's node, root
+   * queue and cluster-resource lookups, then recreates the policy under
+   * test once all stubs are in place.
+   */
+  private void buildEnv(String labelsConfig, String nodesConfig,
+      String queuesConfig, String appsConfig) throws IOException {
+    mockNodeLabelsManager(labelsConfig);
+    mockSchedulerNodes(nodesConfig);
+    for (Map.Entry<NodeId, SchedulerNode> node
+        : nodeIdToSchedulerNodes.entrySet()) {
+      when(cs.getSchedulerNode(node.getKey())).thenReturn(node.getValue());
+    }
+
+    ParentQueue rootQueue = mockQueueHierarchy(queuesConfig);
+    when(cs.getRootQueue()).thenReturn(rootQueue);
+    when(cs.getClusterResource()).thenReturn(clusterResource);
+    mockApplications(appsConfig);
+
+    policy = new ProportionalCapacityPreemptionPolicy(conf, mDisp, cs, mClock);
+  }
+
+ /**
+ * Parses container groups of the form
+ * (priority,resource,host,expression,repeat,reserved) from containersConfig
+ * and creates "repeat" mocked RMContainerImpl instances per group. Each
+ * container goes into reservedContainers when "reserved" is true, otherwise
+ * into liveContainers. Container ids are assigned sequentially from 1 within
+ * this call, and only the container with id 1 is stubbed as the AM container.
+ * A container with an empty node label expression that lands on a host whose
+ * partition is non-empty is also recorded in the leaf queue's
+ * ignore-exclusivity map under that partition.
+ *
+ * @throws IllegalArgumentException if a group has unbalanced parentheses,
+ * does not have exactly 6 fields, or non-'(' text follows the last group
+ */
+ private void mockContainers(String containersConfig, ApplicationAttemptId attemptId,
+ String queueName, List<RMContainer> reservedContainers,
+ List<RMContainer> liveContainers) {
+ int containerId = 1;
+ // NOTE(review): mockApplications passes only the text after the tab, which
+ // presumably contains no '='; indexOf then returns -1 and parsing starts at 0.
+ int start = containersConfig.indexOf("=") + 1;
+ int end = -1;
+
+ while (start < containersConfig.length()) {
+ // skip ahead to the next '(' that opens a group
+ while (start < containersConfig.length()
+ && containersConfig.charAt(start) != '(') {
+ start++;
+ }
+ if (start >= containersConfig.length()) {
+ throw new IllegalArgumentException(
+ "Error containers specification, line=" + containersConfig);
+ }
+ // find the matching ')' that closes the group
+ end = start + 1;
+ while (end < containersConfig.length()
+ && containersConfig.charAt(end) != ')') {
+ end++;
+ }
+ if (end >= containersConfig.length()) {
+ throw new IllegalArgumentException(
+ "Error containers specification, line=" + containersConfig);
+ }
+
+ // now we found start/end, get container values
+ String[] values = containersConfig.substring(start + 1, end).split(",");
+ if (values.length != 6) {
+ throw new IllegalArgumentException("Format to define container is:"
+ + "(priority,resource,host,expression,repeat,reserved)");
+ }
+ Priority pri = Priority.newInstance(Integer.valueOf(values[0]));
+ // the resource field is passed to Resources.createResource as a single int
+ Resource res = Resources.createResource(Integer.valueOf(values[1]));
+ NodeId host = NodeId.newInstance(values[2], 1);
+ String exp = values[3];
+ int repeat = Integer.valueOf(values[4]);
+ boolean reserved = Boolean.valueOf(values[5]);
+
+ for (int i = 0; i < repeat; i++) {
+ Container c = mock(Container.class);
+ when(c.getResource()).thenReturn(res);
+ when(c.getPriority()).thenReturn(pri);
+ RMContainerImpl rmc = mock(RMContainerImpl.class);
+ when(rmc.getAllocatedNode()).thenReturn(host);
+ when(rmc.getNodeLabelExpression()).thenReturn(exp);
+ when(rmc.getAllocatedResource()).thenReturn(res);
+ when(rmc.getContainer()).thenReturn(c);
+ when(rmc.getApplicationAttemptId()).thenReturn(attemptId);
+ final ContainerId cId = ContainerId.newContainerId(attemptId, containerId);
+ when(rmc.getContainerId()).thenReturn(
+ cId);
+ // compareTo is stubbed to order mocks by container id, so they can be
+ // stored in TreeSets such as the ignore-exclusivity sets below
+ doAnswer(new Answer<Integer>() {
+ @Override
+ public Integer answer(InvocationOnMock invocation) throws Throwable {
+ return cId.compareTo(((RMContainer) invocation.getArguments()[0])
+ .getContainerId());
+ }
+ }).when(rmc).compareTo(any(RMContainer.class));
+
+ // the first container created for this app is treated as its AM
+ if (containerId == 1) {
+ when(rmc.isAMContainer()).thenReturn(true);
+ }
+
+ if (reserved) {
+ reservedContainers.add(rmc);
+ } else {
+ liveContainers.add(rmc);
+ }
+
+ // If this is a non-exclusive allocation
+ // (no label expression, but the host belongs to a labeled partition).
+ // partition is assigned inside the condition and stays null otherwise.
+ String partition = null;
+ if (exp.isEmpty()
+ && !(partition = nodeIdToSchedulerNodes.get(host).getPartition())
+ .isEmpty()) {
+ LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName);
+ Map<String, TreeSet<RMContainer>> ignoreExclusivityContainers =
+ queue.getIgnoreExclusivityRMContainers();
+ if (!ignoreExclusivityContainers.containsKey(partition)) {
+ ignoreExclusivityContainers.put(partition,
+ new TreeSet<RMContainer>());
+ }
+ ignoreExclusivityContainers.get(partition).add(rmc);
+ }
+ LOG.debug("add container to app=" + attemptId + " res=" + res
+ + " node=" + host + " nodeLabelExpression=" + exp + " partition="
+ + partition);
+
+ containerId++;
+ }
+
+ // continue scanning after the ')' of the group just processed
+ start = end + 1;
+ }
+ }
+
+  /**
+   * Creates mocked applications and attaches each one to its leaf queue.
+   * Applications are separated by ";". Each one is a queue name, a tab, and
+   * one or more container groups:
+   * <pre>
+   * queueName\t(priority,resource,host,expression,#repeat,reserved)(...);
+   * queueName\t(...)
+   * </pre>
+   * Application ids are assigned sequentially, starting at 1, in the order
+   * the applications appear. Every application uses attempt number 1.
+   */
+  private void mockApplications(String appsConfig) {
+    String[] appSpecs = appsConfig.split(";");
+    for (int i = 0; i < appSpecs.length; i++) {
+      String[] parts = appSpecs[i].split("\t");
+      String queueName = parts[0];
+
+      ApplicationId appId = ApplicationId.newInstance(0L, i + 1);
+      ApplicationAttemptId attemptId =
+          ApplicationAttemptId.newInstance(appId, 1);
+
+      // mockContainers fills both lists while parsing the container groups
+      List<RMContainer> live = new ArrayList<RMContainer>();
+      List<RMContainer> reserved = new ArrayList<RMContainer>();
+      mockContainers(parts[1], attemptId, queueName, reserved, live);
+
+      FiCaSchedulerApp app = mock(FiCaSchedulerApp.class);
+      when(app.getLiveContainers()).thenReturn(live);
+      when(app.getReservedContainers()).thenReturn(reserved);
+      when(app.getApplicationAttemptId()).thenReturn(attemptId);
+      when(app.getApplicationId()).thenReturn(appId);
+
+      // Stub before inserting: the queue's mocked application set is
+      // comparator-ordered and presumably consults these getters on add.
+      LeafQueue leafQueue = (LeafQueue) nameToCSQueues.get(queueName);
+      leafQueue.getApplications().add(app);
+    }
+  }
+
+  /**
+   * Registers one mocked {@link SchedulerNode} per entry of the spec.
+   * Entries are separated by ";" and have the form {@code host=partition};
+   * each node id is created with port 1.
+   * <pre>
+   * host1=partition;
+   * host2=partition;
+   * </pre>
+   */
+  private void mockSchedulerNodes(String schedulerNodesConfigStr)
+      throws IOException {
+    for (String entry : schedulerNodesConfigStr.split(";")) {
+      int sep = entry.indexOf('=');
+      NodeId nodeId = NodeId.newInstance(entry.substring(0, sep), 1);
+      String partition = entry.substring(sep + 1);
+
+      SchedulerNode node = mock(SchedulerNode.class);
+      when(node.getNodeID()).thenReturn(nodeId);
+      when(node.getPartition()).thenReturn(partition);
+      nodeIdToSchedulerNodes.put(nodeId, node);
+
+      LOG.debug("add scheduler node, id=" + nodeId + ", partition=" + partition);
+    }
+  }
+
+  /**
+   * Mocks the node labels manager from a partition spec and rebuilds
+   * {@code clusterResource} as the sum of all partition resources.
+   * <pre>
+   * partition0=total_resource,exclusivity;
+   * partition1=total_resource,exclusivity;
+   * ...
+   * </pre>
+   * {@code total_resource} is an int passed to
+   * {@code Resources.createResource}. {@code exclusivity} is parsed as a
+   * boolean.
+   */
+  private void mockNodeLabelsManager(String nodeLabelsConfigStr) throws IOException {
+    String[] specs = nodeLabelsConfigStr.split(";");
+    clusterResource = Resources.createResource(0);
+    for (String spec : specs) {
+      int eqPos = spec.indexOf('=');
+      int commaPos = spec.indexOf(',');
+      String partitionName = spec.substring(0, eqPos);
+      int totalResource = Integer.parseInt(spec.substring(eqPos + 1, commaPos));
+      boolean exclusivity = Boolean.parseBoolean(spec.substring(commaPos + 1));
+      Resource res = Resources.createResource(totalResource);
+
+      when(nlm.getResourceByLabel(eq(partitionName), any(Resource.class)))
+          .thenReturn(res);
+      when(nlm.isExclusiveNodeLabel(eq(partitionName))).thenReturn(exclusivity);
+
+      partitionToResource.put(partitionName, res);
+      LOG.debug("add partition=" + partitionName + " totalRes=" + res
+          + " exclusivity=" + exclusivity);
+
+      // the cluster resource accumulates every partition's resource
+      Resources.addTo(clusterResource, res);
+    }
+
+    when(nlm.getClusterNodeLabelNames()).thenReturn(
+        partitionToResource.keySet());
+  }
+
+  /**
+   * Builds a mocked queue hierarchy from a textual description and returns
+   * its root.
+   * <pre>
+   * root(<partition-name-1>=[guaranteed max used pending],<partition-name-2>=..);
+   * -A(...);
+   * --A1(...);
+   * --A2(...);
+   * -B...
+   * </pre>
+   * ";" separates queues, and the number of leading "-" gives a queue's
+   * depth. There should be no empty entries and no extra spaces.
+   *
+   * @return the mocked root queue, or {@code null} if no queue is named ROOT
+   */
+  private ParentQueue mockQueueHierarchy(String queueExprs) {
+    String[] queueExprArray = queueExprs.split(";");
+    ParentQueue rootQueue = null;
+    for (int idx = 0; idx < queueExprArray.length; idx++) {
+      CSQueue queue;
+      if (isParent(queueExprArray, idx)) {
+        queue = newMockedParentQueue();
+      } else {
+        queue = newMockedLeafQueue();
+      }
+
+      setupQueue(queue, queueExprArray[idx], queueExprArray, idx);
+      if (queue.getQueueName().equals(ROOT)) {
+        rootQueue = (ParentQueue) queue;
+      }
+    }
+    return rootQueue;
+  }
+
+  /** Creates a mocked ParentQueue whose child list starts out empty. */
+  private ParentQueue newMockedParentQueue() {
+    ParentQueue parentQueue = mock(ParentQueue.class);
+    when(parentQueue.getChildQueues()).thenReturn(new ArrayList<CSQueue>());
+    return parentQueue;
+  }
+
+  /**
+   * Creates a mocked LeafQueue backed by a real, comparator-ordered
+   * application set. Its ordering policy's preemption iterator walks that set
+   * in descending order, and its ignore-exclusivity container map is a real
+   * mutable map.
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  private LeafQueue newMockedLeafQueue() {
+    LeafQueue leafQueue = mock(LeafQueue.class);
+    final TreeSet<FiCaSchedulerApp> apps =
+        new TreeSet<>(CapacityScheduler.applicationComparator);
+    when(leafQueue.getApplications()).thenReturn(apps);
+
+    OrderingPolicy<FiCaSchedulerApp> orderingPolicy = mock(OrderingPolicy.class);
+    // an Answer rather than a fixed value, so every call gets a new iterator
+    when(orderingPolicy.getPreemptionIterator()).thenAnswer(new Answer() {
+      @Override
+      public Object answer(InvocationOnMock invocation) {
+        return apps.descendingIterator();
+      }
+    });
+    when(leafQueue.getOrderingPolicy()).thenReturn(orderingPolicy);
+
+    Map<String, TreeSet<RMContainer>> ignorePartitionContainers =
+        new HashMap<>();
+    when(leafQueue.getIgnoreExclusivityRMContainers()).thenReturn(
+        ignorePartitionContainers);
+    return leafQueue;
+  }
+
+  /**
+   * Configures one mocked queue from its textual description. This covers
+   * its name; its parent link (the queue is also appended to the parent's
+   * child list); its queue path; per-partition absolute capacities and
+   * pending resource; and the preemption-disabled flag read from
+   * {@code csConf}. Finally the queue is registered in {@code nameToCSQueues}
+   * and returned by {@code cs.getQueue(name)}.
+   *
+   * @param queue the mock to configure
+   * @param q this queue's description, e.g. {@code -a(=[10 20 5 0],x=[...])}
+   * @param queueExprArray all queue descriptions, used to locate the parent
+   * @param idx index of {@code q} within {@code queueExprArray}
+   * @throws IllegalArgumentException if a capacity setting names a partition
+   *         that was not declared, or has fewer than four values
+   */
+  private void setupQueue(CSQueue queue, String q, String[] queueExprArray,
+      int idx) {
+    LOG.debug("*** Setup queue, source=" + q);
+
+    int myLevel = getLevel(q);
+    String queueName = getQueueName(q);
+    when(queue.getQueueName()).thenReturn(queueName);
+
+    // Root's path is ROOT; any other queue extends its parent's path.
+    String queuePath = (0 == myLevel) ? ROOT : null;
+
+    // Setup parent queue, and add myself to parentQueue.children-list
+    ParentQueue parentQueue = getParentQueue(queueExprArray, idx, myLevel);
+    if (null != parentQueue) {
+      when(queue.getParent()).thenReturn(parentQueue);
+      parentQueue.getChildQueues().add(queue);
+      queuePath = parentQueue.getQueuePath() + "." + queueName;
+    }
+    when(queue.getQueuePath()).thenReturn(queuePath);
+
+    QueueCapacities qc = new QueueCapacities(0 == myLevel);
+    ResourceUsage ru = new ResourceUsage();
+    when(queue.getQueueCapacities()).thenReturn(qc);
+    when(queue.getQueueResourceUsage()).thenReturn(ru);
+
+    LOG.debug("Setup queue, name=" + queue.getQueueName() + " path="
+        + queue.getQueuePath());
+    LOG.debug("Parent=" + (parentQueue == null ? "null" : parentQueue
+        .getQueueName()));
+
+    // Setup other fields like used resource, guaranteed resource, etc.
+    String capacitySettingStr = q.substring(q.indexOf("(") + 1, q.indexOf(")"));
+    for (String s : capacitySettingStr.split(",")) {
+      String partitionName = s.substring(0, s.indexOf("="));
+      Resource partitionResource = partitionToResource.get(partitionName);
+      if (partitionResource == null) {
+        throw new IllegalArgumentException("Queue " + queueName
+            + " refers to undeclared partition \"" + partitionName + "\"");
+      }
+      String[] values =
+          s.substring(s.indexOf("[") + 1, s.indexOf("]")).split(" ");
+      if (values.length < 4) {
+        throw new IllegalArgumentException("Capacity setting must be "
+            + "[guaranteed max used pending], got: " + s);
+      }
+
+      // Add a small epsilon to capacities to avoid truncation when doing
+      // Resources.multiply
+      float epsilon = 1e-6f;
+      float partitionMemory = (float) partitionResource.getMemory();
+      float absGuaranteed =
+          Integer.parseInt(values[0].trim()) / partitionMemory + epsilon;
+      float absMax =
+          Integer.parseInt(values[1].trim()) / partitionMemory + epsilon;
+      float absUsed =
+          Integer.parseInt(values[2].trim()) / partitionMemory + epsilon;
+      Resource pending =
+          Resources.createResource(Integer.parseInt(values[3].trim()));
+
+      qc.setAbsoluteCapacity(partitionName, absGuaranteed);
+      qc.setAbsoluteMaximumCapacity(partitionName, absMax);
+      qc.setAbsoluteUsedCapacity(partitionName, absUsed);
+      ru.setPending(partitionName, pending);
+      LOG.debug("Setup queue=" + queueName + " partition=" + partitionName
+          + " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax
+          + ",abs_used=" + absUsed + ",pending_resource=" + pending + "]");
+    }
+
+    // Setup preemption disabled
+    when(queue.getPreemptionDisabled()).thenReturn(
+        csConf.getPreemptionDisabled(queuePath, false));
+
+    nameToCSQueues.put(queueName, queue);
+    when(cs.getQueue(eq(queueName))).thenReturn(queue);
+  }
+
+  /**
+   * Returns the depth of a queue description, i.e. the number of leading
+   * '-' characters. Root has depth 0.
+   */
+  private int getLevel(String q) {
+    int depth = 0;
+    for (char ch : q.toCharArray()) {
+      if (ch != '-') {
+        break;
+      }
+      depth++;
+    }
+    return depth;
+  }
+
+  /**
+   * Extracts the queue name from a description: the text after the leading
+   * '-' characters and before the first '('.
+   *
+   * @param q a queue description such as {@code --a1(=[1 2 3 4])}
+   * @return the queue name
+   * @throws IllegalArgumentException if the description consists only of
+   *         dashes, has no '(', yields an empty name, or the name contains '.'
+   */
+  private String getQueueName(String q) {
+    // the name starts right after the leading dashes
+    int nameStart = getLevel(q);
+    if (nameStart == q.length()) {
+      throw new IllegalArgumentException("illegal input:" + q);
+    }
+    // previously a missing '(' surfaced as StringIndexOutOfBoundsException
+    int nameEnd = q.indexOf('(');
+    if (nameEnd < 0) {
+      throw new IllegalArgumentException(
+          "queue description should contain '(':" + q);
+    }
+    String name = q.substring(nameStart, nameEnd);
+    if (name.isEmpty()) {
+      throw new IllegalArgumentException("queue name shouldn't be empty:" + q);
+    }
+    if (name.contains(".")) {
+      throw new IllegalArgumentException("queue name shouldn't contain '.':"
+          + name);
+    }
+    return name;
+  }
+
+  /**
+   * Finds the parent of the queue at {@code idx}: the nearest preceding
+   * description whose depth is smaller than {@code myLevel}.
+   *
+   * @return the queue registered under that name in {@code nameToCSQueues},
+   *         or {@code null} if no shallower description precedes {@code idx}
+   */
+  private ParentQueue getParentQueue(String[] queueExprArray, int idx,
+      int myLevel) {
+    for (int i = idx - 1; i >= 0; i--) {
+      String candidate = queueExprArray[i];
+      if (getLevel(candidate) < myLevel) {
+        String parentQueueName = getQueueName(candidate);
+        return (ParentQueue) nameToCSQueues.get(parentQueueName);
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Tells whether the queue at {@code idx} must be mocked as a ParentQueue.
+   * Descriptions are listed in pre-order: children directly follow their
+   * parent, which is also how getParentQueue resolves a parent by scanning
+   * backwards. A queue therefore has children exactly when the next
+   * description is deeper than it.
+   *
+   * @param queues all queue descriptions, in order
+   * @param idx index of the queue to classify
+   * @return true if the queue has at least one child
+   */
+  private boolean isParent(String[] queues, int idx) {
+    // Only the immediate successor matters. Skipping same-level siblings
+    // first would misclassify a childless queue as a parent whenever a later
+    // sibling has children (e.g. "-a;-b;--b1" would make "a" a parent).
+    int next = idx + 1;
+    return next < queues.length
+        && getLevel(queues[next]) > getLevel(queues[idx]);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bf09b33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index 2608dcb..31661da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -138,7 +138,7 @@ public class TestChildQueueOrder {
} else {
FiCaSchedulerApp app1 = getMockApplication(0, "");
((LeafQueue)queue).allocateResource(clusterResource, app1,
- allocatedResource, null);
+ allocatedResource, null, null);
}
// Next call - nothing
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bf09b33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 34248a4..1c8622f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -815,9 +815,9 @@ public class TestLeafQueue {
qb.finishApplication(app_0.getApplicationId(), user_0);
qb.finishApplication(app_2.getApplicationId(), user_1);
qb.releaseResource(clusterResource, app_0, app_0.getResource(u0Priority),
- null);
+ null, null);
qb.releaseResource(clusterResource, app_2, app_2.getResource(u1Priority),
- null);
+ null, null);
qb.setUserLimit(50);
qb.setUserLimitFactor(1);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bf09b33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
index 46aa7ec..48d6602 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -46,6 +47,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppRepor
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.junit.Assert;
import org.junit.Before;
@@ -1015,6 +1017,20 @@ public class TestNodeLabelContainerAllocation {
// app1 gets all resource in partition=x
Assert.assertEquals(10, schedulerNode1.getNumContainers());
+ // check non-exclusive containers of LeafQueue is correctly updated
+ LeafQueue leafQueue = (LeafQueue) cs.getQueue("a");
+ Assert.assertFalse(leafQueue.getIgnoreExclusivityRMContainers().containsKey(
+ "y"));
+ Assert.assertEquals(10,
+ leafQueue.getIgnoreExclusivityRMContainers().get("x").size());
+
+ // completes all containers of app1, ignoreExclusivityRMContainers should be
+ // updated as well.
+ cs.handle(new AppAttemptRemovedSchedulerEvent(
+ am1.getApplicationAttemptId(), RMAppAttemptState.FINISHED, false));
+ Assert.assertFalse(leafQueue.getIgnoreExclusivityRMContainers().containsKey(
+ "x"));
+
rm1.close();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bf09b33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
index bdbd168..4deaaae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
@@ -150,7 +150,7 @@ public class TestParentQueue {
} else {
FiCaSchedulerApp app1 = getMockApplication(0, "");
((LeafQueue)queue).allocateResource(clusterResource, app1,
- allocatedResource, null);
+ allocatedResource, null, null);
}
// Next call - nothing
[2/2] hadoop git commit: YARN-2498. Respect labels in preemption
policy of capacity scheduler for inter-queue preemption. Contributed by
Wangda Tan (cherry picked from commit
d497f6ea2be559aa31ed76f37ae949dbfabe2a51)
Posted by ji...@apache.org.
YARN-2498. Respect labels in preemption policy of capacity scheduler for inter-queue preemption. Contributed by Wangda Tan
(cherry picked from commit d497f6ea2be559aa31ed76f37ae949dbfabe2a51)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/9bf09b33
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/9bf09b33
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/9bf09b33
Branch: refs/heads/branch-2
Commit: 9bf09b334d90bc88e0e365774eb0cadc4eed549c
Parents: 932cff6
Author: Jian He <ji...@apache.org>
Authored: Fri Apr 24 17:03:13 2015 -0700
Committer: Jian He <ji...@apache.org>
Committed: Fri Apr 24 17:03:57 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../ProportionalCapacityPreemptionPolicy.java | 585 +++++----
.../rmcontainer/RMContainerImpl.java | 28 +-
.../scheduler/capacity/CapacityScheduler.java | 2 +-
.../scheduler/capacity/LeafQueue.java | 70 +-
.../scheduler/common/AssignmentInformation.java | 31 +-
...estProportionalCapacityPreemptionPolicy.java | 94 +-
...pacityPreemptionPolicyForNodePartitions.java | 1211 ++++++++++++++++++
.../scheduler/capacity/TestChildQueueOrder.java | 2 +-
.../scheduler/capacity/TestLeafQueue.java | 4 +-
.../TestNodeLabelContainerAllocation.java | 16 +
.../scheduler/capacity/TestParentQueue.java | 2 +-
12 files changed, 1750 insertions(+), 298 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bf09b33/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 1f486e4..7964807 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -54,6 +54,9 @@ Release 2.8.0 - UNRELEASED
YARN-3319. Implement a FairOrderingPolicy. (Craig Welch via wangda)
+ YARN-2498. Respect labels in preemption policy of capacity scheduler for
+ inter-queue preemption. (Wangda Tan via jianhe)
+
IMPROVEMENTS
YARN-1880. Cleanup TestApplicationClientProtocolOnHA
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bf09b33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 2ab4197..1f47b5f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -26,11 +27,10 @@ import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
-import java.util.NavigableSet;
import java.util.PriorityQueue;
import java.util.Set;
+import java.util.TreeSet;
-import org.apache.commons.collections.map.HashedMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
@@ -40,7 +40,6 @@ import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -49,7 +48,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptE
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.PreemptableResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.SystemClock;
@@ -57,6 +58,7 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.ImmutableSet;
/**
* This class implement a {@link SchedulingEditPolicy} that is designed to be
@@ -130,7 +132,9 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
private float percentageClusterPreemptionAllowed;
private double naturalTerminationFactor;
private boolean observeOnly;
- private Map<NodeId, Set<String>> labels;
+ private Map<String, Map<String, TempQueuePerPartition>> queueToPartitions =
+ new HashMap<>();
+ private RMNodeLabelsManager nlm;
public ProportionalCapacityPreemptionPolicy() {
clock = new SystemClock();
@@ -170,7 +174,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
config.getFloat(TOTAL_PREEMPTION_PER_ROUND, (float) 0.1);
observeOnly = config.getBoolean(OBSERVE_ONLY, false);
rc = scheduler.getResourceCalculator();
- labels = null;
+ nlm = scheduler.getRMContext().getNodeLabelManager();
}
@VisibleForTesting
@@ -182,34 +186,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
public void editSchedule() {
CSQueue root = scheduler.getRootQueue();
Resource clusterResources = Resources.clone(scheduler.getClusterResource());
- clusterResources = getNonLabeledResources(clusterResources);
- setNodeLabels(scheduler.getRMContext().getNodeLabelManager()
- .getNodeLabels());
containerBasedPreemptOrKill(root, clusterResources);
}
-
- /**
- * Setting Node Labels
- *
- * @param nodelabels
- */
- public void setNodeLabels(Map<NodeId, Set<String>> nodelabels) {
- labels = nodelabels;
- }
-
- /**
- * This method returns all non labeled resources.
- *
- * @param clusterResources
- * @return Resources
- */
- private Resource getNonLabeledResources(Resource clusterResources) {
- RMContext rmcontext = scheduler.getRMContext();
- RMNodeLabelsManager lm = rmcontext.getNodeLabelManager();
- Resource res = lm.getResourceByLabel(RMNodeLabelsManager.NO_LABEL,
- clusterResources);
- return res == null ? clusterResources : res;
- }
/**
* This method selects and tracks containers to be preempted. If a container
@@ -220,28 +198,46 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
*/
private void containerBasedPreemptOrKill(CSQueue root,
Resource clusterResources) {
+ // All partitions to look at
+ Set<String> allPartitions = new HashSet<>();
+ allPartitions.addAll(scheduler.getRMContext()
+ .getNodeLabelManager().getClusterNodeLabelNames());
+ allPartitions.add(RMNodeLabelsManager.NO_LABEL);
// extract a summary of the queues from scheduler
- TempQueue tRoot;
synchronized (scheduler) {
- tRoot = cloneQueues(root, clusterResources);
+ queueToPartitions.clear();
+
+ for (String partitionToLookAt : allPartitions) {
+ cloneQueues(root,
+ nlm.getResourceByLabel(partitionToLookAt, clusterResources),
+ partitionToLookAt);
+ }
}
- // compute the ideal distribution of resources among queues
- // updates cloned queues state accordingly
- tRoot.idealAssigned = tRoot.guaranteed;
+ // compute total preemption allowed
Resource totalPreemptionAllowed = Resources.multiply(clusterResources,
percentageClusterPreemptionAllowed);
- List<TempQueue> queues =
- recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);
+
+ Set<String> leafQueueNames = null;
+ for (String partition : allPartitions) {
+ TempQueuePerPartition tRoot =
+ getQueueByPartition(CapacitySchedulerConfiguration.ROOT, partition);
+ // compute the ideal distribution of resources among queues
+ // updates cloned queues state accordingly
+ tRoot.idealAssigned = tRoot.guaranteed;
+
+ leafQueueNames =
+ recursivelyComputeIdealAssignment(tRoot, totalPreemptionAllowed);
+ }
// based on ideal allocation select containers to be preempted from each
// queue and each application
Map<ApplicationAttemptId,Set<RMContainer>> toPreempt =
- getContainersToPreempt(queues, clusterResources);
+ getContainersToPreempt(leafQueueNames, clusterResources);
if (LOG.isDebugEnabled()) {
- logToCSV(queues);
+ logToCSV(new ArrayList<String>(leafQueueNames));
}
// if we are in observeOnly mode return before any action is taken
@@ -252,6 +248,10 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
// preempt (or kill) the selected containers
for (Map.Entry<ApplicationAttemptId,Set<RMContainer>> e
: toPreempt.entrySet()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Send to scheduler: in app=" + e.getKey()
+ + " #containers-to-be-preempted=" + e.getValue().size());
+ }
for (RMContainer container : e.getValue()) {
// if we tried to preempt this for more than maxWaitTime
if (preempted.get(container) != null &&
@@ -291,23 +291,24 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
* @param totalPreemptionAllowed maximum amount of preemption allowed
* @return a list of leaf queues updated with preemption targets
*/
- private List<TempQueue> recursivelyComputeIdealAssignment(
- TempQueue root, Resource totalPreemptionAllowed) {
- List<TempQueue> leafs = new ArrayList<TempQueue>();
+ private Set<String> recursivelyComputeIdealAssignment(
+ TempQueuePerPartition root, Resource totalPreemptionAllowed) {
+ Set<String> leafQueueNames = new HashSet<>();
if (root.getChildren() != null &&
root.getChildren().size() > 0) {
// compute ideal distribution at this level
computeIdealResourceDistribution(rc, root.getChildren(),
totalPreemptionAllowed, root.idealAssigned);
// compute recursively for lower levels and build list of leafs
- for(TempQueue t : root.getChildren()) {
- leafs.addAll(recursivelyComputeIdealAssignment(t, totalPreemptionAllowed));
+ for(TempQueuePerPartition t : root.getChildren()) {
+ leafQueueNames.addAll(recursivelyComputeIdealAssignment(t,
+ totalPreemptionAllowed));
}
} else {
// we are in a leaf nothing to do, just return yourself
- return Collections.singletonList(root);
+ return ImmutableSet.of(root.queueName);
}
- return leafs;
+ return leafQueueNames;
}
/**
@@ -324,20 +325,21 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
* @param tot_guarant the amount of capacity assigned to this pool of queues
*/
private void computeIdealResourceDistribution(ResourceCalculator rc,
- List<TempQueue> queues, Resource totalPreemptionAllowed, Resource tot_guarant) {
+ List<TempQueuePerPartition> queues, Resource totalPreemptionAllowed,
+ Resource tot_guarant) {
+ // When called from recursivelyComputeIdealAssignment, 'queues' are the
+ // children of one parent, all cloned for the same node partition.
// qAlloc tracks currently active queues (will decrease progressively as
// demand is met)
- List<TempQueue> qAlloc = new ArrayList<TempQueue>(queues);
+ List<TempQueuePerPartition> qAlloc = new ArrayList<TempQueuePerPartition>(queues);
// unassigned tracks how much resources are still to assign, initialized
// with the total capacity for this set of queues
Resource unassigned = Resources.clone(tot_guarant);
// group queues based on whether they have non-zero guaranteed capacity
- Set<TempQueue> nonZeroGuarQueues = new HashSet<TempQueue>();
- Set<TempQueue> zeroGuarQueues = new HashSet<TempQueue>();
+ Set<TempQueuePerPartition> nonZeroGuarQueues = new HashSet<TempQueuePerPartition>();
+ Set<TempQueuePerPartition> zeroGuarQueues = new HashSet<TempQueuePerPartition>();
- for (TempQueue q : qAlloc) {
+ for (TempQueuePerPartition q : qAlloc) {
if (Resources
.greaterThan(rc, tot_guarant, q.guaranteed, Resources.none())) {
nonZeroGuarQueues.add(q);
@@ -361,7 +363,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
// based on ideal assignment computed above and current assignment we derive
// how much preemption is required overall
Resource totPreemptionNeeded = Resource.newInstance(0, 0);
- for (TempQueue t:queues) {
+ for (TempQueuePerPartition t:queues) {
if (Resources.greaterThan(rc, tot_guarant, t.current, t.idealAssigned)) {
Resources.addTo(totPreemptionNeeded,
Resources.subtract(t.current, t.idealAssigned));
@@ -379,12 +381,12 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
// assign to each queue the amount of actual preemption based on local
// information of ideal preemption and scaling factor
- for (TempQueue t : queues) {
+ for (TempQueuePerPartition t : queues) {
t.assignPreemption(scalingFactor, rc, tot_guarant);
}
if (LOG.isDebugEnabled()) {
long time = clock.getTime();
- for (TempQueue t : queues) {
+ for (TempQueuePerPartition t : queues) {
LOG.debug(time + ": " + t);
}
}
@@ -400,8 +402,8 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
* distributed uniformly.
*/
private void computeFixpointAllocation(ResourceCalculator rc,
- Resource tot_guarant, Collection<TempQueue> qAlloc, Resource unassigned,
- boolean ignoreGuarantee) {
+ Resource tot_guarant, Collection<TempQueuePerPartition> qAlloc,
+ Resource unassigned, boolean ignoreGuarantee) {
// Prior to assigning the unused resources, process each queue as follows:
// If current > guaranteed, idealAssigned = guaranteed + untouchable extra
// Else idealAssigned = current;
@@ -410,10 +412,10 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
// idealAssigned >= current + pending), remove it from consideration.
// Sort queues from most under-guaranteed to most over-guaranteed.
TQComparator tqComparator = new TQComparator(rc, tot_guarant);
- PriorityQueue<TempQueue> orderedByNeed =
- new PriorityQueue<TempQueue>(10,tqComparator);
- for (Iterator<TempQueue> i = qAlloc.iterator(); i.hasNext();) {
- TempQueue q = i.next();
+ // 10 is only the initial capacity; the PriorityQueue grows as needed.
+ PriorityQueue<TempQueuePerPartition> orderedByNeed =
+ new PriorityQueue<TempQueuePerPartition>(10, tqComparator);
+ for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext();) {
+ TempQueuePerPartition q = i.next();
if (Resources.greaterThan(rc, tot_guarant, q.current, q.guaranteed)) {
q.idealAssigned = Resources.add(q.guaranteed, q.untouchableExtra);
} else {
@@ -442,10 +444,11 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
// place it back in the ordered list of queues, recalculating its place
// in the order of most under-guaranteed to most over-guaranteed. In this
// way, the most underserved queue(s) are always given resources first.
- Collection<TempQueue> underserved =
+ Collection<TempQueuePerPartition> underserved =
getMostUnderservedQueues(orderedByNeed, tqComparator);
- for (Iterator<TempQueue> i = underserved.iterator(); i.hasNext();) {
- TempQueue sub = i.next();
+ for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i
+ .hasNext();) {
+ TempQueuePerPartition sub = i.next();
+ // offer this queue a share of 'unassigned' scaled by normalizedGuarantee
Resource wQavail = Resources.multiplyAndNormalizeUp(rc,
unassigned, sub.normalizedGuarantee, Resource.newInstance(1, 1));
Resource wQidle = sub.offer(wQavail, rc, tot_guarant);
@@ -466,13 +469,13 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
// Take the most underserved TempQueue (the one on the head). Collect and
// return the list of all queues that have the same idealAssigned
// percentage of guaranteed.
- protected Collection<TempQueue> getMostUnderservedQueues(
- PriorityQueue<TempQueue> orderedByNeed, TQComparator tqComparator) {
- ArrayList<TempQueue> underserved = new ArrayList<TempQueue>();
+ protected Collection<TempQueuePerPartition> getMostUnderservedQueues(
+ PriorityQueue<TempQueuePerPartition> orderedByNeed, TQComparator tqComparator) {
+ ArrayList<TempQueuePerPartition> underserved = new ArrayList<TempQueuePerPartition>();
+ // Collected queues are removed from orderedByNeed as they are taken.
while (!orderedByNeed.isEmpty()) {
- TempQueue q1 = orderedByNeed.remove();
+ TempQueuePerPartition q1 = orderedByNeed.remove();
underserved.add(q1);
- TempQueue q2 = orderedByNeed.peek();
+ TempQueuePerPartition q2 = orderedByNeed.peek();
// q1's pct of guaranteed won't be larger than q2's. If it's less, then
// return what has already been collected. Otherwise, q1's pct of
// guaranteed == that of q2, so add q2 to underserved list during the
@@ -491,24 +494,90 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
* @param queues the list of queues to consider
*/
private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
- Collection<TempQueue> queues, boolean ignoreGuar) {
+ Collection<TempQueuePerPartition> queues, boolean ignoreGuar) {
Resource activeCap = Resource.newInstance(0, 0);
if (ignoreGuar) {
- for (TempQueue q : queues) {
+ for (TempQueuePerPartition q : queues) {
q.normalizedGuarantee = (float) 1.0f / ((float) queues.size());
}
} else {
- for (TempQueue q : queues) {
+ for (TempQueuePerPartition q : queues) {
Resources.addTo(activeCap, q.guaranteed);
}
- for (TempQueue q : queues) {
+ for (TempQueuePerPartition q : queues) {
q.normalizedGuarantee = Resources.divide(rc, clusterResource,
q.guaranteed, activeCap);
}
}
}
+ /**
+ * Look up the node partition (label) of the node with the given id.
+ * NOTE(review): getSchedulerNode() presumably returns null for a node that
+ * was removed after the queues were cloned, which would throw a
+ * NullPointerException here - confirm and consider skipping such nodes.
+ */
+ private String getPartitionByNodeId(NodeId nodeId) {
+ return scheduler.getSchedulerNode(nodeId).getPartition();
+ }
+
+ /**
+ * Decide whether rmContainer should be preempted and, if so, account for it.
+ * It is selected only when preemptMap does not already hold it and the
+ * partition of its node still has a positive amount left in
+ * <code>resourceToObtainByPartitions</code>. On selection its allocated
+ * resource is deducted from that amount (the partition entry is removed
+ * once nothing is left to obtain) and the container is added to preemptMap.
+ *
+ * @return true if rmContainer was selected for preemption
+ */
+ private boolean tryPreemptContainerAndDeductResToObtain(
+ Map<String, Resource> resourceToObtainByPartitions,
+ RMContainer rmContainer, Resource clusterResource,
+ Map<ApplicationAttemptId, Set<RMContainer>> preemptMap) {
+ ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId();
+
+ // We will not account resource of a container twice or more
+ if (preemptMapContains(preemptMap, attemptId, rmContainer)) {
+ return false;
+ }
+
+ String nodePartition = getPartitionByNodeId(rmContainer.getAllocatedNode());
+ Resource toObtainByPartition =
+ resourceToObtainByPartitions.get(nodePartition);
+
+ if (null != toObtainByPartition
+ && Resources.greaterThan(rc, clusterResource, toObtainByPartition,
+ Resources.none())) {
+ Resources.subtractFrom(toObtainByPartition,
+ rmContainer.getAllocatedResource());
+ // When we have no more resource need to obtain, remove from map.
+ if (Resources.lessThanOrEqual(rc, clusterResource, toObtainByPartition,
+ Resources.none())) {
+ resourceToObtainByPartitions.remove(nodePartition);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Marked container=" + rmContainer.getContainerId()
+ + " in partition=" + nodePartition + " will be preempted");
+ }
+ // Add to preemptMap
+ addToPreemptMap(preemptMap, attemptId, rmContainer);
+ return true;
+ }
+
+ return false;
+ }
+
+ /** Whether preemptMap already holds rmContainer under attemptId. */
+ private boolean preemptMapContains(
+ Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+ ApplicationAttemptId attemptId, RMContainer rmContainer) {
+ Set<RMContainer> rmContainers;
+ if (null == (rmContainers = preemptMap.get(attemptId))) {
+ return false;
+ }
+ return rmContainers.contains(rmContainer);
+ }
+
+ /** Add containerToPreempt to appAttemptId's set, creating it on demand. */
+ private void addToPreemptMap(
+ Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+ ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) {
+ Set<RMContainer> set;
+ if (null == (set = preemptMap.get(appAttemptId))) {
+ set = new HashSet<RMContainer>();
+ preemptMap.put(appAttemptId, set);
+ }
+ set.add(containerToPreempt);
+ }
+
/**
* Based a resource preemption target drop reservations of containers and
* if necessary select containers for preemption from applications in each
@@ -520,64 +589,106 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
- * @return a map of applciationID to set of containers to preempt
+ * @return a map of application attempt ID to set of containers to preempt
*/
private Map<ApplicationAttemptId,Set<RMContainer>> getContainersToPreempt(
- List<TempQueue> queues, Resource clusterResource) {
+ Set<String> leafQueueNames, Resource clusterResource) {
- Map<ApplicationAttemptId,Set<RMContainer>> preemptMap =
- new HashMap<ApplicationAttemptId,Set<RMContainer>>();
+ Map<ApplicationAttemptId, Set<RMContainer>> preemptMap =
+ new HashMap<ApplicationAttemptId, Set<RMContainer>>();
List<RMContainer> skippedAMContainerlist = new ArrayList<RMContainer>();
- for (TempQueue qT : queues) {
- if (qT.preemptionDisabled && qT.leafQueue != null) {
+ // Loop all leaf queues
+ for (String queueName : leafQueueNames) {
+ // Preemption can only be disabled per queue (not per partition), so the
+ // default-partition clone is representative. Guard against a missing
+ // clone instead of throwing NullPointerException.
+ TempQueuePerPartition defaultPartitionQueue =
+ getQueueByPartition(queueName, RMNodeLabelsManager.NO_LABEL);
+ if (null != defaultPartitionQueue
+ && defaultPartitionQueue.preemptionDisabled) {
if (LOG.isDebugEnabled()) {
- if (Resources.greaterThan(rc, clusterResource,
- qT.toBePreempted, Resource.newInstance(0, 0))) {
- LOG.debug("Tried to preempt the following "
- + "resources from non-preemptable queue: "
- + qT.queueName + " - Resources: " + qT.toBePreempted);
- }
+ LOG.debug("skipping from queue=" + queueName
+ + " because it's a non-preemptable queue");
}
continue;
}
- // we act only if we are violating balance by more than
- // maxIgnoredOverCapacity
- if (Resources.greaterThan(rc, clusterResource, qT.current,
- Resources.multiply(qT.guaranteed, 1.0 + maxIgnoredOverCapacity))) {
- // we introduce a dampening factor naturalTerminationFactor that
- // accounts for natural termination of containers
- Resource resToObtain =
- Resources.multiply(qT.toBePreempted, naturalTerminationFactor);
- Resource skippedAMSize = Resource.newInstance(0, 0);
- // lock the leafqueue while we scan applications and unreserve
- synchronized (qT.leafQueue) {
- Iterator<FiCaSchedulerApp> desc =
- qT.leafQueue.getOrderingPolicy().getPreemptionIterator();
+ // compute resToObtainByPartition considered inter-queue preemption
+ LeafQueue leafQueue = null;
+
+ Map<String, Resource> resToObtainByPartition =
+ new HashMap<String, Resource>();
+ Collection<TempQueuePerPartition> queuePartitions =
+ getQueuePartitions(queueName);
+ if (null == queuePartitions) {
+ continue;
+ }
+ for (TempQueuePerPartition qT : queuePartitions) {
+ leafQueue = qT.leafQueue;
+ // we act only if we are violating balance by more than
+ // maxIgnoredOverCapacity
+ if (Resources.greaterThan(rc, clusterResource, qT.current,
+ Resources.multiply(qT.guaranteed, 1.0 + maxIgnoredOverCapacity))) {
+ // we introduce a dampening factor naturalTerminationFactor that
+ // accounts for natural termination of containers
+ Resource resToObtain =
+ Resources.multiply(qT.toBePreempted, naturalTerminationFactor);
+ // Only add resToObtain when it is > 0
+ if (Resources.greaterThan(rc, clusterResource, resToObtain,
+ Resources.none())) {
+ resToObtainByPartition.put(qT.partition, resToObtain);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Queue=" + queueName + " partition=" + qT.partition
+ + " resource-to-obtain=" + resToObtain);
+ }
+ }
qT.actuallyPreempted = Resources.clone(resToObtain);
- while (desc.hasNext()) {
- FiCaSchedulerApp fc = desc.next();
- if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain,
- Resources.none())) {
- break;
+ } else {
+ qT.actuallyPreempted = Resources.none();
+ }
+ }
+
+ // Nothing to lock or preempt when this queue has no leaf clone, or when
+ // none of its partitions needs resources back.
+ if (null == leafQueue || resToObtainByPartition.isEmpty()) {
+ continue;
+ }
+
+ synchronized (leafQueue) {
+ // go through all ignore-partition-exclusivity containers first to make
+ // sure such containers will be preempted first
+ Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityContainers =
+ leafQueue.getIgnoreExclusivityRMContainers();
+ // Iterate over a snapshot of the partitions:
+ // tryPreemptContainerAndDeductResToObtain removes a partition from
+ // resToObtainByPartition once it is satisfied, which would make a live
+ // keySet() iterator throw ConcurrentModificationException.
+ for (String partition : new ArrayList<String>(
+ resToObtainByPartition.keySet())) {
+ if (ignorePartitionExclusivityContainers.containsKey(partition)) {
+ TreeSet<RMContainer> rmContainers =
+ ignorePartitionExclusivityContainers.get(partition);
+ // We will check container from reverse order, so latter submitted
+ // application's containers will be preempted first.
+ for (RMContainer c : rmContainers.descendingSet()) {
+ boolean preempted =
+ tryPreemptContainerAndDeductResToObtain(
+ resToObtainByPartition, c, clusterResource, preemptMap);
+ if (!preempted) {
+ break;
+ }
}
- preemptMap.put(
- fc.getApplicationAttemptId(),
- preemptFrom(fc, clusterResource, resToObtain,
- skippedAMContainerlist, skippedAMSize));
}
- Resource maxAMCapacityForThisQueue = Resources.multiply(
- Resources.multiply(clusterResource,
- qT.leafQueue.getAbsoluteCapacity()),
- qT.leafQueue.getMaxAMResourcePerQueuePercent());
-
- // Can try preempting AMContainers (still saving atmost
- // maxAMCapacityForThisQueue AMResource's) if more resources are
- // required to be preempted from this Queue.
- preemptAMContainers(clusterResource, preemptMap,
- skippedAMContainerlist, resToObtain, skippedAMSize,
- maxAMCapacityForThisQueue);
}
+
+ // preempt other containers
+ Resource skippedAMSize = Resource.newInstance(0, 0);
+ Iterator<FiCaSchedulerApp> desc =
+ leafQueue.getOrderingPolicy().getPreemptionIterator();
+ while (desc.hasNext()) {
+ FiCaSchedulerApp fc = desc.next();
+ // A partition is removed from resToObtainByPartition once it is
+ // satisfied, so an empty map means no more preemption is needed.
+ if (resToObtainByPartition.isEmpty()) {
+ break;
+ }
+
+ preemptFrom(fc, clusterResource, resToObtainByPartition,
+ skippedAMContainerlist, skippedAMSize, preemptMap);
+ }
+
+ // Can try preempting AMContainers (still saving at most
+ // maxAMCapacityForThisQueue AMResource's) if more resources are
+ // required to be preempted from this Queue.
+ Resource maxAMCapacityForThisQueue = Resources.multiply(
+ Resources.multiply(clusterResource,
+ leafQueue.getAbsoluteCapacity()),
+ leafQueue.getMaxAMResourcePerQueuePercent());
+
+ preemptAMContainers(clusterResource, preemptMap, skippedAMContainerlist,
+ resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue);
+ }
}
+
return preemptMap;
}
@@ -595,31 +706,27 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
*/
private void preemptAMContainers(Resource clusterResource,
Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
- List<RMContainer> skippedAMContainerlist, Resource resToObtain,
- Resource skippedAMSize, Resource maxAMCapacityForThisQueue) {
+ List<RMContainer> skippedAMContainerlist,
+ Map<String, Resource> resToObtainByPartition, Resource skippedAMSize,
+ Resource maxAMCapacityForThisQueue) {
for (RMContainer c : skippedAMContainerlist) {
// Got required amount of resources for preemption, can stop now
- if (Resources.lessThanOrEqual(rc, clusterResource, resToObtain,
- Resources.none())) {
+ if (resToObtainByPartition.isEmpty()) {
break;
}
// Once skippedAMSize reaches down to maxAMCapacityForThisQueue,
- // container selection iteration for preemption will be stopped.
+ // container selection iteration for preemption will be stopped.
if (Resources.lessThanOrEqual(rc, clusterResource, skippedAMSize,
maxAMCapacityForThisQueue)) {
break;
}
- Set<RMContainer> contToPrempt = preemptMap.get(c
- .getApplicationAttemptId());
- if (null == contToPrempt) {
- contToPrempt = new HashSet<RMContainer>();
- preemptMap.put(c.getApplicationAttemptId(), contToPrempt);
+
+ // Only AM containers whose node partition still needs resources are
+ // selected; skippedAMSize shrinks only for those.
+ boolean preempted =
+ tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
+ clusterResource, preemptMap);
+ if (preempted) {
+ Resources.subtractFrom(skippedAMSize, c.getAllocatedResource());
}
- contToPrempt.add(c);
-
- Resources.subtractFrom(resToObtain, c.getContainer().getResource());
- Resources.subtractFrom(skippedAMSize, c.getContainer()
- .getResource());
}
+ // Reset so the caller can reuse the list for the next queue.
skippedAMContainerlist.clear();
}
@@ -627,71 +734,59 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
/**
* Given a target preemption for a specific application, select containers
- * to preempt (after unreserving all reservation for that app).
- *
- * @param app
- * @param clusterResource
- * @param rsrcPreempt
- * @return Set<RMContainer> Set of RMContainers
+ * to preempt (after dropping the app's reservations on partitions that still
+ * need resources).
+ *
+ * @param app application whose reserved and live containers are examined
+ * @param clusterResource cluster resource used for resource comparisons
+ * @param resToObtainByPartition resource still to obtain per node partition;
+ * amounts are decremented, and a partition is removed once satisfied
+ * @param skippedAMContainerlist receives AM containers skipped here so the
+ * caller can reconsider them later
+ * @param skippedAMSize running total of the skipped AM containers' resources
+ * @param preemptMap receives the selected containers, keyed by attempt
*/
- private Set<RMContainer> preemptFrom(FiCaSchedulerApp app,
- Resource clusterResource, Resource rsrcPreempt,
- List<RMContainer> skippedAMContainerlist, Resource skippedAMSize) {
- Set<RMContainer> ret = new HashSet<RMContainer>();
+ private void preemptFrom(FiCaSchedulerApp app,
+ Resource clusterResource, Map<String, Resource> resToObtainByPartition,
+ List<RMContainer> skippedAMContainerlist, Resource skippedAMSize,
+ Map<ApplicationAttemptId, Set<RMContainer>> preemptMap) {
ApplicationAttemptId appId = app.getApplicationAttemptId();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Looking at application=" + app.getApplicationAttemptId()
+ + " resourceToObtain=" + resToObtainByPartition);
+ }
- // first drop reserved containers towards rsrcPreempt
+ // first drop reserved containers towards resToObtainByPartition
- List<RMContainer> reservations =
+ List<RMContainer> reservedContainers =
new ArrayList<RMContainer>(app.getReservedContainers());
- for (RMContainer c : reservations) {
- if (Resources.lessThanOrEqual(rc, clusterResource,
- rsrcPreempt, Resources.none())) {
- return ret;
+ for (RMContainer c : reservedContainers) {
+ if (resToObtainByPartition.isEmpty()) {
+ return;
}
+
+ // Try to preempt this reservation. Only drop it when it was actually
+ // counted against a partition that still needs resources; otherwise
+ // reservations on unrelated partitions would be released for nothing.
+ boolean preempted =
+ tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
+ clusterResource, preemptMap);
+
- if (!observeOnly) {
+ if (preempted && !observeOnly) {
dispatcher.handle(new ContainerPreemptEvent(appId, c,
ContainerPreemptEventType.DROP_RESERVATION));
}
- Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource());
}
// if more resources are to be freed go through all live containers in
// reverse priority and reverse allocation order and mark them for
// preemption
- List<RMContainer> containers =
+ List<RMContainer> liveContainers =
new ArrayList<RMContainer>(app.getLiveContainers());
- sortContainers(containers);
+ sortContainers(liveContainers);
- for (RMContainer c : containers) {
- if (Resources.lessThanOrEqual(rc, clusterResource,
- rsrcPreempt, Resources.none())) {
- return ret;
+ for (RMContainer c : liveContainers) {
+ if (resToObtainByPartition.isEmpty()) {
+ return;
}
+
// Skip AM Container from preemption for now.
if (c.isAMContainer()) {
skippedAMContainerlist.add(c);
- Resources.addTo(skippedAMSize, c.getContainer().getResource());
- continue;
- }
- // skip Labeled resource
- if(isLabeledContainer(c)){
+ Resources.addTo(skippedAMSize, c.getAllocatedResource());
continue;
}
- ret.add(c);
- Resources.subtractFrom(rsrcPreempt, c.getContainer().getResource());
- }
- return ret;
- }
-
- /**
- * Checking if given container is a labeled container
- *
- * @param c
- * @return true/false
- */
- private boolean isLabeledContainer(RMContainer c) {
- return labels.containsKey(c.getAllocatedNode());
+ // Try to preempt this container
+ tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
+ clusterResource, preemptMap);
+ }
}
/**
@@ -733,32 +828,48 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
* the leaves. Finally it aggregates pending resources in each queue and rolls
* it up to higher levels.
*
- * @param root the root of the CapacityScheduler queue hierarchy
- * @param clusterResources the total amount of resources in the cluster
+ * @param curQueue the queue to clone, together with its descendants
+ * @param partitionResource total resource of the partition being cloned;
+ * the queue's absolute capacities are scaled by it
+ * @param partitionToLookAt node partition whose capacities, usage and
+ * pending resource are read
* @return the root of the cloned queue hierarchy
*/
- private TempQueue cloneQueues(CSQueue root, Resource clusterResources) {
- TempQueue ret;
- synchronized (root) {
- String queueName = root.getQueueName();
- float absUsed = root.getAbsoluteUsedCapacity();
- float absCap = root.getAbsoluteCapacity();
- float absMaxCap = root.getAbsoluteMaximumCapacity();
- boolean preemptionDisabled = root.getPreemptionDisabled();
-
- Resource current = Resources.multiply(clusterResources, absUsed);
- Resource guaranteed = Resources.multiply(clusterResources, absCap);
- Resource maxCapacity = Resources.multiply(clusterResources, absMaxCap);
+ private TempQueuePerPartition cloneQueues(CSQueue curQueue,
+ Resource partitionResource, String partitionToLookAt) {
+ TempQueuePerPartition ret;
+ synchronized (curQueue) {
+ String queueName = curQueue.getQueueName();
+ QueueCapacities qc = curQueue.getQueueCapacities();
+ float absUsed = qc.getAbsoluteUsedCapacity(partitionToLookAt);
+ float absCap = qc.getAbsoluteCapacity(partitionToLookAt);
+ float absMaxCap = qc.getAbsoluteMaximumCapacity(partitionToLookAt);
+ boolean preemptionDisabled = curQueue.getPreemptionDisabled();
+
+ Resource current = Resources.multiply(partitionResource, absUsed);
+ Resource guaranteed = Resources.multiply(partitionResource, absCap);
+ Resource maxCapacity = Resources.multiply(partitionResource, absMaxCap);
+
+ // when partition is a non-exclusive partition, the actual usage can be
+ // more than the specified maxCapacity, so raise maxCapacity to current
+ try {
+ if (!scheduler.getRMContext().getNodeLabelManager()
+ .isExclusiveNodeLabel(partitionToLookAt)) {
+ maxCapacity =
+ Resources.max(rc, partitionResource, maxCapacity, current);
+ }
+ } catch (IOException e) {
+ // This may be caused by the partition being removed while the capacity
+ // monitor runs; ignore it, the next check will correct this.
+ }
Resource extra = Resource.newInstance(0, 0);
- if (Resources.greaterThan(rc, clusterResources, current, guaranteed)) {
+ if (Resources.greaterThan(rc, partitionResource, current, guaranteed)) {
extra = Resources.subtract(current, guaranteed);
}
- if (root instanceof LeafQueue) {
- LeafQueue l = (LeafQueue) root;
- Resource pending = l.getTotalResourcePending();
- ret = new TempQueue(queueName, current, pending, guaranteed,
- maxCapacity, preemptionDisabled);
+ if (curQueue instanceof LeafQueue) {
+ LeafQueue l = (LeafQueue) curQueue;
+ Resource pending =
+ l.getQueueResourceUsage().getPending(partitionToLookAt);
+ ret = new TempQueuePerPartition(queueName, current, pending, guaranteed,
+ maxCapacity, preemptionDisabled, partitionToLookAt);
if (preemptionDisabled) {
ret.untouchableExtra = extra;
} else {
@@ -767,17 +878,19 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
ret.setLeafQueue(l);
} else {
Resource pending = Resource.newInstance(0, 0);
- ret = new TempQueue(root.getQueueName(), current, pending, guaranteed,
- maxCapacity, false);
+ ret =
+ new TempQueuePerPartition(curQueue.getQueueName(), current, pending,
+ guaranteed, maxCapacity, false, partitionToLookAt);
Resource childrensPreemptable = Resource.newInstance(0, 0);
- for (CSQueue c : root.getChildQueues()) {
- TempQueue subq = cloneQueues(c, clusterResources);
+ for (CSQueue c : curQueue.getChildQueues()) {
+ TempQueuePerPartition subq =
+ cloneQueues(c, partitionResource, partitionToLookAt);
Resources.addTo(childrensPreemptable, subq.preemptableExtra);
ret.addChild(subq);
}
// untouchableExtra = max(extra - childrenPreemptable, 0)
if (Resources.greaterThanOrEqual(
- rc, clusterResources, childrensPreemptable, extra)) {
+ rc, partitionResource, childrensPreemptable, extra)) {
ret.untouchableExtra = Resource.newInstance(0, 0);
} else {
ret.untouchableExtra =
@@ -785,52 +898,87 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
}
}
}
+ // Register the clone so it can be found by queue name and partition.
+ addTempQueuePartition(ret);
return ret;
}
// simple printout function that reports internal queue state (useful for
// plotting)
- private void logToCSV(List<TempQueue> unorderedqueues){
- List<TempQueue> queues = new ArrayList<TempQueue>(unorderedqueues);
- Collections.sort(queues, new Comparator<TempQueue>(){
- @Override
- public int compare(TempQueue o1, TempQueue o2) {
- return o1.queueName.compareTo(o2.queueName);
- }});
+ private void logToCSV(List<String> leafQueueNames){
+ // Sort a copy so the caller's list is not reordered as a side effect.
+ List<String> sortedQueueNames = new ArrayList<String>(leafQueueNames);
+ Collections.sort(sortedQueueNames);
String queueState = " QUEUESTATE: " + clock.getTime();
StringBuilder sb = new StringBuilder();
sb.append(queueState);
- for (TempQueue tq : queues) {
+
+ for (String queueName : sortedQueueNames) {
+ TempQueuePerPartition tq =
+ getQueueByPartition(queueName, RMNodeLabelsManager.NO_LABEL);
+ // A queue without a default-partition clone has nothing to report.
+ if (null == tq) {
+ continue;
+ }
sb.append(", ");
tq.appendLogString(sb);
}
LOG.debug(sb.toString());
}
+ /**
+ * Register a per-partition clone under its queue name and partition so it
+ * can be looked up later; an existing entry for the same pair is replaced.
+ */
+ private void addTempQueuePartition(TempQueuePerPartition queuePartition) {
+ String queueName = queuePartition.queueName;
+
+ Map<String, TempQueuePerPartition> queuePartitions;
+ if (null == (queuePartitions = queueToPartitions.get(queueName))) {
+ queuePartitions = new HashMap<String, TempQueuePerPartition>();
+ queueToPartitions.put(queueName, queuePartitions);
+ }
+ queuePartitions.put(queuePartition.partition, queuePartition);
+ }
+
+ /**
+ * Get queue partition by given queueName and partitionName.
+ *
+ * @return the clone of queueName for partition, or null if none was
+ * registered
+ */
+ private TempQueuePerPartition getQueueByPartition(String queueName,
+ String partition) {
+ Map<String, TempQueuePerPartition> partitionToQueue =
+ queueToPartitions.get(queueName);
+ if (null == partitionToQueue) {
+ return null;
+ }
+ return partitionToQueue.get(partition);
+ }
+
+ /**
+ * Get all per-partition clones of the given queue.
+ *
+ * @return the clones registered for queueName, or an empty collection when
+ * there are none; never null, so callers can iterate the result directly
+ */
+ private Collection<TempQueuePerPartition> getQueuePartitions(
+ String queueName) {
+ Map<String, TempQueuePerPartition> queuePartitions =
+ queueToPartitions.get(queueName);
+ if (null == queuePartitions) {
+ return Collections.<TempQueuePerPartition>emptyList();
+ }
+ return queuePartitions.values();
+ }
+
/**
* Temporary data-structure tracking resource availability, pending resource
- * need, current utilization. Used to clone {@link CSQueue}.
+ * need, current utilization. This is a per-queue, per-partition snapshot: one
+ * instance is created for each queue and node partition that is cloned.
*/
- static class TempQueue {
+ static class TempQueuePerPartition {
final String queueName;
final Resource current;
final Resource pending;
final Resource guaranteed;
final Resource maxCapacity;
+ // node partition (label) this snapshot describes
+ final String partition;
Resource idealAssigned;
Resource toBePreempted;
+ // For logging purpose: dampened amount targeted for preemption
Resource actuallyPreempted;
Resource untouchableExtra;
Resource preemptableExtra;
double normalizedGuarantee;
- final ArrayList<TempQueue> children;
+ final ArrayList<TempQueuePerPartition> children;
LeafQueue leafQueue;
boolean preemptionDisabled;
- TempQueue(String queueName, Resource current, Resource pending,
- Resource guaranteed, Resource maxCapacity, boolean preemptionDisabled) {
+ TempQueuePerPartition(String queueName, Resource current, Resource pending,
+ Resource guaranteed, Resource maxCapacity, boolean preemptionDisabled,
+ String partition) {
this.queueName = queueName;
this.current = current;
this.pending = pending;
@@ -840,10 +988,11 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
this.actuallyPreempted = Resource.newInstance(0, 0);
this.toBePreempted = Resource.newInstance(0, 0);
this.normalizedGuarantee = Float.NaN;
- this.children = new ArrayList<TempQueue>();
+ this.children = new ArrayList<TempQueuePerPartition>();
this.untouchableExtra = Resource.newInstance(0, 0);
this.preemptableExtra = Resource.newInstance(0, 0);
this.preemptionDisabled = preemptionDisabled;
+ this.partition = partition;
}
public void setLeafQueue(LeafQueue l){
@@ -855,19 +1004,19 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
* When adding a child we also aggregate its pending resource needs.
* @param q the child queue to add to this queue
*/
- public void addChild(TempQueue q) {
+ public void addChild(TempQueuePerPartition q) {
assert leafQueue == null;
children.add(q);
Resources.addTo(pending, q.pending);
}
- public void addChildren(ArrayList<TempQueue> queues) {
+ public void addChildren(ArrayList<TempQueuePerPartition> queues) {
assert leafQueue == null;
children.addAll(queues);
}
- public ArrayList<TempQueue> getChildren(){
+ public ArrayList<TempQueuePerPartition> getChildren(){
return children;
}
@@ -909,7 +1058,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
public void printAll() {
LOG.info(this.toString());
- for (TempQueue sub : this.getChildren()) {
+ for (TempQueuePerPartition sub : this.getChildren()) {
sub.printAll();
}
}
@@ -942,7 +1091,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
}
- static class TQComparator implements Comparator<TempQueue> {
+ /**
+ * Orders per-partition queue snapshots by idealAssigned / guaranteed, lowest
+ * ratio (most under-served) first.
+ */
+ static class TQComparator implements Comparator<TempQueuePerPartition> {
private ResourceCalculator rc;
private Resource clusterRes;
@@ -952,7 +1101,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
}
@Override
- public int compare(TempQueue tq1, TempQueue tq2) {
+ public int compare(TempQueuePerPartition tq1, TempQueuePerPartition tq2) {
if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) {
return -1;
}
@@ -965,7 +1114,7 @@ public class ProportionalCapacityPreemptionPolicy implements SchedulingEditPolic
// Calculates idealAssigned / guaranteed
// TempQueues with 0 guarantees are always considered the most over
// capacity and therefore considered last for resources.
- private double getIdealPctOfGuaranteed(TempQueue q) {
+ private double getIdealPctOfGuaranteed(TempQueuePerPartition q) {
double pctOver = Integer.MAX_VALUE;
if (q != null && Resources.greaterThan(
rc, clusterRes, q.guaranteed, Resources.none())) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bf09b33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
index 2750d4e..316a450 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmcontainer/RMContainerImpl.java
@@ -56,7 +56,7 @@ import org.apache.hadoop.yarn.util.ConverterUtils;
import org.apache.hadoop.yarn.webapp.util.WebAppUtils;
@SuppressWarnings({"unchecked", "rawtypes"})
-public class RMContainerImpl implements RMContainer {
+public class RMContainerImpl implements RMContainer, Comparable<RMContainer> {
private static final Log LOG = LogFactory.getLog(RMContainerImpl.class);
@@ -615,4 +615,30 @@ public class RMContainerImpl implements RMContainer {
}
return nodeLabelExpression;
}
+
+ /**
+ * RMContainers are equal when they carry the same non-null ContainerId.
+ * An instance is always equal to itself, even without an id, which keeps
+ * equals reflexive.
+ */
+ @Override
+ public boolean equals(Object obj) {
+ if (this == obj) {
+ return true;
+ }
+ if (obj instanceof RMContainer) {
+ if (null != getContainerId()) {
+ return getContainerId().equals(((RMContainer) obj).getContainerId());
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Consistent with {@link #equals(Object)}: derived from the ContainerId,
+ * falling back to the identity-based Object hash when there is no id.
+ */
+ @Override
+ public int hashCode() {
+ if (null != getContainerId()) {
+ return getContainerId().hashCode();
+ }
+ return super.hashCode();
+ }
+
+ /**
+ * Orders containers by ContainerId. Containers without an id sort before
+ * those with one, and two id-less containers tie, so the ordering stays
+ * antisymmetric as {@link Comparable} requires (sorted collections such as
+ * TreeSet depend on this).
+ */
+ @Override
+ public int compareTo(RMContainer o) {
+ if (getContainerId() != null && o.getContainerId() != null) {
+ return getContainerId().compareTo(o.getContainerId());
+ }
+ if (getContainerId() == null) {
+ return o.getContainerId() == null ? 0 : -1;
+ }
+ return 1;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bf09b33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 1e1623d..48c7f2f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -153,7 +153,7 @@ public class CapacityScheduler extends
static final PartitionedQueueComparator partitionedQueueComparator =
new PartitionedQueueComparator();
- static final Comparator<FiCaSchedulerApp> applicationComparator =
+ public static final Comparator<FiCaSchedulerApp> applicationComparator =
new Comparator<FiCaSchedulerApp>() {
@Override
public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bf09b33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 22aafaa..56ade84 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -68,9 +68,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
@@ -118,11 +119,16 @@ public class LeafQueue extends AbstractCSQueue {
private final QueueResourceLimitsInfo queueResourceLimitsInfo =
new QueueResourceLimitsInfo();
-
+
private volatile ResourceLimits cachedResourceLimitsForHeadroom = null;
private OrderingPolicy<FiCaSchedulerApp>
orderingPolicy = new FifoOrderingPolicy<FiCaSchedulerApp>();
+
+ // Records every RMContainer that was allocated while ignoring partition
+ // exclusivity, i.e. its node label expression is NO_LABEL but it was placed
+ // on a labeled partition. The preemption policy reads this map. Keys are the
+ // partition the RMContainer was allocated on. It is read and written from
+ // this class's synchronized methods (allocateResource, releaseResource,
+ // getIgnoreExclusivityRMContainers).
+ private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers =
+ new HashMap<>();
public LeafQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
@@ -921,11 +927,16 @@ public class LeafQueue extends AbstractCSQueue {
Resource assigned = assignment.getResource();
if (Resources.greaterThan(
resourceCalculator, clusterResource, assigned, Resources.none())) {
+ // Get reserved or allocated container from application
+ RMContainer reservedOrAllocatedRMContainer =
+ application.getRMContainer(assignment
+ .getAssignmentInformation()
+ .getFirstAllocatedOrReservedContainerId());
// Book-keeping
// Note: Update headroom to account for current allocation too...
allocateResource(clusterResource, application, assigned,
- node.getPartition());
+ node.getPartition(), reservedOrAllocatedRMContainer);
// Don't reset scheduling opportunities for offswitch assignments
// otherwise the app will be delayed for each non-local assignment.
@@ -1720,7 +1731,7 @@ public class LeafQueue extends AbstractCSQueue {
orderingPolicy.containerReleased(application, rmContainer);
releaseResource(clusterResource, application,
- container.getResource(), node.getPartition());
+ container.getResource(), node.getPartition(), rmContainer);
LOG.info("completedContainer" +
" container=" + container +
" queue=" + this +
@@ -1738,9 +1749,22 @@ public class LeafQueue extends AbstractCSQueue {
synchronized void allocateResource(Resource clusterResource,
SchedulerApplicationAttempt application, Resource resource,
- String nodePartition) {
+ String nodePartition, RMContainer rmContainer) {
super.allocateResource(clusterResource, resource, nodePartition);
+ // handle ignore exclusivity container
+ if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
+ RMNodeLabelsManager.NO_LABEL)
+ && !nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
+ TreeSet<RMContainer> rmContainers = null;
+ if (null == (rmContainers =
+ ignorePartitionExclusivityRMContainers.get(nodePartition))) {
+ rmContainers = new TreeSet<>();
+ ignorePartitionExclusivityRMContainers.put(nodePartition, rmContainers);
+ }
+ rmContainers.add(rmContainer);
+ }
+
// Update user metrics
String userName = application.getUser();
User user = getUser(userName);
@@ -1760,10 +1784,25 @@ public class LeafQueue extends AbstractCSQueue {
}
}
- synchronized void releaseResource(Resource clusterResource,
- FiCaSchedulerApp application, Resource resource, String nodePartition) {
+ synchronized void releaseResource(Resource clusterResource,
+ FiCaSchedulerApp application, Resource resource, String nodePartition,
+ RMContainer rmContainer) {
super.releaseResource(clusterResource, resource, nodePartition);
+ // handle ignore exclusivity container
+ if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
+ RMNodeLabelsManager.NO_LABEL)
+ && !nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
+ if (ignorePartitionExclusivityRMContainers.containsKey(nodePartition)) {
+ Set<RMContainer> rmContainers =
+ ignorePartitionExclusivityRMContainers.get(nodePartition);
+ rmContainers.remove(rmContainer);
+ if (rmContainers.isEmpty()) {
+ ignorePartitionExclusivityRMContainers.remove(nodePartition);
+ }
+ }
+ }
+
// Update user metrics
String userName = application.getUser();
User user = getUser(userName);
@@ -1912,7 +1951,7 @@ public class LeafQueue extends AbstractCSQueue {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
allocateResource(clusterResource, attempt, rmContainer.getContainer()
- .getResource(), node.getPartition());
+ .getResource(), node.getPartition(), rmContainer);
}
getParent().recoverContainer(clusterResource, attempt, rmContainer);
}
@@ -1953,7 +1992,7 @@ public class LeafQueue extends AbstractCSQueue {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
allocateResource(clusterResource, application, rmContainer.getContainer()
- .getResource(), node.getPartition());
+ .getResource(), node.getPartition(), rmContainer);
LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
+ " resource=" + rmContainer.getContainer().getResource()
+ " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity()
@@ -1971,7 +2010,7 @@ public class LeafQueue extends AbstractCSQueue {
FiCaSchedulerNode node =
scheduler.getNode(rmContainer.getContainer().getNodeId());
releaseResource(clusterResource, application, rmContainer.getContainer()
- .getResource(), node.getPartition());
+ .getResource(), node.getPartition(), rmContainer);
LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
+ " resource=" + rmContainer.getContainer().getResource()
+ " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()
@@ -1982,6 +2021,17 @@ public class LeafQueue extends AbstractCSQueue {
}
}
+  /**
+   * Returns the RMContainers of this LeafQueue that were allocated while
+   * ignoring partition exclusivity, keyed by the partition each one was
+   * allocated on. The preemption policy uses this map.
+   *
+   * <p>The returned map is the live internal structure, not a copy. Callers
+   * must hold this LeafQueue's monitor for as long as they use it.
+   *
+   * @return the live partition-to-containers map
+   */
+  public synchronized Map<String, TreeSet<RMContainer>>
+      getIgnoreExclusivityRMContainers() {
+    return this.ignorePartitionExclusivityRMContainers;
+  }
+
public void setCapacity(float capacity) {
queueCapacities.setCapacity(capacity);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bf09b33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java
index c5c067d..5158255 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/AssignmentInformation.java
@@ -18,16 +18,17 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common;
-import org.apache.hadoop.classification.InterfaceAudience;
-import org.apache.hadoop.classification.InterfaceStability;
-import org.apache.hadoop.yarn.api.records.ContainerId;
-import org.apache.hadoop.yarn.api.records.Resource;
-
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.hadoop.classification.InterfaceAudience;
+import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+
@InterfaceAudience.Private
@InterfaceStability.Unstable
public class AssignmentInformation {
@@ -117,4 +118,24 @@ public class AssignmentInformation {
public List<AssignmentDetails> getReservationDetails() {
return operationDetails.get(Operation.RESERVATION);
}
+
+  /**
+   * Returns the ContainerId of the first assignment recorded for the given
+   * operation.
+   *
+   * @param op the operation (e.g. ALLOCATION or RESERVATION) to inspect
+   * @return the first recorded ContainerId for {@code op}, or null if no
+   *         assignment of that kind was recorded
+   */
+  private ContainerId getFirstContainerIdFromOperation(Operation op) {
+    // Look up the requested operation, not always ALLOCATION, so that
+    // reservations can be reported as well.
+    List<AssignmentDetails> assignDetails = operationDetails.get(op);
+    if (null != assignDetails && !assignDetails.isEmpty()) {
+      return assignDetails.get(0).containerId;
+    }
+    return null;
+  }
+
+  /**
+   * Returns the first ContainerId recorded for ALLOCATION. If that lookup
+   * yields nothing, returns the result of the RESERVATION lookup instead,
+   * which may itself be null.
+   */
+  public ContainerId getFirstAllocatedOrReservedContainerId() {
+    ContainerId allocatedId =
+        getFirstContainerIdFromOperation(Operation.ALLOCATION);
+    return (allocatedId != null)
+        ? allocatedId
+        : getFirstContainerIdFromOperation(Operation.RESERVATION);
+  }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/9bf09b33/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
index 9e8b769..6c0ed6c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicy.java
@@ -25,11 +25,12 @@ import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.Pro
import static org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.WAIT_TIME_BEFORE_KILL;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.KILL_CONTAINER;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType.PREEMPT_CONTAINER;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.*;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
import static org.mockito.Matchers.argThat;
import static org.mockito.Matchers.isA;
import static org.mockito.Mockito.mock;
@@ -37,27 +38,17 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyString;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
import java.util.ArrayList;
import java.util.Comparator;
import java.util.Deque;
-import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
-import java.util.Iterator;
-import java.util.Map;
import java.util.NavigableSet;
import java.util.Random;
-import java.util.Set;
import java.util.StringTokenizer;
import java.util.TreeSet;
-import org.apache.commons.collections.map.HashedMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.service.Service;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -76,23 +67,27 @@ import org.apache.hadoop.yarn.server.resourcemanager.resource.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerPreemptEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatcher;
-import org.mortbay.log.Log;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
public class TestProportionalCapacityPreemptionPolicy {
@@ -799,50 +794,6 @@ public class TestProportionalCapacityPreemptionPolicy {
}
@Test
- public void testIdealAllocationForLabels() {
- int[][] qData = new int[][] {
- // / A B
- { 80, 40, 40 }, // abs
- { 80, 80, 80 }, // maxcap
- { 80, 80, 0 }, // used
- { 70, 20, 50 }, // pending
- { 0, 0, 0 }, // reserved
- { 5, 4, 1 }, // apps
- { -1, 1, 1 }, // req granularity
- { 2, 0, 0 }, // subqueues
- };
- setAMContainer = true;
- setLabeledContainer = true;
- Map<NodeId, Set<String>> labels = new HashMap<NodeId, Set<String>>();
- NodeId node = NodeId.newInstance("node1", 0);
- Set<String> labelSet = new HashSet<String>();
- labelSet.add("x");
- labels.put(node, labelSet);
- when(lm.getNodeLabels()).thenReturn(labels);
- ProportionalCapacityPreemptionPolicy policy = buildPolicy(qData);
- // Subtracting Label X resources from cluster resources
- when(lm.getResourceByLabel(anyString(), any(Resource.class))).thenReturn(
- Resources.clone(Resource.newInstance(80, 0)));
- clusterResources.setMemory(100);
- policy.editSchedule();
-
- // By skipping AM Container and Labeled container, all other 18 containers
- // of appD will be
- // preempted
- verify(mDisp, times(19)).handle(argThat(new IsPreemptionRequestFor(appD)));
-
- // By skipping AM Container and Labeled container, all other 18 containers
- // of appC will be
- // preempted
- verify(mDisp, times(19)).handle(argThat(new IsPreemptionRequestFor(appC)));
-
- // rest 4 containers from appB will be preempted
- verify(mDisp, times(2)).handle(argThat(new IsPreemptionRequestFor(appB)));
- setAMContainer = false;
- setLabeledContainer = false;
- }
-
- @Test
public void testPreemptSkippedAMContainers() {
int[][] qData = new int[][] {
// / A B
@@ -944,6 +895,12 @@ public class TestProportionalCapacityPreemptionPolicy {
clusterResources =
Resource.newInstance(leafAbsCapacities(qData[0], qData[7]), 0);
when(mCS.getClusterResource()).thenReturn(clusterResources);
+ when(lm.getResourceByLabel(anyString(), any(Resource.class))).thenReturn(
+ clusterResources);
+
+ SchedulerNode mNode = mock(SchedulerNode.class);
+ when(mNode.getPartition()).thenReturn(RMNodeLabelsManager.NO_LABEL);
+ when(mCS.getSchedulerNode(any(NodeId.class))).thenReturn(mNode);
return policy;
}
@@ -965,11 +922,16 @@ public class TestProportionalCapacityPreemptionPolicy {
float tot = leafAbsCapacities(abs, queues);
Deque<ParentQueue> pqs = new LinkedList<ParentQueue>();
ParentQueue root = mockParentQueue(null, queues[0], pqs);
- when(root.getQueueName()).thenReturn("/");
+ when(root.getQueueName()).thenReturn(CapacitySchedulerConfiguration.ROOT);
when(root.getAbsoluteUsedCapacity()).thenReturn(used[0] / tot);
when(root.getAbsoluteCapacity()).thenReturn(abs[0] / tot);
when(root.getAbsoluteMaximumCapacity()).thenReturn(maxCap[0] / tot);
- when(root.getQueuePath()).thenReturn("root");
+ QueueCapacities rootQc = new QueueCapacities(true);
+ rootQc.setAbsoluteUsedCapacity(used[0] / tot);
+ rootQc.setAbsoluteCapacity(abs[0] / tot);
+ rootQc.setAbsoluteMaximumCapacity(maxCap[0] / tot);
+ when(root.getQueueCapacities()).thenReturn(rootQc);
+ when(root.getQueuePath()).thenReturn(CapacitySchedulerConfiguration.ROOT);
boolean preemptionDisabled = mockPreemptionStatus("root");
when(root.getPreemptionDisabled()).thenReturn(preemptionDisabled);
@@ -987,6 +949,14 @@ public class TestProportionalCapacityPreemptionPolicy {
when(q.getAbsoluteUsedCapacity()).thenReturn(used[i] / tot);
when(q.getAbsoluteCapacity()).thenReturn(abs[i] / tot);
when(q.getAbsoluteMaximumCapacity()).thenReturn(maxCap[i] / tot);
+
+ // We need to make these fields to QueueCapacities
+ QueueCapacities qc = new QueueCapacities(false);
+ qc.setAbsoluteUsedCapacity(used[i] / tot);
+ qc.setAbsoluteCapacity(abs[i] / tot);
+ qc.setAbsoluteMaximumCapacity(maxCap[i] / tot);
+ when(q.getQueueCapacities()).thenReturn(qc);
+
String parentPathName = p.getQueuePath();
parentPathName = (parentPathName == null) ? "root" : parentPathName;
String queuePathName = (parentPathName+"."+queueName).replace("/","root");
@@ -1028,6 +998,7 @@ public class TestProportionalCapacityPreemptionPolicy {
return pq;
}
+ @SuppressWarnings("rawtypes")
LeafQueue mockLeafQueue(ParentQueue p, float tot, int i, int[] abs,
int[] used, int[] pending, int[] reserved, int[] apps, int[] gran) {
LeafQueue lq = mock(LeafQueue.class);
@@ -1035,6 +1006,10 @@ public class TestProportionalCapacityPreemptionPolicy {
new ArrayList<ApplicationAttemptId>();
when(lq.getTotalResourcePending()).thenReturn(
Resource.newInstance(pending[i], 0));
+ // need to set pending resource in resource usage as well
+ ResourceUsage ru = new ResourceUsage();
+ ru.setPending(Resource.newInstance(pending[i], 0));
+ when(lq.getQueueResourceUsage()).thenReturn(ru);
// consider moving where CapacityScheduler::comparator accessible
final NavigableSet<FiCaSchedulerApp> qApps = new TreeSet<FiCaSchedulerApp>(
new Comparator<FiCaSchedulerApp>() {
@@ -1124,6 +1099,7 @@ public class TestProportionalCapacityPreemptionPolicy {
when(mC.getContainerId()).thenReturn(cId);
when(mC.getContainer()).thenReturn(c);
when(mC.getApplicationAttemptId()).thenReturn(appAttId);
+ when(mC.getAllocatedResource()).thenReturn(r);
if (priority.AMCONTAINER.getValue() == cpriority) {
when(mC.isAMContainer()).thenReturn(true);
}