You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2017/07/12 14:00:29 UTC
[1/2] hadoop git commit: Revert "YARN-2113. Add cross-user preemption
within CapacityScheduler's leaf-queue. (Contributed by Sunil G)"
Repository: hadoop
Updated Branches:
refs/heads/branch-2 3419381e9 -> e6cdf770c
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java
deleted file mode 100644
index 7df52f9..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit.java
+++ /dev/null
@@ -1,899 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
-
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-
-/**
- * Test class for IntraQueuePreemption scenarios.
- */
-public class TestProportionalCapacityPreemptionPolicyIntraQueueUserLimit
- extends
- ProportionalCapacityPreemptionPolicyMockFramework {
- @Before
- public void setup() {
- super.setup();
- conf.setBoolean(
- CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, true);
- policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
- }
-
- @Test
- public void testSimpleIntraQueuePreemptionWithTwoUsers()
- throws IOException {
- /**
- * Queue structure is:
- *
- * <pre>
- * root
- * |
- * a
- * </pre>
- *
- * Scenario:
- * Preconditions:
- * Queue total resources: 100
- * Minimum user limit percent: 50%
- * +--------------+----------+------+---------+
- * | APP | USER | PRIORITY | USED | PENDING |
- * +--------------+----------+------+---------+
- * | app1 | user1 | 1 | 100 | 0 |
- * | app2 | user2 | 1 | 0 | 30 |
- * +--------------+----------+------+---------+
- * Hence in queueA of 100, each user has a quota of 50. app1 from user1
- * has a demand of 0 and it is already using 100. app2 from user2 has a demand
- * of 30, and UL is 50. 30 would be preempted from app1.
- */
-
- // Set max preemption limit as 50%.
- conf.setFloat(CapacitySchedulerConfiguration.
- INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
- (float) 0.5);
-
- String labelsConfig = "=100,true;";
- String nodesConfig = // n1 has no label
- "n1= res=100";
- String queuesConfig =
- // guaranteed,max,used,pending,reserved
- "root(=[100 100 100 30 0]);" + // root
- "-a(=[100 100 100 30 0])"; // a
-
- String appsConfig =
- // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
- "a\t" // app1 in a
- + "(1,1,n1,,100,false,0,user1);" + // app1 a
- "a\t" // app2 in a
- + "(1,1,n1,,0,false,30,user2)";
-
- buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
- policy.editSchedule();
-
- // app2 needs more resource and its well under its user-limit. Hence preempt
- // resources from app1.
- verify(mDisp, times(30)).handle(argThat(
- new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
- getAppAttemptId(1))));
- }
-
- @Test
- public void testNoIntraQueuePreemptionWithSingleUser()
- throws IOException {
- /**
- * Queue structure is:
- *
- * <pre>
- * root
- * |
- * a
- * </pre>
- *
- * Scenario:
- * Queue total resources: 100
- * Minimum user limit percent: 50%
- * +--------------+----------+------+---------+
- * | APP | USER | PRIORITY | USED | PENDING |
- * +--------------+----------+------+---------+
- * | app1 | user1 | 1 | 100 | 0 |
- * | app2 | user1 | 1 | 0 | 30 |
- * +--------------+----------+------+---------+
- * Given a single user, lower-priority/later-submitted apps have to
- * wait.
- */
-
- // Set max preemption limit as 50%.
- conf.setFloat(CapacitySchedulerConfiguration.
- INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
- (float) 0.5);
-
- String labelsConfig = "=100,true;";
- String nodesConfig = // n1 has no label
- "n1= res=100";
- String queuesConfig =
- // guaranteed,max,used,pending,reserved
- "root(=[100 100 100 30 0]);" + // root
- "-a(=[100 100 100 30 0])"; // a
-
- String appsConfig =
- // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
- "a\t" // app1 in a
- + "(1,1,n1,,100,false,0,user1);" + // app1 a
- "a\t" // app2 in a
- + "(1,1,n1,,0,false,30,user1)";
-
- buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
- policy.editSchedule();
-
- // app2 needs more resource. Since app1 and app2 are from the same user,
- // there won't be any preemption.
- verify(mDisp, times(0)).handle(argThat(
- new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
- getAppAttemptId(1))));
- }
-
- @Test
- public void testNoIntraQueuePreemptionWithTwoUserUnderUserLimit()
- throws IOException {
- /**
- * Queue structure is:
- *
- * <pre>
- * root
- * |
- * a
- * </pre>
- *
- * Scenario:
- * Queue total resources: 100
- * Minimum user limit percent: 50%
- * +--------------+----------+------+---------+
- * | APP | USER | PRIORITY | USED | PENDING |
- * +--------------+----------+------+---------+
- * | app1 | user1 | 1 | 50 | 0 |
- * | app2 | user2 | 1 | 30 | 30 |
- * +--------------+----------+------+---------+
- * Hence in queueA of 100, each user has a quota of 50. app1 from user1
- * has a demand of 0 and it is already using 50. app2 from user2 has a demand
- * of 30, and UL is 50. Since app1 is not above its UL, there should not be any
- * preemption.
- */
-
- // Set max preemption limit as 50%.
- conf.setFloat(CapacitySchedulerConfiguration.
- INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
- (float) 0.5);
-
- String labelsConfig = "=100,true;";
- String nodesConfig = // n1 has no label
- "n1= res=100";
- String queuesConfig =
- // guaranteed,max,used,pending,reserved
- "root(=[100 100 80 30 0]);" + // root
- "-a(=[100 100 80 30 0])"; // a
-
- String appsConfig =
- // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
- "a\t" // app1 in a
- + "(1,1,n1,,50,false,0,user1);" + // app1 a
- "a\t" // app2 in a
- + "(1,1,n1,,30,false,30,user2)";
-
- buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
- policy.editSchedule();
-
- // app2 needs more resource. Since app1 (user1) is not above its user-limit,
- // there won't be any preemption.
- verify(mDisp, times(0)).handle(argThat(
- new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
- getAppAttemptId(1))));
- }
-
- @Test
- public void testSimpleIntraQueuePreemptionWithTwoUsersWithAppPriority()
- throws IOException {
- /**
- * Queue structure is:
- *
- * <pre>
- * root
- * |
- * a
- * </pre>
- *
- * Scenario:
- * Queue total resources: 100
- * Minimum user limit percent: 50%
- * +--------------+----------+------+---------+
- * | APP | USER | PRIORITY | USED | PENDING |
- * +--------------+----------+------+---------+
- * | app1 | user1 | 2 | 100 | 0 |
- * | app2 | user2 | 1 | 0 | 30 |
- * +--------------+----------+------+---------+
- * Hence in queueA of 100, each user has a quota of 50. app1 of high priority
- * has a demand of 0 and its already using 100. app2 from user2 has a demand
- * of 30, and UL is 50. 30 would be preempted from app1.
- */
-
- // Set max preemption limit as 50%.
- conf.setFloat(CapacitySchedulerConfiguration.
- INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
- (float) 0.5);
-
- String labelsConfig = "=100,true;";
- String nodesConfig = // n1 has no label
- "n1= res=100";
- String queuesConfig =
- // guaranteed,max,used,pending,reserved
- "root(=[100 100 100 30 0]);" + // root
- "-a(=[100 100 100 30 0])"; // a
-
- String appsConfig =
- // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
- "a\t" // app1 in a
- + "(2,1,n1,,100,false,0,user1);" + // app1 a
- "a\t" // app2 in a
- + "(1,1,n1,,0,false,30,user2)";
-
- buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
- policy.editSchedule();
-
- // app2 needs more resource and its well under its user-limit. Hence preempt
- // resources from app1 even though its priority is more than app2.
- verify(mDisp, times(30)).handle(argThat(
- new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
- getAppAttemptId(1))));
- }
-
- @Test
- public void testIntraQueuePreemptionOfUserLimitWithMultipleApps()
- throws IOException {
- /**
- * Queue structure is:
- *
- * <pre>
- * root
- * |
- * a
- * </pre>
- *
- * Scenario:
- * Queue total resources: 100
- * Minimum user limit percent: 50%
- * +--------------+----------+------+---------+
- * | APP | USER | PRIORITY | USED | PENDING |
- * +--------------+----------+------+---------+
- * | app1 | user1 | 1 | 30 | 30 |
- * | app2 | user2 | 1 | 20 | 20 |
- * | app3 | user1 | 1 | 30 | 30 |
- * | app4 | user2 | 1 | 0 | 10 |
- * +--------------+----------+------+---------+
- * Hence in queueA of 100, each user has a quota of 50. Now have multiple
- * apps and check for preemption across apps.
- */
-
- // Set max preemption limit as 50%.
- conf.setFloat(CapacitySchedulerConfiguration.
- INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
- (float) 0.5);
-
- String labelsConfig = "=100,true;";
- String nodesConfig = // n1 has no label
- "n1= res=100";
- String queuesConfig =
- // guaranteed,max,used,pending,reserved
- "root(=[100 100 80 90 0]);" + // root
- "-a(=[100 100 80 90 0])"; // a
-
- String appsConfig =
- // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
- "a\t" // app1 in a
- + "(1,1,n1,,30,false,30,user1);" + // app1 a
- "a\t" // app2 in a
- + "(1,1,n1,,20,false,20,user2);" +
- "a\t" // app3 in a
- + "(1,1,n1,,30,false,30,user1);" +
- "a\t" // app4 in a
- + "(1,1,n1,,0,false,10,user2)";
-
- buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
- policy.editSchedule();
-
- // app2/app4 needs more resource and its well under its user-limit. Hence
- // preempt resources from app3 (compare to app1, app3 has low priority).
- verify(mDisp, times(9)).handle(argThat(
- new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
- getAppAttemptId(3))));
- }
-
- @Test
- public void testNoPreemptionOfUserLimitWithMultipleAppsAndSameUser()
- throws IOException {
- /**
- * Queue structure is:
- *
- * <pre>
- * root
- * |
- * a
- * </pre>
- *
- * Scenario:
- * Queue total resources: 100
- * Minimum user limit percent: 50%
- * +--------------+----------+------+---------+
- * | APP | USER | PRIORITY | USED | PENDING |
- * +--------------+----------+------+---------+
- * | app1 | user1 | 1 | 30 | 30 |
- * | app2 | user1 | 1 | 20 | 20 |
- * | app3 | user1 | 1 | 30 | 30 |
- * | app4 | user1 | 1 | 0 | 10 |
- * +--------------+----------+------+---------+
- * Hence in queueA of 100, each user has a quota of 50. Now have multiple
- * apps and check for preemption across apps.
- */
-
- // Set max preemption limit as 50%.
- conf.setFloat(CapacitySchedulerConfiguration.
- INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
- (float) 0.5);
-
- String labelsConfig = "=100,true;";
- String nodesConfig = // n1 has no label
- "n1= res=100";
- String queuesConfig =
- // guaranteed,max,used,pending,reserved
- "root(=[100 100 80 90 0]);" + // root
- "-a(=[100 100 80 90 0])"; // a
-
- String appsConfig =
- // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
- "a\t" // app1 in a
- + "(1,1,n1,,30,false,20,user1);" + // app1 a
- "a\t" // app2 in a
- + "(1,1,n1,,20,false,20,user1);" +
- "a\t" // app3 in a
- + "(1,1,n1,,30,false,30,user1);" +
- "a\t" // app4 in a
- + "(1,1,n1,,0,false,10,user1)";
-
- buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
- policy.editSchedule();
-
- // All four apps belong to the same user (user1), so no resources should be
- // preempted from any of them.
- verify(mDisp, times(0)).handle(argThat(
- new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
- getAppAttemptId(1))));
- verify(mDisp, times(0)).handle(argThat(
- new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
- getAppAttemptId(2))));
- verify(mDisp, times(0)).handle(argThat(
- new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
- getAppAttemptId(3))));
- verify(mDisp, times(0)).handle(argThat(
- new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
- getAppAttemptId(4))));
- }
-
- @Test
- public void testIntraQueuePreemptionOfUserLimitWitAppsOfDifferentPriority()
- throws IOException {
- /**
- * Queue structure is:
- * <pre>
- * root
- * |
- * a
- * </pre>
- *
- * Scenario:
- * Queue total resources: 100
- * Minimum user limit percent: 50%
- * +--------------+----------+------+---------+
- * | APP | USER | PRIORITY | USED | PENDING |
- * +--------------+----------+------+---------+
- * | app1 | user1 | 3 | 30 | 30 |
- * | app2 | user2 | 1 | 20 | 20 |
- * | app3 | user1 | 4 | 30 | 0 |
- * | app4 | user2 | 1 | 0 | 10 |
- * +--------------+----------+------+---------+
- * Hence in queueA of 100, each user has a quota of 50. Now have multiple
- * apps and check for preemption across apps.
- */
-
- // Set max preemption limit as 50%.
- conf.setFloat(
- CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
- (float) 0.5);
-
- String labelsConfig = "=100,true;";
- String nodesConfig = // n1 has no label
- "n1= res=100";
- String queuesConfig =
- // guaranteed,max,used,pending,reserved
- "root(=[100 100 80 60 0]);" + // root
- "-a(=[100 100 80 60 0])"; // b
-
- String appsConfig =
- // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
- "a\t" // app1 in a
- + "(3,1,n1,,30,false,30,user1);" + // app1 a
- "a\t" // app2 in a
- + "(1,1,n1,,20,false,20,user2);" + "a\t" // app3 in a
- + "(4,1,n1,,30,false,0,user1);" + "a\t" // app4 in a
- + "(1,1,n1,,0,false,10,user2)";
-
- buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
- policy.editSchedule();
-
- // app2/app4 needs more resource and its well under its user-limit. Hence
- // preempt resources from app1 (compare to app3, app1 has low priority).
- verify(mDisp, times(9)).handle(argThat(
- new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
- getAppAttemptId(1))));
- }
-
- @Test
- public void testIntraQueuePreemptionOfUserLimitInTwoQueues()
- throws IOException {
- /**
- * Queue structure is:
- *
- * <pre>
- * root
- * / \
- * a b
- * </pre>
- *
- * Guaranteed resource of a/b are 60:40. Total cluster resource = 100.
- * maxIntraQueuePreemptableLimit by default is 50%. This test is to verify
- * that intra-queue preemption could occur in two queues when user-limit
- * irregularity is present in queue.
- */
-
- // Set max preemption limit as 50%.
- conf.setFloat(CapacitySchedulerConfiguration.
- INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
- (float) 0.5);
-
- String labelsConfig = "=100,true;";
- String nodesConfig = // n1 has no label
- "n1= res=100";
- String queuesConfig =
- // guaranteed,max,used,pending,reserved
- "root(=[100 100 90 80 0]);" + // root
- "-a(=[60 100 55 60 0]);" + // a
- "-b(=[40 100 35 20 0])"; // b
-
- String appsConfig =
- // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
- "a\t" // app1 in a
- + "(3,1,n1,,20,false,30,user1);" + // app1 a
- "a\t" // app2 in a
- + "(1,1,n1,,20,false,20,user2);" +
- "a\t" // app3 in a
- + "(4,1,n1,,15,false,0,user1);" +
- "a\t" // app4 in a
- + "(1,1,n1,,0,false,10,user2);" +
- "b\t" // app5 in b
- + "(3,1,n1,,25,false,10,user1);" +
- "b\t" // app6 in b
- + "(1,1,n1,,10,false,10,user2)";
-
- buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
- policy.editSchedule();
-
- // app2/app4 needs more resource and its well under its user-limit. Hence
- // preempt resources from app1 (compare to app3, app1 has low priority).
- verify(mDisp, times(4)).handle(argThat(
- new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
- getAppAttemptId(1))));
- verify(mDisp, times(4)).handle(argThat(
- new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
- getAppAttemptId(5))));
- }
-
- @Test
- public void testIntraQueuePreemptionWithTwoRequestingUsers()
- throws IOException {
- /**
- * Queue structure is:
- *
- * <pre>
- * root
- * |
- * a
- * </pre>
- *
- * Scenario:
- * Queue total resources: 100
- * Minimum user limit percent: 50%
- * +--------------+----------+------+---------+
- * | APP | USER | PRIORITY | USED | PENDING |
- * +--------------+----------+------+---------+
- * | app1 | user1 | 1 | 60 | 10 |
- * | app2 | user2 | 1 | 40 | 10 |
- * +--------------+----------+------+---------+
- * Hence in queueA of 100, each user has a quota of 50. Now have multiple
- * apps and check for preemption across apps.
- */
-
- // Set max preemption limit as 50%.
- conf.setFloat(CapacitySchedulerConfiguration.
- INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
- (float) 0.5);
-
- String labelsConfig = "=100,true;";
- String nodesConfig = // n1 has no label
- "n1= res=100";
- String queuesConfig =
- // guaranteed,max,used,pending,reserved
- "root(=[100 100 100 20 0]);" + // root
- "-a(=[100 100 100 20 0])"; // a
-
- String appsConfig =
- // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
- "a\t" // app1 in a
- + "(1,1,n1,,60,false,10,user1);" + // app1 a
- "a\t" // app2 in a
- + "(1,1,n1,,40,false,10,user2)";
-
- buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
- policy.editSchedule();
-
- // app2 needs more resource and its well under its user-limit. Hence preempt
- // resources from app1.
- verify(mDisp, times(9)).handle(argThat(
- new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
- getAppAttemptId(1))));
- verify(mDisp, times(0)).handle(argThat(
- new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
- getAppAttemptId(2))));
- }
-
- @Test
- public void testNoIntraQueuePreemptionIfBelowUserLimitAndLowPriorityExtraUsers()
- throws IOException {
- /**
- * Queue structure is:
- *
- * <pre>
- * root
- * |
- * a
- * </pre>
- *
- * Scenario:
- * Preconditions:
- * Queue total resources: 100
- * Minimum user limit percent: 50%
- * +--------------+----------+------+---------+
- * | APP | USER | PRIORITY | USED | PENDING |
- * +--------------+----------+------+---------+
- * | app1 | user1 | 1 | 50 | 0 |
- * | app2 | user2 | 1 | 50 | 0 |
- * | app3 | user3 | 0 | 0 | 10 |
- * +--------------+----------+------+---------+
- * This scenario should never preempt from either user1 or user2
- */
-
- // Set max preemption per round to 70% (this is different from minimum user
- // limit percent).
- conf.setFloat(CapacitySchedulerConfiguration.
- INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
- (float) 0.7);
-
- String labelsConfig = "=100,true;";
- String nodesConfig = // n1 has no label
- "n1= res=100";
- String queuesConfig =
- // guaranteed,max,used,pending,reserved
- "root(=[100 100 100 10 0]);" + // root
- "-a(=[100 100 100 10 0])"; // a
-
- String appsConfig =
- // queueName\t\
- // (priority,resource,host,label,#repeat,reserved,pending,user)\tMULP;
- "a\t(1,1,n1,,50,false,0,user1)\t50;" + // app1, user1
- "a\t(1,1,n1,,50,false,0,user2)\t50;" + // app2, user2
- "a\t(0,1,n1,,0,false,10,user3)\t50"; // app3, user3
-
- buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
- policy.editSchedule();
-
- // app3 (user3) needs more resource, but user1 and user2 are each using only
- // their 50% share, so nothing should be preempted from app1 or app2.
- verify(mDisp, times(0)).handle(argThat(
- new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
- getAppAttemptId(1))));
- verify(mDisp, times(0)).handle(argThat(
- new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
- getAppAttemptId(2))));
- }
-
- @Test
- public void testNoIntraQueuePreemptionIfBelowUserLimitAndSamePriorityExtraUsers()
- throws IOException {
- /**
- * Queue structure is:
- *
- * <pre>
- * root
- * |
- * a
- * </pre>
- *
- * Scenario:
- * Preconditions:
- * Queue total resources: 100
- * Minimum user limit percent: 50%
- * +--------------+----------+------+---------+
- * | APP | USER | PRIORITY | USED | PENDING |
- * +--------------+----------+------+---------+
- * | app1 | user1 | 1 | 50 | 0 |
- * | app2 | user2 | 1 | 50 | 0 |
- * | app3 | user3 | 1 | 0 | 10 |
- * +--------------+----------+------+---------+
- * This scenario should never preempt from either user1 or user2
- */
-
- // Set max preemption per round to 70% (this is different from minimum user
- // limit percent).
- conf.setFloat(CapacitySchedulerConfiguration.
- INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
- (float) 0.7);
-
- String labelsConfig = "=100,true;";
- String nodesConfig = // n1 has no label
- "n1= res=100";
- String queuesConfig =
- // guaranteed,max,used,pending,reserved
- "root(=[100 100 100 10 0]);" + // root
- "-a(=[100 100 100 10 0])"; // a
-
- String appsConfig =
- // queueName\t\
- // (priority,resource,host,label,#repeat,reserved,pending,user)\tMULP;
- "a\t(1,1,n1,,50,false,0,user1)\t50;" + // app1, user1
- "a\t(1,1,n1,,50,false,0,user2)\t50;" + // app2, user2
- "a\t(1,1,n1,,0,false,10,user3)\t50"; // app3, user3
-
- buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
- policy.editSchedule();
-
- // app3 (user3) needs more resource, but user1 and user2 are each using only
- // their 50% share, so nothing should be preempted from app1 or app2.
- verify(mDisp, times(0)).handle(argThat(
- new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
- getAppAttemptId(1))));
- verify(mDisp, times(0)).handle(argThat(
- new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
- getAppAttemptId(2))));
- }
-
- @Test
- public void testNoIntraQueuePreemptionIfBelowUserLimitAndHighPriorityExtraUsers()
- throws IOException {
- /**
- * Queue structure is:
- *
- * <pre>
- * root
- * |
- * a
- * </pre>
- *
- * Scenario:
- * Preconditions:
- * Queue total resources: 100
- * Minimum user limit percent: 50%
- * +--------------+----------+------+---------+
- * | APP | USER | PRIORITY | USED | PENDING |
- * +--------------+----------+------+---------+
- * | app1 | user1 | 1 | 50 | 0 |
- * | app2 | user2 | 1 | 50 | 0 |
- * | app3 | user3 | 5 | 0 | 10 |
- * +--------------+----------+------+---------+
- * This scenario should never preempt from either user1 or user2
- */
-
- // Set max preemption per round to 70% (this is different from minimum user
- // limit percent).
- conf.setFloat(CapacitySchedulerConfiguration.
- INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
- (float) 0.7);
-
- String labelsConfig = "=100,true;";
- String nodesConfig = // n1 has no label
- "n1= res=100";
- String queuesConfig =
- // guaranteed,max,used,pending,reserved
- "root(=[100 100 100 10 0]);" + // root
- "-a(=[100 100 100 10 0])"; // a
-
- String appsConfig =
- // queueName\t\
- // (priority,resource,host,label,#repeat,reserved,pending,user)\tMULP;
- "a\t(1,1,n1,,50,false,0,user1)\t50;" + // app1, user1
- "a\t(1,1,n1,,50,false,0,user2)\t50;" + // app2, user2
- "a\t(5,1,n1,,0,false,10,user3)\t50"; // app3, user3
-
- buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
- policy.editSchedule();
-
- // app3 (user3) needs more resource, but user1 and user2 are each using only
- // their 50% share, so nothing should be preempted from app1 or app2.
- verify(mDisp, times(0)).handle(argThat(
- new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
- getAppAttemptId(1))));
- verify(mDisp, times(0)).handle(argThat(
- new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
- getAppAttemptId(2))));
- }
-
- @Test
- public void testNoIntraQueuePreemptionWithUserLimitDeadzone()
- throws IOException {
- /**
- * Queue structure is:
- *
- * <pre>
- * root
- * |
- * a
- * </pre>
- *
- * Scenario:
- * Queue total resources: 100
- * Minimum user limit percent: 50%
- * +--------------+----------+------+---------+
- * | APP | USER | PRIORITY | USED | PENDING |
- * +--------------+----------+------+---------+
- * | app1 | user1 | 1 | 60 | 10 |
- * | app2 | user2 | 1 | 40 | 10 |
- * +--------------+----------+------+---------+
- * Hence in queueA of 100, each user has a quota of 50. Now have multiple
- * apps and check for preemption across apps but also ensure that user's
- * usage not coming under its user-limit.
- */
-
- // Set max preemption limit as 50%.
- conf.setFloat(CapacitySchedulerConfiguration.
- INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
- (float) 0.5);
-
- String labelsConfig = "=100,true;";
- String nodesConfig = // n1 has no label
- "n1= res=100";
- String queuesConfig =
- // guaranteed,max,used,pending,reserved
- "root(=[100 100 100 20 0]);" + // root
- "-a(=[100 100 100 20 0])"; // a
-
- String appsConfig =
- // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
- "a\t" // app1 in a
- + "(1,3,n1,,20,false,10,user1);" + // app1 a
- "a\t" // app2 in a
- + "(1,1,n1,,40,false,10,user2)";
-
- buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
- policy.editSchedule();
-
- // app2 needs more resource and its well under its user-limit. Hence preempt
- // 3 resources (9GB) from app1. We will not preempt last container as it may
- // pull user's usage under its user-limit.
- verify(mDisp, times(3)).handle(argThat(
- new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
- getAppAttemptId(1))));
- verify(mDisp, times(0)).handle(argThat(
- new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
- getAppAttemptId(2))));
- }
-
- @Test
- public void testIntraQueuePreemptionWithUserLimitDeadzoneAndPriority()
- throws IOException {
- /**
- * Queue structure is:
- *
- * <pre>
- * root
- * |
- * a
- * </pre>
- *
- * Scenario:
- * Queue total resources: 100
- * Minimum user limit percent: 50%
- * +--------------+----------+------+---------+
- * | APP | USER | PRIORITY | USED | PENDING |
- * +--------------+----------+------+---------+
- * | app1 | user1 | 1 | 60 | 10 |
- * | app2 | user2 | 1 | 40 | 10 |
- * +--------------+----------+------+---------+
- * Hence in queueA of 100, each user has a quota of 50. Now have multiple
- * apps and check for preemption across apps but also ensure that user's
- * usage not coming under its user-limit.
- */
-
- // Set max preemption limit as 50%.
- conf.setFloat(CapacitySchedulerConfiguration.
- INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
- (float) 0.5);
-
- String labelsConfig = "=100,true;";
- String nodesConfig = // n1 has no label
- "n1= res=100";
- String queuesConfig =
- // guaranteed,max,used,pending,reserved
- "root(=[100 100 100 20 0]);" + // root
- "-a(=[100 100 100 20 0])"; // a
-
- String appsConfig =
- // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
- "a\t" // app1 in a
- + "(1,3,n1,,20,false,10,user1);" + // app1 a
- "a\t" // app2 in a
- + "(2,1,n1,,0,false,10,user1);" + // app1 a
- "a\t" // app2 in a
- + "(1,1,n1,,40,false,20,user2)";
-
- buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
- policy.editSchedule();
-
- // app3 (user2) needs more resource and is well under its user-limit. Hence
- // preempt 3 resources (9GB) from app1. We will not preempt last container as
- // it may pull user's usage under its user-limit.
- verify(mDisp, times(3)).handle(argThat(
- new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
- getAppAttemptId(1))));
- verify(mDisp, times(0)).handle(argThat(
- new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
- getAppAttemptId(2))));
-
- // After first round, 3 containers were preempted from app1 and resource
- // distribution will be like below.
- appsConfig =
- // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
- "a\t" // app1 in a
- + "(1,3,n1,,17,false,10,user1);" + // app1 a
- "a\t" // app2 in a
- + "(2,1,n1,,0,false,10,user1);" + // app2 a
- "a\t" // app2 in a
- + "(1,1,n1,,49,false,11,user2)";
-
- buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
- policy.editSchedule();
-
- // app2 has priority demand within same user 'user1'. However user1's used
- // is already under UL. Hence no preemption. We will still get 3 container
- // while asserting as it was already selected in earlier round.
- verify(mDisp, times(3)).handle(argThat(
- new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
- getAppAttemptId(1))));
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF.java
deleted file mode 100644
index 7784549..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF.java
+++ /dev/null
@@ -1,178 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
-
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
-import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
-import org.junit.Before;
-import org.junit.Test;
-
-import java.io.IOException;
-
-import static org.mockito.Matchers.argThat;
-import static org.mockito.Mockito.times;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-
-/**
- * Test class for IntraQueuePreemption scenarios.
- */
-public class TestProportionalCapacityPreemptionPolicyIntraQueueWithDRF
- extends
- ProportionalCapacityPreemptionPolicyMockFramework {
- @Before
- public void setup() {
- super.setup();
- conf.setBoolean(
- CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, true);
- rc = new DominantResourceCalculator();
- when(cs.getResourceCalculator()).thenReturn(rc);
- policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
- }
-
- @Test
- public void testSimpleIntraQueuePreemptionWithVCoreResource()
- throws IOException {
- /**
- * The simplest test preemption, Queue structure is:
- *
- * <pre>
- * root
- * / | | \
- * a b c d
- * </pre>
- *
- * Guaranteed resource of a/b/c/d are 10:40:20:30 Total cluster resource =
- * 100 Scenario: Queue B has few running apps and two high priority apps
- * have demand. Apps which are running at low priority (4) will preempt few
- * of its resources to meet the demand.
- */
-
- conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
- "priority_first");
-
- String labelsConfig = "=100:200,true;";
- String nodesConfig = // n1 has no label
- "n1= res=100:200";
- String queuesConfig =
- // guaranteed,max,used,pending,reserved
- "root(=[100:50 100:50 80:40 120:60 0]);" + // root
- "-a(=[10:5 100:50 10:5 50:25 0]);" + // a
- "-b(=[40:20 100:50 40:20 60:30 0]);" + // b
- "-c(=[20:10 100:50 10:5 10:5 0]);" + // c
- "-d(=[30:15 100:50 20:10 0 0])"; // d
-
- String appsConfig =
- // queueName\t(priority,resource,host,expression,#repeat,reserved,
- // pending)
- "a\t" // app1 in a
- + "(1,1:1,n1,,5,false,25:25);" + // app1 a
- "a\t" // app2 in a
- + "(1,1:1,n1,,5,false,25:25);" + // app2 a
- "b\t" // app3 in b
- + "(4,1:1,n1,,36,false,20:20);" + // app3 b
- "b\t" // app4 in b
- + "(4,1:1,n1,,2,false,10:10);" + // app4 b
- "b\t" // app4 in b
- + "(5,1:1,n1,,1,false,10:10);" + // app5 b
- "b\t" // app4 in b
- + "(6,1:1,n1,,1,false,10:10);" + // app6 in b
- "c\t" // app1 in a
- + "(1,1:1,n1,,10,false,10:10);" + "d\t" // app7 in c
- + "(1,1:1,n1,,20,false,0)";
-
- buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
- policy.editSchedule();
-
- // For queue B, app3 and app4 were of lower priority. Hence take 8
- // containers from them by hitting the intraQueuePreemptionDemand of 20%.
- verify(mDisp, times(1)).handle(argThat(
- new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
- getAppAttemptId(4))));
- verify(mDisp, times(7)).handle(argThat(
- new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
- getAppAttemptId(3))));
- }
-
- @Test
- public void testIntraQueuePreemptionWithDominantVCoreResource()
- throws IOException {
- /**
- * The simplest test preemption, Queue structure is:
- *
- * <pre>
- * root
- * / \
- * a b
- * </pre>
- *
- * Guaranteed resource of a/b are 40:60 Total cluster resource = 100
- * Scenario: Queue B has few running apps and two high priority apps have
- * demand. Apps which are running at low priority (4) will preempt few of
- * its resources to meet the demand.
- */
-
- conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
- "priority_first");
- // Set max preemption limit as 50%.
- conf.setFloat(CapacitySchedulerConfiguration.
- INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
- (float) 0.5);
-
- String labelsConfig = "=100:200,true;";
- String nodesConfig = // n1 has no label
- "n1= res=100:200";
- String queuesConfig =
- // guaranteed,max,used,pending,reserved
- "root(=[100:50 100:50 50:40 110:60 0]);" + // root
- "-a(=[40:20 100:50 9:9 50:30 0]);" + // a
- "-b(=[60:30 100:50 40:30 60:30 0]);"; // b
-
- String appsConfig =
- // queueName\t(priority,resource,host,expression,#repeat,reserved,
- // pending)
- "a\t" // app1 in a
- + "(1,2:1,n1,,4,false,25:25);" + // app1 a
- "a\t" // app2 in a
- + "(1,1:3,n1,,2,false,25:25);" + // app2 a
- "b\t" // app3 in b
- + "(4,2:1,n1,,10,false,20:20);" + // app3 b
- "b\t" // app4 in b
- + "(4,1:2,n1,,5,false,10:10);" + // app4 b
- "b\t" // app5 in b
- + "(5,1:1,n1,,5,false,30:20);" + // app5 b
- "b\t" // app6 in b
- + "(6,2:1,n1,,5,false,30:20);";
-
- buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
- policy.editSchedule();
-
- // For queue B, app3 and app4 were of lower priority. Hence take 4
- // containers.
- verify(mDisp, times(9)).handle(argThat(
- new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
- getAppAttemptId(3))));
- verify(mDisp, times(4)).handle(argThat(
- new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
- getAppAttemptId(4))));
- verify(mDisp, times(4)).handle(argThat(
- new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
- getAppAttemptId(5))));
- }
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[2/2] hadoop git commit: Revert "YARN-2113. Add cross-user preemption
within CapacityScheduler's leaf-queue. (Contributed by Sunil G)"
Posted by ep...@apache.org.
Revert "YARN-2113. Add cross-user preemption within CapacityScheduler's leaf-queue. (Contributed by Sunil G)"
This reverts commit eda4ac07c1835031aca7e27cc673f1c5913813bb.
Commit eda4ac07c1835031aca7e27cc673f1c5913813bb was a separate patch from trunk rather than a cherry-pick. I will cherry-pick dependencies and then cherry-pick the trunk commit for YARN-2113.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e6cdf770
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e6cdf770
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e6cdf770
Branch: refs/heads/branch-2
Commit: e6cdf770ca863827a1c3a304a1abbfbf37d81462
Parents: 3419381
Author: Eric Payne <ep...@apache.org>
Authored: Wed Jul 12 08:56:40 2017 -0500
Committer: Eric Payne <ep...@apache.org>
Committed: Wed Jul 12 08:56:40 2017 -0500
----------------------------------------------------------------------
.../resource/DefaultResourceCalculator.java | 5 -
.../resource/DominantResourceCalculator.java | 5 -
.../yarn/util/resource/ResourceCalculator.java | 9 -
.../hadoop/yarn/util/resource/Resources.java | 5 -
.../CapacitySchedulerPreemptionContext.java | 5 -
.../CapacitySchedulerPreemptionUtils.java | 9 +-
.../FifoIntraQueuePreemptionPlugin.java | 329 ++-----
.../capacity/IntraQueueCandidatesSelector.java | 112 +--
.../IntraQueuePreemptionComputePlugin.java | 10 +-
.../ProportionalCapacityPreemptionPolicy.java | 25 +-
.../monitor/capacity/TempAppPerPartition.java | 24 +-
.../monitor/capacity/TempQueuePerPartition.java | 14 -
.../monitor/capacity/TempUserPerPartition.java | 88 --
.../CapacitySchedulerConfiguration.java | 8 -
.../scheduler/capacity/LeafQueue.java | 67 +-
...alCapacityPreemptionPolicyMockFramework.java | 89 +-
...ionalCapacityPreemptionPolicyIntraQueue.java | 30 +-
...cityPreemptionPolicyIntraQueueUserLimit.java | 899 -------------------
...pacityPreemptionPolicyIntraQueueWithDRF.java | 178 ----
19 files changed, 173 insertions(+), 1738 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
index 524a049..ef7229c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
@@ -121,9 +121,4 @@ public class DefaultResourceCalculator extends ResourceCalculator {
Resource smaller, Resource bigger) {
return smaller.getMemorySize() <= bigger.getMemorySize();
}
-
- @Override
- public boolean isAnyMajorResourceZero(Resource resource) {
- return resource.getMemorySize() == 0f;
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
index 9155ae3..1457c28 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
@@ -231,9 +231,4 @@ public class DominantResourceCalculator extends ResourceCalculator {
return smaller.getMemorySize() <= bigger.getMemorySize()
&& smaller.getVirtualCores() <= bigger.getVirtualCores();
}
-
- @Override
- public boolean isAnyMajorResourceZero(Resource resource) {
- return resource.getMemorySize() == 0f || resource.getVirtualCores() == 0;
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
index d219fe1..a2f85b3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
@@ -204,13 +204,4 @@ public abstract class ResourceCalculator {
*/
public abstract boolean fitsIn(Resource cluster,
Resource smaller, Resource bigger);
-
- /**
- * Check if resource has any major resource types (which are all NodeManagers
- * included) a zero value.
- *
- * @param resource resource
- * @return returns true if any resource is zero.
- */
- public abstract boolean isAnyMajorResourceZero(Resource resource);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
index 91a5297..fc46fa2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
@@ -345,9 +345,4 @@ public class Resources {
return createResource(Math.max(lhs.getMemorySize(), rhs.getMemorySize()),
Math.max(lhs.getVirtualCores(), rhs.getVirtualCores()));
}
-
- public static boolean isAnyMajorResourceZero(ResourceCalculator rc,
- Resource resource) {
- return rc.isAnyMajorResourceZero(resource);
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
index d6f3f6c..982b1f1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
@@ -18,11 +18,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.IntraQueuePreemptionOrderPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -65,7 +63,4 @@ interface CapacitySchedulerPreemptionContext {
float getMinimumThresholdForIntraQueuePreemption();
float getMaxAllowableLimitForIntraQueuePreemption();
-
- @Unstable
- IntraQueuePreemptionOrderPolicy getIntraQueuePreemptionOrderPolicy();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
index 0ae3ef0..abad2a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
@@ -99,7 +99,7 @@ public class CapacitySchedulerPreemptionUtils {
}
deductPreemptableResourcePerApp(context, tq.totalPartitionResource,
- tas, res);
+ tas, res, partition);
}
}
}
@@ -108,10 +108,10 @@ public class CapacitySchedulerPreemptionUtils {
private static void deductPreemptableResourcePerApp(
CapacitySchedulerPreemptionContext context,
Resource totalPartitionResource, Collection<TempAppPerPartition> tas,
- Resource res) {
+ Resource res, String partition) {
for (TempAppPerPartition ta : tas) {
ta.deductActuallyToBePreempted(context.getResourceCalculator(),
- totalPartitionResource, res);
+ totalPartitionResource, res, partition);
}
}
@@ -157,8 +157,7 @@ public class CapacitySchedulerPreemptionUtils {
&& Resources.greaterThan(rc, clusterResource, toObtainByPartition,
Resources.none())
&& Resources.fitsIn(rc, clusterResource,
- rmContainer.getAllocatedResource(), totalPreemptionAllowed)
- && !Resources.isAnyMajorResourceZero(rc, toObtainByPartition)) {
+ rmContainer.getAllocatedResource(), totalPreemptionAllowed)) {
Resources.subtractFrom(toObtainByPartition,
rmContainer.getAllocatedResource());
Resources.subtractFrom(totalPreemptionAllowed,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
index 4bf6760..757f567 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
@@ -18,13 +18,11 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.List;
import java.util.Map;
import java.util.PriorityQueue;
import java.util.Set;
@@ -35,11 +33,8 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPriorityComparator;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.IntraQueuePreemptionOrderPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -65,26 +60,6 @@ public class FifoIntraQueuePreemptionPlugin
}
@Override
- public Collection<FiCaSchedulerApp> getPreemptableApps(String queueName,
- String partition) {
- TempQueuePerPartition tq = context.getQueueByPartition(queueName,
- partition);
-
- List<FiCaSchedulerApp> apps = new ArrayList<FiCaSchedulerApp>();
- for (TempAppPerPartition tmpApp : tq.getApps()) {
- // If a lower priority app was not selected to get preempted, mark such
- // apps out from preemption candidate selection.
- if (Resources.equals(tmpApp.getActuallyToBePreempted(),
- Resources.none())) {
- continue;
- }
-
- apps.add(tmpApp.app);
- }
- return apps;
- }
-
- @Override
public Map<String, Resource> getResourceDemandFromAppsPerQueue(
String queueName, String partition) {
@@ -114,7 +89,7 @@ public class FifoIntraQueuePreemptionPlugin
@Override
public void computeAppsIdealAllocation(Resource clusterResource,
- TempQueuePerPartition tq,
+ Resource partitionBasedResource, TempQueuePerPartition tq,
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
Resource totalPreemptedResourceAllowed,
Resource queueReassignableResource, float maxAllowablePreemptLimit) {
@@ -137,15 +112,17 @@ public class FifoIntraQueuePreemptionPlugin
// 3. Create all tempApps for internal calculation and return a list from
// high priority to low priority order.
- PriorityQueue<TempAppPerPartition> orderedByPriority = createTempAppForResCalculation(
- tq, apps, clusterResource, perUserAMUsed);
+ TAPriorityComparator taComparator = new TAPriorityComparator();
+ PriorityQueue<TempAppPerPartition> orderedByPriority =
+ createTempAppForResCalculation(tq.partition, apps, taComparator);
// 4. Calculate idealAssigned per app by checking based on queue's
// unallocated resource.Also return apps arranged from lower priority to
// higher priority.
- TreeSet<TempAppPerPartition> orderedApps = calculateIdealAssignedResourcePerApp(
- clusterResource, tq, selectedCandidates, queueReassignableResource,
- orderedByPriority);
+ TreeSet<TempAppPerPartition> orderedApps =
+ calculateIdealAssignedResourcePerApp(clusterResource,
+ partitionBasedResource, tq, selectedCandidates,
+ queueReassignableResource, orderedByPriority, perUserAMUsed);
// 5. A configurable limit that could define an ideal allowable preemption
// limit. Based on current queue's capacity,defined how much % could become
@@ -168,7 +145,7 @@ public class FifoIntraQueuePreemptionPlugin
// 7. From lowest priority app onwards, calculate toBePreempted resource
// based on demand.
calculateToBePreemptedResourcePerApp(clusterResource, orderedApps,
- Resources.clone(preemptionLimit));
+ preemptionLimit);
// Save all apps (low to high) to temp queue for further reference
tq.addAllApps(orderedApps);
@@ -176,8 +153,7 @@ public class FifoIntraQueuePreemptionPlugin
// 8. There are chances that we may preempt for the demand from same
// priority level, such cases are to be validated out.
validateOutSameAppPriorityFromDemand(clusterResource,
- (TreeSet<TempAppPerPartition>) orderedApps, tq.getUsersPerPartition(),
- context.getIntraQueuePreemptionOrderPolicy());
+ (TreeSet<TempAppPerPartition>) tq.getApps());
if (LOG.isDebugEnabled()) {
LOG.debug("Queue Name:" + tq.queueName + ", partition:" + tq.partition);
@@ -200,17 +176,17 @@ public class FifoIntraQueuePreemptionPlugin
Resource preemtableFromApp = Resources.subtract(tmpApp.getUsed(),
tmpApp.idealAssigned);
- Resources.subtractFromNonNegative(preemtableFromApp, tmpApp.selected);
- Resources.subtractFromNonNegative(preemtableFromApp, tmpApp.getAMUsed());
+ Resources.subtractFrom(preemtableFromApp, tmpApp.selected);
+ Resources.subtractFrom(preemtableFromApp, tmpApp.getAMUsed());
// Calculate toBePreempted from apps as follows:
// app.preemptable = min(max(app.used - app.selected - app.ideal, 0),
// intra_q_preemptable)
tmpApp.toBePreempted = Resources.min(rc, clusterResource, Resources
.max(rc, clusterResource, preemtableFromApp, Resources.none()),
- Resources.clone(preemptionLimit));
+ preemptionLimit);
- preemptionLimit = Resources.subtractFromNonNegative(preemptionLimit,
+ preemptionLimit = Resources.subtract(preemptionLimit,
tmpApp.toBePreempted);
}
}
@@ -245,24 +221,31 @@ public class FifoIntraQueuePreemptionPlugin
* }
*
* @param clusterResource Cluster Resource
+ * @param partitionBasedResource resource per partition
* @param tq TempQueue
* @param selectedCandidates Already Selected preemption candidates
* @param queueReassignableResource Resource used in a queue
* @param orderedByPriority List of running apps
+ * @param perUserAMUsed AM used resource
* @return List of temp apps ordered from low to high priority
*/
private TreeSet<TempAppPerPartition> calculateIdealAssignedResourcePerApp(
- Resource clusterResource, TempQueuePerPartition tq,
+ Resource clusterResource, Resource partitionBasedResource,
+ TempQueuePerPartition tq,
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
Resource queueReassignableResource,
- PriorityQueue<TempAppPerPartition> orderedByPriority) {
+ PriorityQueue<TempAppPerPartition> orderedByPriority,
+ Map<String, Resource> perUserAMUsed) {
Comparator<TempAppPerPartition> reverseComp = Collections
.reverseOrder(new TAPriorityComparator());
TreeSet<TempAppPerPartition> orderedApps = new TreeSet<>(reverseComp);
+ Map<String, Resource> userIdealAssignedMapping = new HashMap<>();
String partition = tq.partition;
- Map<String, TempUserPerPartition> usersPerPartition = tq.getUsersPerPartition();
+
+ Map<String, Resource> preCalculatedUserLimit =
+ new HashMap<String, Resource>();
while (!orderedByPriority.isEmpty()) {
// Remove app from the next highest remaining priority and process it to
@@ -272,19 +255,43 @@ public class FifoIntraQueuePreemptionPlugin
// Once unallocated resource is 0, we can stop assigning ideal per app.
if (Resources.lessThanOrEqual(rc, clusterResource,
- queueReassignableResource, Resources.none())
- || Resources.isAnyMajorResourceZero(rc, queueReassignableResource)) {
+ queueReassignableResource, Resources.none())) {
continue;
}
String userName = tmpApp.app.getUser();
- TempUserPerPartition tmpUser = usersPerPartition.get(userName);
- Resource userLimitResource = tmpUser.getUserLimit();
- Resource idealAssignedForUser = tmpUser.idealAssigned;
+ Resource userLimitResource = preCalculatedUserLimit.get(userName);
+
+ // Verify whether we already calculated headroom for this user.
+ if (userLimitResource == null) {
+ userLimitResource = Resources.clone(tq.leafQueue
+ .getUserLimitPerUser(userName, partitionBasedResource, partition));
+
+ Resource amUsed = perUserAMUsed.get(userName);
+ if (null == amUsed) {
+ amUsed = Resources.createResource(0, 0);
+ }
+
+ // Real AM used need not have to be considered for user-limit as well.
+ userLimitResource = Resources.subtract(userLimitResource, amUsed);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Userlimit for user '" + userName + "' is :"
+ + userLimitResource + ", and amUsed is:" + amUsed);
+ }
+
+ preCalculatedUserLimit.put(userName, userLimitResource);
+ }
+
+ Resource idealAssignedForUser = userIdealAssignedMapping.get(userName);
+
+ if (idealAssignedForUser == null) {
+ idealAssignedForUser = Resources.createResource(0, 0);
+ userIdealAssignedMapping.put(userName, idealAssignedForUser);
+ }
// Calculate total selected container resources from current app.
- getAlreadySelectedPreemptionCandidatesResource(selectedCandidates, tmpApp,
- tmpUser, partition);
+ getAlreadySelectedPreemptionCandidatesResource(selectedCandidates,
+ tmpApp, partition);
// For any app, used+pending will give its idealAssigned. However it will
// be tightly linked to queue's unallocated quota. So lower priority apps
@@ -295,11 +302,10 @@ public class FifoIntraQueuePreemptionPlugin
if (Resources.lessThan(rc, clusterResource, idealAssignedForUser,
userLimitResource)) {
- Resource idealAssigned = Resources.min(rc, clusterResource,
- appIdealAssigned,
+ appIdealAssigned = Resources.min(rc, clusterResource, appIdealAssigned,
Resources.subtract(userLimitResource, idealAssignedForUser));
tmpApp.idealAssigned = Resources.clone(Resources.min(rc,
- clusterResource, queueReassignableResource, idealAssigned));
+ clusterResource, queueReassignableResource, appIdealAssigned));
Resources.addTo(idealAssignedForUser, tmpApp.idealAssigned);
} else {
continue;
@@ -314,8 +320,7 @@ public class FifoIntraQueuePreemptionPlugin
Resources.subtract(tmpApp.idealAssigned, appUsedExcludedSelected));
}
- Resources.subtractFromNonNegative(queueReassignableResource,
- tmpApp.idealAssigned);
+ Resources.subtractFrom(queueReassignableResource, tmpApp.idealAssigned);
}
return orderedApps;
@@ -327,8 +332,7 @@ public class FifoIntraQueuePreemptionPlugin
*/
private void getAlreadySelectedPreemptionCandidatesResource(
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
- TempAppPerPartition tmpApp, TempUserPerPartition tmpUser,
- String partition) {
+ TempAppPerPartition tmpApp, String partition) {
tmpApp.selected = Resources.createResource(0, 0);
Set<RMContainer> containers = selectedCandidates
.get(tmpApp.app.getApplicationAttemptId());
@@ -340,23 +344,16 @@ public class FifoIntraQueuePreemptionPlugin
for (RMContainer cont : containers) {
if (partition.equals(cont.getNodeLabelExpression())) {
Resources.addTo(tmpApp.selected, cont.getAllocatedResource());
- Resources.addTo(tmpUser.selected, cont.getAllocatedResource());
}
}
}
private PriorityQueue<TempAppPerPartition> createTempAppForResCalculation(
- TempQueuePerPartition tq, Collection<FiCaSchedulerApp> apps,
- Resource clusterResource,
- Map<String, Resource> perUserAMUsed) {
- TAPriorityComparator taComparator = new TAPriorityComparator();
+ String partition, Collection<FiCaSchedulerApp> apps,
+ TAPriorityComparator taComparator) {
PriorityQueue<TempAppPerPartition> orderedByPriority = new PriorityQueue<>(
100, taComparator);
- String partition = tq.partition;
- Map<String, TempUserPerPartition> usersPerPartition = tq
- .getUsersPerPartition();
-
// have an internal temp app structure to store intermediate data(priority)
for (FiCaSchedulerApp app : apps) {
@@ -388,156 +385,56 @@ public class FifoIntraQueuePreemptionPlugin
tmpApp.idealAssigned = Resources.createResource(0, 0);
orderedByPriority.add(tmpApp);
-
- // Create a TempUserPerPartition structure to hold more information
- // regarding each user's entities such as UserLimit etc. This could
- // be kept in a user to TempUserPerPartition map for further reference.
- String userName = app.getUser();
- if (!usersPerPartition.containsKey(userName)) {
- ResourceUsage userResourceUsage = tq.leafQueue.getUser(userName)
- .getResourceUsage();
-
- TempUserPerPartition tmpUser = new TempUserPerPartition(
- tq.leafQueue.getUser(userName), tq.queueName,
- Resources.clone(userResourceUsage.getUsed(partition)),
- Resources.clone(perUserAMUsed.get(userName)),
- Resources.clone(userResourceUsage.getReserved(partition)),
- Resources.none());
-
- Resource userLimitResource = Resources.clone(
- tq.leafQueue.getResourceLimitForAllUsers(userName, clusterResource,
- partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
-
- // Real AM used need not have to be considered for user-limit as well.
- userLimitResource = Resources.subtract(userLimitResource,
- tmpUser.amUsed);
- tmpUser.setUserLimit(userLimitResource);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("TempUser:" + tmpUser);
- }
-
- tmpUser.idealAssigned = Resources.createResource(0, 0);
- tq.addUserPerPartition(userName, tmpUser);
- }
}
return orderedByPriority;
}
/*
* Fifo+Priority based preemption policy need not have to preempt resources at
- * same priority level. Such cases will be validated out. But if the demand is
- * from an app of different user, force to preempt resources even if apps are
- * at same priority.
+ * same priority level. Such cases will be validated out.
*/
public void validateOutSameAppPriorityFromDemand(Resource cluster,
- TreeSet<TempAppPerPartition> orderedApps,
- Map<String, TempUserPerPartition> usersPerPartition,
- IntraQueuePreemptionOrderPolicy intraQueuePreemptionOrder) {
+ TreeSet<TempAppPerPartition> appsOrderedfromLowerPriority) {
- TempAppPerPartition[] apps = orderedApps
- .toArray(new TempAppPerPartition[orderedApps.size()]);
+ TempAppPerPartition[] apps = appsOrderedfromLowerPriority
+ .toArray(new TempAppPerPartition[appsOrderedfromLowerPriority.size()]);
if (apps.length <= 0) {
return;
}
- for (int hPriority = apps.length - 1; hPriority >= 0; hPriority--) {
-
- // Check whether high priority app with demand needs resource from other
- // user.
- if (Resources.greaterThan(rc, cluster,
- apps[hPriority].getToBePreemptFromOther(), Resources.none())) {
-
- // Given we have a demand from a high priority app, we can do a reverse
- // scan from lower priority apps to select resources.
- // Since idealAssigned of each app has considered user-limit, this logic
- // will provide eventual consistency w.r.t user-limit as well.
- for (int lPriority = 0; lPriority < apps.length; lPriority++) {
+ int lPriority = 0;
+ int hPriority = apps.length - 1;
+
+ while (lPriority < hPriority
+ && !apps[lPriority].equals(apps[hPriority])
+ && apps[lPriority].getPriority() < apps[hPriority].getPriority()) {
+ Resource toPreemptFromOther = apps[hPriority]
+ .getToBePreemptFromOther();
+ Resource actuallyToPreempt = apps[lPriority].getActuallyToBePreempted();
+ Resource delta = Resources.subtract(apps[lPriority].toBePreempted,
+ actuallyToPreempt);
+
+ if (Resources.greaterThan(rc, cluster, delta, Resources.none())) {
+ Resource toPreempt = Resources.min(rc, cluster,
+ toPreemptFromOther, delta);
+
+ apps[hPriority].setToBePreemptFromOther(
+ Resources.subtract(toPreemptFromOther, toPreempt));
+ apps[lPriority].setActuallyToBePreempted(
+ Resources.add(actuallyToPreempt, toPreempt));
+ }
- // Check whether app with demand needs resource from other user.
- if (Resources.greaterThan(rc, cluster, apps[lPriority].toBePreempted,
- Resources.none())) {
+ if (Resources.lessThanOrEqual(rc, cluster,
+ apps[lPriority].toBePreempted,
+ apps[lPriority].getActuallyToBePreempted())) {
+ lPriority++;
+ continue;
+ }
- // If apps are of same user, and priority is same, then skip.
- if ((apps[hPriority].getUser().equals(apps[lPriority].getUser()))
- && (apps[lPriority].getPriority() >= apps[hPriority]
- .getPriority())) {
- continue;
- }
-
- if (Resources.lessThanOrEqual(rc, cluster,
- apps[lPriority].toBePreempted,
- apps[lPriority].getActuallyToBePreempted())
- || Resources.equals(apps[hPriority].getToBePreemptFromOther(),
- Resources.none())) {
- continue;
- }
-
- // Ideally if any application has a higher priority, then it can
- // force to preempt any lower priority app from any user. However
- // if admin enforces user-limit over priority, preemption module
- // will not choose lower priority apps from usre's who are not yet
- // met its user-limit.
- TempUserPerPartition tmpUser = usersPerPartition
- .get(apps[lPriority].getUser());
- if ((!apps[hPriority].getUser().equals(apps[lPriority].getUser()))
- && (!tmpUser.isUserLimitReached(rc, cluster))
- && (intraQueuePreemptionOrder
- .equals(IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST))) {
- continue;
- }
-
- Resource toPreemptFromOther = apps[hPriority]
- .getToBePreemptFromOther();
- Resource actuallyToPreempt = apps[lPriority]
- .getActuallyToBePreempted();
-
- // A lower priority app could offer more resource to preempt, if
- // multiple higher priority/under served users needs resources.
- // After one iteration, we need to ensure that actuallyToPreempt is
- // subtracted from the resource to preempt.
- Resource preemptableFromLowerPriorityApp = Resources
- .subtract(apps[lPriority].toBePreempted, actuallyToPreempt);
-
- // In case of user-limit preemption, when app's are from different
- // user and of same priority, we will do user-limit preemption if
- // there is a demand from under UL quota app.
- // However this under UL quota app's demand may be more.
- // Still we should ensure that we are not doing over preemption such
- // that only a maximum of (user's used - UL quota) could be
- // preempted.
- if ((!apps[hPriority].getUser().equals(apps[lPriority].getUser()))
- && (apps[lPriority].getPriority() == apps[hPriority]
- .getPriority())
- && tmpUser.isUserLimitReached(rc, cluster)) {
-
- Resource deltaULQuota = Resources
- .subtract(tmpUser.getUsedDeductAM(), tmpUser.selected);
- Resources.subtractFrom(deltaULQuota, tmpUser.getUserLimit());
-
- if (tmpUser.isPreemptionQuotaForULDeltaDone()) {
- deltaULQuota = Resources.createResource(0, 0);
- }
-
- if (Resources.lessThan(rc, cluster, deltaULQuota,
- preemptableFromLowerPriorityApp)) {
- tmpUser.updatePreemptionQuotaForULDeltaAsDone(true);
- preemptableFromLowerPriorityApp = deltaULQuota;
- }
- }
-
- if (Resources.greaterThan(rc, cluster,
- preemptableFromLowerPriorityApp, Resources.none())) {
- Resource toPreempt = Resources.min(rc, cluster,
- toPreemptFromOther, preemptableFromLowerPriorityApp);
-
- apps[hPriority].setToBePreemptFromOther(
- Resources.subtract(toPreemptFromOther, toPreempt));
- apps[lPriority].setActuallyToBePreempted(
- Resources.add(actuallyToPreempt, toPreempt));
- }
- }
- }
+ if (Resources.equals(apps[hPriority].getToBePreemptFromOther(),
+ Resources.none())) {
+ hPriority--;
+ continue;
}
}
}
@@ -557,40 +454,6 @@ public class FifoIntraQueuePreemptionPlugin
Resources.addTo(userAMResource, app.getAMResource(partition));
Resources.addTo(amUsed, app.getAMResource(partition));
}
-
return amUsed;
}
-
- @Override
- public boolean skipContainerBasedOnIntraQueuePolicy(FiCaSchedulerApp app,
- Resource clusterResource, Resource usedResource, RMContainer c) {
- // Ensure below checks
- // 1. This check must be done only when preemption order is USERLIMIT_FIRST
- // 2. By selecting container "c", check whether this user's resource usage
- // is going below its user-limit.
- // 3. Used resource of user must be always greater than user-limit to
- // skip some containers as per this check. If used resource is under user
- // limit, then these containers of this user has to be preempted as demand
- // might be due to high priority apps running in same user.
- String partition = context.getScheduler()
- .getSchedulerNode(c.getAllocatedNode()).getPartition();
- TempQueuePerPartition tq = context.getQueueByPartition(app.getQueueName(),
- partition);
- TempUserPerPartition tmpUser = tq.getUsersPerPartition().get(app.getUser());
-
- // Given user is not present, skip the check.
- if (tmpUser == null) {
- return false;
- }
-
- // For ideal resource computations, user-limit got saved by subtracting am
- // used resource in TempUser. Hence it has to be added back here for
- // complete check.
- Resource userLimit = Resources.add(tmpUser.getUserLimit(), tmpUser.amUsed);
-
- return Resources.lessThanOrEqual(rc, clusterResource,
- Resources.subtract(usedResource, c.getAllocatedResource()), userLimit)
- && context.getIntraQueuePreemptionOrderPolicy()
- .equals(IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST);
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
index e2f311f..2890414 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
@@ -23,7 +23,6 @@ import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.IntraQueuePreemptionOrderPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
@@ -32,13 +31,11 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.Comparator;
-import java.util.HashMap;
+import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
-import java.util.Map.Entry;
import java.util.Set;
/**
@@ -54,14 +51,14 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
Comparator<TempAppPerPartition> {
@Override
- public int compare(TempAppPerPartition ta1, TempAppPerPartition ta2) {
- Priority p1 = Priority.newInstance(ta1.getPriority());
- Priority p2 = Priority.newInstance(ta2.getPriority());
+ public int compare(TempAppPerPartition tq1, TempAppPerPartition tq2) {
+ Priority p1 = Priority.newInstance(tq1.getPriority());
+ Priority p2 = Priority.newInstance(tq2.getPriority());
if (!p1.equals(p2)) {
return p1.compareTo(p2);
}
- return ta1.getApplicationId().compareTo(ta2.getApplicationId());
+ return tq1.getApplicationId().compareTo(tq2.getApplicationId());
}
}
@@ -124,60 +121,37 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
Map<String, Resource> resToObtainByPartition = fifoPreemptionComputePlugin
.getResourceDemandFromAppsPerQueue(queueName, partition);
- // Default preemption iterator considers only FIFO+priority. For
- // userlimit preemption, its possible that some lower priority apps
- // needs from high priority app of another user. Hence use apps
- // ordered by userlimit starvation as well.
- Collection<FiCaSchedulerApp> apps = fifoPreemptionComputePlugin
- .getPreemptableApps(queueName, partition);
-
- // 6. Get user-limit to ensure that we do not preempt resources which
- // will force user's resource to come under its UL.
- Map<String, Resource> rollingResourceUsagePerUser = new HashMap<>();
- initializeUsageAndUserLimitForCompute(clusterResource, partition,
- leafQueue, rollingResourceUsagePerUser);
-
- // 7. Based on the selected resource demand per partition, select
+ // 6. Based on the selected resource demand per partition, select
// containers with known policy from inter-queue preemption.
try {
leafQueue.getReadLock().lock();
- for (FiCaSchedulerApp app : apps) {
- preemptFromLeastStarvedApp(leafQueue, app, selectedCandidates,
- clusterResource, totalPreemptedResourceAllowed,
- resToObtainByPartition, rollingResourceUsagePerUser);
+ Iterator<FiCaSchedulerApp> desc = leafQueue.getOrderingPolicy()
+ .getPreemptionIterator();
+ while (desc.hasNext()) {
+ FiCaSchedulerApp app = desc.next();
+ preemptFromLeastStarvedApp(selectedCandidates, clusterResource,
+ totalPreemptedResourceAllowed, resToObtainByPartition,
+ leafQueue, app);
}
} finally {
leafQueue.getReadLock().unlock();
}
}
}
- return selectedCandidates;
- }
- private void initializeUsageAndUserLimitForCompute(Resource clusterResource,
- String partition, LeafQueue leafQueue,
- Map<String, Resource> rollingResourceUsagePerUser) {
- for (String user : leafQueue.getAllUsers()) {
- // Initialize used resource of a given user for rolling computation.
- rollingResourceUsagePerUser.put(user, Resources.clone(
- leafQueue.getUser(user).getResourceUsage().getUsed(partition)));
- if (LOG.isDebugEnabled()) {
- LOG.debug("Rolling resource usage for user:" + user + " is : "
- + rollingResourceUsagePerUser.get(user));
- }
- }
+ return selectedCandidates;
}
- private void preemptFromLeastStarvedApp(LeafQueue leafQueue,
- FiCaSchedulerApp app,
+ private void preemptFromLeastStarvedApp(
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
Resource clusterResource, Resource totalPreemptedResourceAllowed,
- Map<String, Resource> resToObtainByPartition,
- Map<String, Resource> rollingResourceUsagePerUser) {
+ Map<String, Resource> resToObtainByPartition, LeafQueue leafQueue,
+ FiCaSchedulerApp app) {
// ToDo: Reuse reservation selector here.
- List<RMContainer> liveContainers = new ArrayList<>(app.getLiveContainers());
+ List<RMContainer> liveContainers = new ArrayList<>(
+ app.getLiveContainers());
sortContainers(liveContainers);
if (LOG.isDebugEnabled()) {
@@ -186,8 +160,6 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
+ totalPreemptedResourceAllowed);
}
- Resource rollingUsedResourcePerUser = rollingResourceUsagePerUser
- .get(app.getUser());
for (RMContainer c : liveContainers) {
// if there are no demand, return.
@@ -212,34 +184,12 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
continue;
}
- // If selected container brings down resource usage under its user's
- // UserLimit (or equals to), we must skip such containers.
- if (fifoPreemptionComputePlugin.skipContainerBasedOnIntraQueuePolicy(app,
- clusterResource, rollingUsedResourcePerUser, c)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Skipping container: " + c.getContainerId() + " with resource:"
- + c.getAllocatedResource() + " as UserLimit for user:"
- + app.getUser() + " with resource usage: "
- + rollingUsedResourcePerUser + " is going under UL");
- }
- break;
- }
-
// Try to preempt this container
- boolean ret = CapacitySchedulerPreemptionUtils
- .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext,
- resToObtainByPartition, c, clusterResource, selectedCandidates,
- totalPreemptedResourceAllowed);
-
- // Subtract from respective user's resource usage once a container is
- // selected for preemption.
- if (ret && preemptionContext.getIntraQueuePreemptionOrderPolicy()
- .equals(IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST)) {
- Resources.subtractFrom(rollingUsedResourcePerUser,
- c.getAllocatedResource());
- }
+ CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
+ rc, preemptionContext, resToObtainByPartition, c, clusterResource,
+ selectedCandidates, totalPreemptedResourceAllowed);
}
+
}
private void computeIntraQueuePreemptionDemand(Resource clusterResource,
@@ -255,7 +205,12 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
continue;
}
- // 2. loop through all queues corresponding to a partition.
+ // 2. Its better to get partition based resource limit earlier before
+ // starting calculation
+ Resource partitionBasedResource =
+ context.getPartitionResource(partition);
+
+ // 3. loop through all queues corresponding to a partition.
for (String queueName : queueNames) {
TempQueuePerPartition tq = context.getQueueByPartition(queueName,
partition);
@@ -266,22 +221,23 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
continue;
}
- // 3. Consider reassignableResource as (used - actuallyToBePreempted).
+ // 4. Consider reassignableResource as (used - actuallyToBePreempted).
// This provides as upper limit to split apps quota in a queue.
Resource queueReassignableResource = Resources.subtract(tq.getUsed(),
tq.getActuallyToBePreempted());
- // 4. Check queue's used capacity. Make sure that the used capacity is
+ // 5. Check queue's used capacity. Make sure that the used capacity is
// above certain limit to consider for intra queue preemption.
if (leafQueue.getQueueCapacities().getUsedCapacity(partition) < context
.getMinimumThresholdForIntraQueuePreemption()) {
continue;
}
- // 5. compute the allocation of all apps based on queue's unallocated
+ // 6. compute the allocation of all apps based on queue's unallocated
// capacity
fifoPreemptionComputePlugin.computeAppsIdealAllocation(clusterResource,
- tq, selectedCandidates, totalPreemptedResourceAllowed,
+ partitionBasedResource, tq, selectedCandidates,
+ totalPreemptedResourceAllowed,
queueReassignableResource,
context.getMaxAllowableLimitForIntraQueuePreemption());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java
index 56fd007..93ebe65 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java
@@ -18,14 +18,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
-import java.util.Collection;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
interface IntraQueuePreemptionComputePlugin {
@@ -34,14 +32,8 @@ interface IntraQueuePreemptionComputePlugin {
String partition);
void computeAppsIdealAllocation(Resource clusterResource,
- TempQueuePerPartition tq,
+ Resource partitionBasedResource, TempQueuePerPartition tq,
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
Resource totalPreemptedResourceAllowed, Resource queueTotalUnassigned,
float maxAllowablePreemptLimit);
-
- Collection<FiCaSchedulerApp> getPreemptableApps(String queueName,
- String partition);
-
- boolean skipContainerBasedOnIntraQueuePolicy(FiCaSchedulerApp app,
- Resource clusterResource, Resource usedResource, RMContainer c);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index b171b04..1e684ea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -22,7 +22,6 @@ import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -81,16 +80,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
*/
public class ProportionalCapacityPreemptionPolicy
implements SchedulingEditPolicy, CapacitySchedulerPreemptionContext {
-
- /**
- * IntraQueuePreemptionOrder will be used to define various priority orders
- * which could be configured by admin.
- */
- @Unstable
- public enum IntraQueuePreemptionOrderPolicy {
- PRIORITY_FIRST, USERLIMIT_FIRST;
- }
-
private static final Log LOG =
LogFactory.getLog(ProportionalCapacityPreemptionPolicy.class);
@@ -107,7 +96,6 @@ public class ProportionalCapacityPreemptionPolicy
private float maxAllowableLimitForIntraQueuePreemption;
private float minimumThresholdForIntraQueuePreemption;
- private IntraQueuePreemptionOrderPolicy intraQueuePreemptionOrderPolicy;
// Pointer to other RM components
private RMContext rmContext;
@@ -203,13 +191,6 @@ public class ProportionalCapacityPreemptionPolicy
CapacitySchedulerConfiguration.
DEFAULT_INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD);
- intraQueuePreemptionOrderPolicy = IntraQueuePreemptionOrderPolicy
- .valueOf(csConfig
- .get(
- CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
- CapacitySchedulerConfiguration.DEFAULT_INTRAQUEUE_PREEMPTION_ORDER_POLICY)
- .toUpperCase());
-
rc = scheduler.getResourceCalculator();
nlm = scheduler.getRMContext().getNodeLabelManager();
@@ -262,6 +243,7 @@ public class ProportionalCapacityPreemptionPolicy
}
}
+ @SuppressWarnings("unchecked")
private void preemptOrkillSelectedContainerAfterWait(
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
long currentTime) {
@@ -674,9 +656,4 @@ public class ProportionalCapacityPreemptionPolicy
}
underServedQueues.add(queueName);
}
-
- @Override
- public IntraQueuePreemptionOrderPolicy getIntraQueuePreemptionOrderPolicy() {
- return intraQueuePreemptionOrderPolicy;
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java
index cbc1028..fccd2a7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java
@@ -59,17 +59,13 @@ public class TempAppPerPartition extends AbstractPreemptionEntity {
@Override
public String toString() {
StringBuilder sb = new StringBuilder();
- sb.append("NAME: " + getApplicationId())
- .append(" PRIO: ").append(priority)
- .append(" CUR: ").append(getUsed())
- .append(" PEN: ").append(pending)
- .append(" RESERVED: ").append(reserved)
- .append(" IDEAL_ASSIGNED: ").append(idealAssigned)
- .append(" PREEMPT_OTHER: ").append(getToBePreemptFromOther())
- .append(" IDEAL_PREEMPT: ").append(toBePreempted)
- .append(" ACTUAL_PREEMPT: ").append(getActuallyToBePreempted())
- .append(" SELECTED: ").append(selected)
- .append("\n");
+ sb.append(" NAME: " + getApplicationId()).append(" PRIO: ").append(priority)
+ .append(" CUR: ").append(getUsed()).append(" PEN: ").append(pending)
+ .append(" RESERVED: ").append(reserved).append(" IDEAL_ASSIGNED: ")
+ .append(idealAssigned).append(" PREEMPT_OTHER: ")
+ .append(getToBePreemptFromOther()).append(" IDEAL_PREEMPT: ")
+ .append(toBePreempted).append(" ACTUAL_PREEMPT: ")
+ .append(getActuallyToBePreempted()).append("\n");
return sb.toString();
}
@@ -95,12 +91,8 @@ public class TempAppPerPartition extends AbstractPreemptionEntity {
return applicationId;
}
- public String getUser() {
- return this.app.getUser();
- }
-
public void deductActuallyToBePreempted(ResourceCalculator resourceCalculator,
- Resource cluster, Resource toBeDeduct) {
+ Resource cluster, Resource toBeDeduct, String partition) {
if (Resources.greaterThan(resourceCalculator, cluster,
getActuallyToBePreempted(), toBeDeduct)) {
Resources.subtractFrom(getActuallyToBePreempted(), toBeDeduct);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
index 89452f9..7eab015 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
@@ -26,8 +26,6 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.LinkedHashMap;
-import java.util.Map;
/**
* Temporary data-structure tracking resource availability, pending resource
@@ -61,10 +59,6 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
int relativePriority = 0;
TempQueuePerPartition parent = null;
- // This will hold a temp user data structure and will hold userlimit,
- // idealAssigned, used etc.
- Map<String, TempUserPerPartition> usersPerPartition = new LinkedHashMap<>();
-
TempQueuePerPartition(String queueName, Resource current,
boolean preemptionDisabled, String partition, Resource killable,
float absCapacity, float absMaxCapacity, Resource totalPartitionResource,
@@ -295,12 +289,4 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
return apps;
}
- public void addUserPerPartition(String userName,
- TempUserPerPartition tmpUser) {
- this.usersPerPartition.put(userName, tmpUser);
- }
-
- public Map<String, TempUserPerPartition> getUsersPerPartition() {
- return usersPerPartition;
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java
deleted file mode 100644
index 245b5d4..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue.User;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.Resources;
-
-
-/**
- * Temporary data-structure tracking resource availability, pending resource
- * need, current utilization for an application.
- */
-public class TempUserPerPartition extends AbstractPreemptionEntity {
-
- private final User user;
- private Resource userLimit;
- private boolean donePreemptionQuotaForULDelta = false;
-
- TempUserPerPartition(User user, String queueName, Resource usedPerPartition,
- Resource amUsedPerPartition, Resource reserved,
- Resource pendingPerPartition) {
- super(queueName, usedPerPartition, amUsedPerPartition, reserved,
- pendingPerPartition);
- this.user = user;
- }
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append(" NAME: " + getUserName()).append(" CUR: ").append(getUsed())
- .append(" PEN: ").append(pending).append(" RESERVED: ").append(reserved)
- .append(" AM_USED: ").append(amUsed).append(" USER_LIMIT: ")
- .append(getUserLimit()).append(" IDEAL_ASSIGNED: ")
- .append(idealAssigned).append(" USED_WO_AMUSED: ")
- .append(getUsedDeductAM()).append(" IDEAL_PREEMPT: ")
- .append(toBePreempted).append(" ACTUAL_PREEMPT: ")
- .append(getActuallyToBePreempted()).append("\n");
-
- return sb.toString();
- }
-
- public String getUserName() {
- return user.getUserName();
- }
-
- public Resource getUserLimit() {
- return userLimit;
- }
-
- public void setUserLimit(Resource userLimitResource) {
- this.userLimit = userLimitResource;
- }
-
- public boolean isUserLimitReached(ResourceCalculator rc,
- Resource clusterResource) {
- if (Resources.greaterThan(rc, clusterResource, getUsedDeductAM(),
- userLimit)) {
- return true;
- }
- return false;
- }
-
- public boolean isPreemptionQuotaForULDeltaDone() {
- return this.donePreemptionQuotaForULDelta;
- }
-
- public void updatePreemptionQuotaForULDeltaAsDone(boolean done) {
- this.donePreemptionQuotaForULDelta = done;
- }
-}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 026dd82..9fb92ec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -1233,14 +1233,6 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
public static final float DEFAULT_INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT =
0.2f;
- /**
- * For intra-queue preemption, enforce a preemption order such as
- * "userlimit_first" or "priority_first".
- */
- public static final String INTRAQUEUE_PREEMPTION_ORDER_POLICY = PREEMPTION_CONFIG_PREFIX
- + INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "preemption-order-policy";
- public static final String DEFAULT_INTRAQUEUE_PREEMPTION_ORDER_POLICY = "userlimit_first";
-
/**
* Maximum application for a queue to be used when application per queue is
* not defined.To be consistent with previous version the default value is set
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 71225b8..2b1efd6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -486,7 +486,7 @@ public class LeafQueue extends AbstractCSQueue {
writeLock.lock();
User u = users.get(userName);
if (null == u) {
- u = new User(userName);
+ u = new User();
users.put(userName, u);
}
return u;
@@ -1292,7 +1292,7 @@ public class LeafQueue extends AbstractCSQueue {
String partition) {
return getHeadroom(user, queueCurrentLimit, clusterResource,
computeUserLimit(application.getUser(), clusterResource, user,
- partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, true),
+ partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
partition);
}
@@ -1366,7 +1366,7 @@ public class LeafQueue extends AbstractCSQueue {
// TODO, need consider headroom respect labels also
Resource userLimit =
computeUserLimit(application.getUser(), clusterResource, queueUser,
- nodePartition, schedulingMode, true);
+ nodePartition, schedulingMode);
setQueueResourceLimitsInfo(clusterResource);
@@ -1410,7 +1410,7 @@ public class LeafQueue extends AbstractCSQueue {
@Lock(NoLock.class)
private Resource computeUserLimit(String userName,
Resource clusterResource, User user,
- String nodePartition, SchedulingMode schedulingMode, boolean forActive) {
+ String nodePartition, SchedulingMode schedulingMode) {
Resource partitionResource = labelManager.getResourceByLabel(nodePartition,
clusterResource);
@@ -1462,21 +1462,16 @@ public class LeafQueue extends AbstractCSQueue {
// queue's configured capacity * user-limit-factor.
// Also, the queue's configured capacity should be higher than
// queue-hard-limit * ulMin
-
- final int usersCount;
- if (forActive) {
- usersCount = activeUsersManager.getNumActiveUsers();
- } else {
- usersCount = users.size();
- }
-
+
+ final int activeUsers = activeUsersManager.getNumActiveUsers();
+
// User limit resource is determined by:
// max{currentCapacity / #activeUsers, currentCapacity *
// user-limit-percentage%)
Resource userLimitResource = Resources.max(
resourceCalculator, partitionResource,
Resources.divideAndCeil(
- resourceCalculator, currentCapacity, usersCount),
+ resourceCalculator, currentCapacity, activeUsers),
Resources.divideAndCeil(
resourceCalculator,
Resources.multiplyAndRoundDown(
@@ -1524,16 +1519,14 @@ public class LeafQueue extends AbstractCSQueue {
" qconsumed: " + queueUsage.getUsed() +
" consumedRatio: " + totalUserConsumedRatio +
" currentCapacity: " + currentCapacity +
- " activeUsers: " + usersCount +
+ " activeUsers: " + activeUsers +
" clusterCapacity: " + clusterResource +
" resourceByLabel: " + partitionResource +
" usageratio: " + qUsageRatios.getUsageRatio(nodePartition) +
" Partition: " + nodePartition
);
}
- if (forActive) {
- user.setUserResourceLimit(userLimitResource);
- }
+ user.setUserResourceLimit(userLimitResource);
return userLimitResource;
}
@@ -1962,14 +1955,11 @@ public class LeafQueue extends AbstractCSQueue {
volatile int activeApplications = 0;
private UsageRatios userUsageRatios = new UsageRatios();
private WriteLock writeLock;
- String userName;
- @VisibleForTesting
- public User(String name) {
+ User() {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
// Nobody uses read-lock now, will add it when necessary
writeLock = lock.writeLock();
- this.userName = name;
}
public ResourceUsage getResourceUsage() {
@@ -2083,15 +2073,6 @@ public class LeafQueue extends AbstractCSQueue {
public void setUserResourceLimit(Resource userResourceLimit) {
this.userResourceLimit = userResourceLimit;
}
-
- public String getUserName() {
- return this.userName;
- }
-
- @VisibleForTesting
- public void setResourceUsage(ResourceUsage resourceUsage) {
- this.userResourceUsage = resourceUsage;
- }
}
@Override
@@ -2177,7 +2158,7 @@ public class LeafQueue extends AbstractCSQueue {
User user = getUser(userName);
Resource headroom = Resources.subtract(
computeUserLimit(app.getUser(), clusterResources, user, partition,
- SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, true),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
user.getUsed(partition));
// Make sure headroom is not negative.
headroom = Resources.componentwiseMax(headroom, Resources.none());
@@ -2214,7 +2195,7 @@ public class LeafQueue extends AbstractCSQueue {
User user = getUser(userName);
return computeUserLimit(userName, resources, user, partition,
- SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, true);
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
}
@Override
@@ -2396,26 +2377,4 @@ public class LeafQueue extends AbstractCSQueue {
writeLock.unlock();
}
}
-
- /**
- * Get all valid users in this queue.
- * @return user list
- */
- public Set<String> getAllUsers() {
- return this.users.keySet();
- }
-
- public Resource getResourceLimitForActiveUsers(String userName,
- Resource clusterResource, String partition,
- SchedulingMode schedulingMode) {
- return computeUserLimit(userName, clusterResource, getUser(userName),
- partition, schedulingMode, true);
- }
-
- public synchronized Resource getResourceLimitForAllUsers(String userName,
- Resource clusterResource, String partition, SchedulingMode schedulingMode)
- {
- return computeUserLimit(userName, clusterResource, getUser(userName),
- partition, schedulingMode, false);
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
index faac129..32b2c68 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
@@ -42,10 +42,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue.User;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -71,6 +69,7 @@ import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -97,7 +96,6 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
Clock mClock = null;
CapacitySchedulerConfiguration conf = null;
CapacityScheduler cs = null;
- @SuppressWarnings("rawtypes")
EventHandler<SchedulerEvent> mDisp = null;
ProportionalCapacityPreemptionPolicy policy = null;
Resource clusterResource = null;
@@ -249,7 +247,6 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
if (containerId == 1) {
when(rmc.isAMContainer()).thenReturn(true);
when(app.getAMResource(label)).thenReturn(res);
- when(app.getAppAMNodePartitionName()).thenReturn(label);
}
if (reserved) {
@@ -283,12 +280,6 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
containerId++;
}
- // If app has 0 container, and it has only pending, still make sure to
- // update label.
- if (repeat == 0) {
- when(app.getAppAMNodePartitionName()).thenReturn(label);
- }
-
// Some more app specific aggregated data can be better filled here.
when(app.getPriority()).thenReturn(pri);
when(app.getUser()).thenReturn(userName);
@@ -324,15 +315,10 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
private void mockApplications(String appsConfig) {
int id = 1;
HashMap<String, HashSet<String>> userMap = new HashMap<String, HashSet<String>>();
- HashMap<String, HashMap<String, HashMap<String, ResourceUsage>>> userResourceUsagePerLabel = new HashMap<>();
LeafQueue queue = null;
- int mulp = -1;
for (String a : appsConfig.split(";")) {
String[] strs = a.split("\t");
String queueName = strs[0];
- if (mulp <= 0 && strs.length > 2 && strs[2] != null) {
- mulp = 100 / (new Integer(strs[2]).intValue());
- }
// get containers
List<RMContainer> liveContainers = new ArrayList<RMContainer>();
@@ -352,7 +338,6 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
when(app.getReservedContainers()).thenReturn(reservedContainers);
when(app.getApplicationAttemptId()).thenReturn(appAttemptId);
when(app.getApplicationId()).thenReturn(appId);
- when(app.getQueueName()).thenReturn(queueName);
// add to LeafQueue
queue = (LeafQueue) nameToCSQueues.get(queueName);
@@ -366,70 +351,20 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
}
users.add(app.getUser());
-
- String label = app.getAppAMNodePartitionName();
-
- // Get label to queue
- HashMap<String, HashMap<String, ResourceUsage>> userResourceUsagePerQueue = userResourceUsagePerLabel
- .get(label);
- if (null == userResourceUsagePerQueue) {
- userResourceUsagePerQueue = new HashMap<>();
- userResourceUsagePerLabel.put(label, userResourceUsagePerQueue);
- }
-
- // Get queue to user based resource map
- HashMap<String, ResourceUsage> userResourceUsage = userResourceUsagePerQueue
- .get(queueName);
- if (null == userResourceUsage) {
- userResourceUsage = new HashMap<>();
- userResourceUsagePerQueue.put(queueName, userResourceUsage);
- }
-
- // Get user to its resource usage.
- ResourceUsage usage = userResourceUsage.get(app.getUser());
- if (null == usage) {
- usage = new ResourceUsage();
- userResourceUsage.put(app.getUser(), usage);
- }
-
- usage.incAMUsed(app.getAMResource(label));
- usage.incUsed(app.getAppAttemptResourceUsage().getUsed(label));
id++;
}
- for (String label : userResourceUsagePerLabel.keySet()) {
- for (String queueName : userMap.keySet()) {
- queue = (LeafQueue) nameToCSQueues.get(queueName);
- // Currently we have user-limit test support only for default label.
- Resource totResoucePerPartition = partitionToResource.get("");
- Resource capacity = Resources.multiply(totResoucePerPartition,
- queue.getQueueCapacities().getAbsoluteCapacity());
- HashSet<String> users = userMap.get(queue.getQueueName());
- when(queue.getAllUsers()).thenReturn(users);
- Resource userLimit;
- if (mulp > 0) {
- userLimit = Resources.divideAndCeil(rc, capacity, mulp);
- } else {
- userLimit = Resources.divideAndCeil(rc, capacity,
- users.size());
- }
- LOG.debug("Updating user-limit from mock: totResoucePerPartition="
- + totResoucePerPartition + ", capacity=" + capacity
- + ", users.size()=" + users.size() + ", userlimit= " + userLimit
- + ",label= " + label + ",queueName= " + queueName);
-
- HashMap<String, ResourceUsage> userResourceUsage = userResourceUsagePerLabel
- .get(label).get(queueName);
- for (String userName : users) {
- User user = new User(userName);
- if (userResourceUsage != null) {
- user.setResourceUsage(userResourceUsage.get(userName));
- }
- when(queue.getUser(eq(userName))).thenReturn(user);
- when(queue.getResourceLimitForAllUsers(eq(userName),
- any(Resource.class), anyString(), any(SchedulingMode.class)))
- .thenReturn(userLimit);
- }
+ for (String queueName : userMap.keySet()) {
+ queue = (LeafQueue) nameToCSQueues.get(queueName);
+ // Currently we have user-limit test support only for default label.
+ Resource totResoucePerPartition = partitionToResource.get("");
+ Resource capacity = Resources.multiply(totResoucePerPartition,
+ queue.getQueueCapacities().getAbsoluteCapacity());
+ HashSet<String> users = userMap.get(queue.getQueueName());
+ Resource userLimit = Resources.divideAndCeil(rc, capacity, users.size());
+ for (String user : users) {
+ when(queue.getUserLimitPerUser(eq(user), any(Resource.class),
+ anyString())).thenReturn(userLimit);
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java
index 6c5aa67..bf83e1c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java
@@ -62,16 +62,12 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
* Apps which are running at low priority (4) will preempt few of its
* resources to meet the demand.
*/
-
- conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
- "priority_first");
-
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
"n1= res=100";
String queuesConfig =
// guaranteed,max,used,pending,reserved
- "root(=[100 100 79 120 0]);" + // root
+ "root(=[100 100 80 120 0]);" + // root
"-a(=[11 100 11 50 0]);" + // a
"-b(=[40 100 38 60 0]);" + // b
"-c(=[20 100 10 10 0]);" + // c
@@ -308,8 +304,6 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
- conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
- "priority_first");
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
@@ -363,8 +357,6 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
// report "ideal" preempt as 10%. Ensure preemption happens only for 10%
conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
(float) 0.1);
- conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
- "priority_first");
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
@@ -419,8 +411,6 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
- conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
- "priority_first");
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
@@ -428,7 +418,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
String queuesConfig =
// guaranteed,max,used,pending,reserved
"root(=[100 100 95 170 0]);" + // root
- "-a(=[60 100 70 35 0]);" + // a
+ "-a(=[60 100 70 50 0]);" + // a
"-b(=[40 100 25 120 0])"; // b
String appsConfig =
@@ -477,8 +467,6 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
- conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
- "priority_first");
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
@@ -528,8 +516,6 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
* cycle. Eventhough there are more demand and no other low priority
* apps are present, still AM contaier need to soared.
*/
- conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
- "priority_first");
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
@@ -674,8 +660,6 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
- conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
- "priority_first");
String labelsConfig = "=100,true;" + // default partition
"x=100,true"; // partition=x
@@ -736,8 +720,6 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
- conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
- "priority_first");
String labelsConfig = "=100,true;";
String nodesConfig = // n1 has no label
@@ -858,10 +840,8 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
policy.editSchedule();
// Considering user-limit of 50% since only 2 users are there, only preempt
- // 14 more (5 is already running) eventhough demand is for 30. Ideally we
- // must preempt 15. But 15th container will bring user1's usage to 20 which
- // is same as user-limit. Hence skip 15th container.
- verify(mDisp, times(14)).handle(argThat(
+ // 15 more (5 is already running) eventhough demand is for 30.
+ verify(mDisp, times(15)).handle(argThat(
new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
getAppAttemptId(3))));
}
@@ -889,8 +869,6 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
conf.setFloat(CapacitySchedulerConfiguration.
INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
(float) 0.5);
- conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
- "priority_first");
String labelsConfig = "=100,true;" + // default partition
"x=100,true"; // partition=x
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org