You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2015/09/04 04:48:45 UTC
[1/4] hadoop git commit: YARN-2637. Fixed max-am-resource-percent
calculation in CapacityScheduler when activating applications. Contributed by
Craig Welch (cherry picked from commit
c53420f58364b11fbda1dace7679d45534533382)
Repository: hadoop
Updated Branches:
refs/heads/branch-2.6.1 3bd9b7459 -> 2073fc0f8
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
index b4c4c10..3918bf7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
@@ -143,13 +143,14 @@ public class TestFifoScheduler {
@Test(timeout=5000)
public void testAppAttemptMetrics() throws Exception {
AsyncDispatcher dispatcher = new InlineDispatcher();
+
+ FifoScheduler scheduler = new FifoScheduler();
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
RMContext rmContext = new RMContextImpl(dispatcher, null,
- null, null, null, null, null, null, null, writer);
+ null, null, null, null, null, null, null, writer, scheduler);
((RMContextImpl) rmContext).setSystemMetricsPublisher(
mock(SystemMetricsPublisher.class));
- FifoScheduler scheduler = new FifoScheduler();
Configuration conf = new Configuration();
scheduler.setRMContext(rmContext);
scheduler.init(conf);
@@ -189,12 +190,14 @@ public class TestFifoScheduler {
new NMTokenSecretManagerInRM(conf);
nmTokenSecretManager.rollMasterKey();
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
+
+ FifoScheduler scheduler = new FifoScheduler();
RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
- null, containerTokenSecretManager, nmTokenSecretManager, null, writer);
+ null, containerTokenSecretManager, nmTokenSecretManager, null, writer,
+ scheduler);
((RMContextImpl) rmContext).setSystemMetricsPublisher(
mock(SystemMetricsPublisher.class));
- FifoScheduler scheduler = new FifoScheduler();
scheduler.setRMContext(rmContext);
scheduler.init(conf);
scheduler.start();
@@ -260,17 +263,19 @@ public class TestFifoScheduler {
new NMTokenSecretManagerInRM(conf);
nmTokenSecretManager.rollMasterKey();
RMApplicationHistoryWriter writer = mock(RMApplicationHistoryWriter.class);
- RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
- null, containerTokenSecretManager, nmTokenSecretManager, null, writer);
- ((RMContextImpl) rmContext).setSystemMetricsPublisher(
- mock(SystemMetricsPublisher.class));
-
+
FifoScheduler scheduler = new FifoScheduler(){
@SuppressWarnings("unused")
public Map<NodeId, FiCaSchedulerNode> getNodes(){
return nodes;
}
};
+ RMContext rmContext = new RMContextImpl(dispatcher, null, null, null, null,
+ null, containerTokenSecretManager, nmTokenSecretManager, null, writer,
+ scheduler);
+ ((RMContextImpl) rmContext).setSystemMetricsPublisher(
+ mock(SystemMetricsPublisher.class));
+
scheduler.setRMContext(rmContext);
scheduler.init(conf);
scheduler.start();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java
index 87bacc6..d418dab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySched.java
@@ -82,8 +82,6 @@ public class TestRMWebServicesCapacitySched extends JerseyTest {
int numContainers;
int maxApplications;
int maxApplicationsPerUser;
- int maxActiveApplications;
- int maxActiveApplicationsPerUser;
int userLimit;
float userLimitFactor;
}
@@ -303,10 +301,6 @@ public class TestRMWebServicesCapacitySched extends JerseyTest {
WebServicesTestUtils.getXmlInt(qElem, "maxApplications");
lqi.maxApplicationsPerUser =
WebServicesTestUtils.getXmlInt(qElem, "maxApplicationsPerUser");
- lqi.maxActiveApplications =
- WebServicesTestUtils.getXmlInt(qElem, "maxActiveApplications");
- lqi.maxActiveApplicationsPerUser =
- WebServicesTestUtils.getXmlInt(qElem, "maxActiveApplicationsPerUser");
lqi.userLimit = WebServicesTestUtils.getXmlInt(qElem, "userLimit");
lqi.userLimitFactor =
WebServicesTestUtils.getXmlFloat(qElem, "userLimitFactor");
@@ -386,8 +380,6 @@ public class TestRMWebServicesCapacitySched extends JerseyTest {
lqi.numContainers = info.getInt("numContainers");
lqi.maxApplications = info.getInt("maxApplications");
lqi.maxApplicationsPerUser = info.getInt("maxApplicationsPerUser");
- lqi.maxActiveApplications = info.getInt("maxActiveApplications");
- lqi.maxActiveApplicationsPerUser = info.getInt("maxActiveApplicationsPerUser");
lqi.userLimit = info.getInt("userLimit");
lqi.userLimitFactor = (float) info.getDouble("userLimitFactor");
verifyLeafQueueGeneric(q, lqi);
@@ -449,10 +441,6 @@ public class TestRMWebServicesCapacitySched extends JerseyTest {
(float)expectedMaxAppsPerUser,
(float)info.maxApplicationsPerUser, info.userLimitFactor);
- assertTrue("maxActiveApplications doesn't match",
- info.maxActiveApplications > 0);
- assertTrue("maxActiveApplicationsPerUser doesn't match",
- info.maxActiveApplicationsPerUser > 0);
assertEquals("userLimit doesn't match", csConf.getUserLimit(q),
info.userLimit);
assertEquals("userLimitFactor doesn't match",
[3/4] hadoop git commit: YARN-3733. Fix DominantRC#compare() does not
work as expected if cluster resource is empty. (Rohith Sharmaks via wangda)
Posted by vi...@apache.org.
YARN-3733. Fix DominantRC#compare() does not work as expected if cluster resource is empty. (Rohith Sharmaks via wangda)
(cherry picked from commit ebd797c48fe236b404cf3a125ac9d1f7714e291e)
(cherry picked from commit 78d626fa892415023827e35ad549636e2a83275d)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/85d92721
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/85d92721
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/85d92721
Branch: refs/heads/branch-2.6.1
Commit: 85d92721a42fdc50cc828dfe28103ecee9831924
Parents: f1b35ff
Author: Wangda Tan <wa...@apache.org>
Authored: Thu Jun 4 10:22:57 2015 -0700
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Thu Sep 3 17:43:01 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../resource/DominantResourceCalculator.java | 15 +++++
.../capacity/TestCapacityScheduler.java | 58 +++++++++++++++++++-
3 files changed, 75 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/85d92721/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index a5b270e..8244d61 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -168,6 +168,9 @@ Release 2.6.1 - UNRELEASED
YARN-2637. Fixed max-am-resource-percent calculation in CapacityScheduler
when activating applications. (Craig Welch via jianhe)
+ YARN-3733. Fix DominantRC#compare() does not work as expected if
+ cluster resource is empty. (Rohith Sharmaks via wangda)
+
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/85d92721/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
index 6f5b40e..2ee95ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
@@ -53,6 +53,21 @@ public class DominantResourceCalculator extends ResourceCalculator {
return 0;
}
+ if (isInvalidDivisor(clusterResource)) {
+ if ((lhs.getMemory() < rhs.getMemory() && lhs.getVirtualCores() > rhs
+ .getVirtualCores())
+ || (lhs.getMemory() > rhs.getMemory() && lhs.getVirtualCores() < rhs
+ .getVirtualCores())) {
+ return 0;
+ } else if (lhs.getMemory() > rhs.getMemory()
+ || lhs.getVirtualCores() > rhs.getVirtualCores()) {
+ return 1;
+ } else if (lhs.getMemory() < rhs.getMemory()
+ || lhs.getVirtualCores() < rhs.getVirtualCores()) {
+ return -1;
+ }
+ }
+
float l = getResourceAsValue(clusterResource, lhs, true);
float r = getResourceAsValue(clusterResource, rhs, true);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/85d92721/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 2aa57a0..edcd871 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -118,6 +118,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedule
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfo;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.CapacitySchedulerQueueInfoList;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
+import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
@@ -1136,9 +1137,15 @@ public class TestCapacityScheduler {
private MockRM setUpMove() {
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
+ return setUpMove(conf);
+ }
+
+ private MockRM setUpMove(Configuration config) {
+ CapacitySchedulerConfiguration conf =
+ new CapacitySchedulerConfiguration(config);
setupQueueConfiguration(conf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
- ResourceScheduler.class);
+ ResourceScheduler.class);
MockRM rm = new MockRM(conf);
rm.start();
return rm;
@@ -2071,4 +2078,53 @@ public class TestCapacityScheduler {
Assert.assertEquals(0, report.getNumReservedContainers());
rm.stop();
}
+
+ @Test(timeout = 30000)
+ public void testAMLimitUsage() throws Exception {
+
+ CapacitySchedulerConfiguration config =
+ new CapacitySchedulerConfiguration();
+
+ config.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
+ DefaultResourceCalculator.class.getName());
+ verifyAMLimitForLeafQueue(config);
+
+ config.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
+ DominantResourceCalculator.class.getName());
+ verifyAMLimitForLeafQueue(config);
+
+ }
+
+ private void verifyAMLimitForLeafQueue(CapacitySchedulerConfiguration config)
+ throws Exception {
+ MockRM rm = setUpMove(config);
+
+ String queueName = "a1";
+ String userName = "user_0";
+ ResourceScheduler scheduler = rm.getRMContext().getScheduler();
+ LeafQueue queueA =
+ (LeafQueue) ((CapacityScheduler) scheduler).getQueue(queueName);
+ Resource amResourceLimit = queueA.getAMResourceLimit();
+
+ Resource amResource =
+ Resource.newInstance(amResourceLimit.getMemory() + 1,
+ amResourceLimit.getVirtualCores() + 1);
+
+ rm.submitApp(amResource.getMemory(), "app-1", userName, null, queueName);
+
+ rm.submitApp(amResource.getMemory(), "app-1", userName, null, queueName);
+
+ // When the AM limit is exceeded, only 1 application is activated; all the
+ // remaining applications stay pending.
+ Assert.assertEquals("PendingApplications should be 1", 1,
+ queueA.getNumPendingApplications());
+ Assert.assertEquals("Active applications should be 1", 1,
+ queueA.getNumActiveApplications());
+
+ Assert.assertEquals("User PendingApplications should be 1", 1, queueA
+ .getUser(userName).getPendingApplications());
+ Assert.assertEquals("User Active applications should be 1", 1, queueA
+ .getUser(userName).getActiveApplications());
+ rm.stop();
+ }
}
[4/4] hadoop git commit: Add missing test file of YARN-3733
Posted by vi...@apache.org.
Add missing test file of YARN-3733
(cherry picked from commit 405bbcf68c32d8fd8a83e46e686eacd14e5a533c)
(cherry picked from commit 344b7509153cdd993218cd5104c7e5c07cd35d3c)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/2073fc0f
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/2073fc0f
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/2073fc0f
Branch: refs/heads/branch-2.6.1
Commit: 2073fc0f84c75434f023a1145d2f14289a3f5ef1
Parents: 85d9272
Author: Wangda Tan <wa...@apache.org>
Authored: Thu Jun 4 13:18:25 2015 -0700
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Thu Sep 3 17:43:03 2015 -0700
----------------------------------------------------------------------
.../util/resource/TestResourceCalculator.java | 125 +++++++++++++++++++
1 file changed, 125 insertions(+)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/2073fc0f/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceCalculator.java
new file mode 100644
index 0000000..6a0b62e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/resource/TestResourceCalculator.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.util.resource;
+
+import java.util.Arrays;
+import java.util.Collection;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.junit.Assert;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+@RunWith(Parameterized.class)
+public class TestResourceCalculator {
+ private ResourceCalculator resourceCalculator;
+
+ @Parameterized.Parameters
+ public static Collection<ResourceCalculator[]> getParameters() {
+ return Arrays.asList(new ResourceCalculator[][] {
+ { new DefaultResourceCalculator() },
+ { new DominantResourceCalculator() } });
+ }
+
+ public TestResourceCalculator(ResourceCalculator rs) {
+ this.resourceCalculator = rs;
+ }
+
+ @Test(timeout = 10000)
+ public void testResourceCalculatorCompareMethod() {
+ Resource clusterResource = Resource.newInstance(0, 0);
+
+ // For lhs == rhs
+ Resource lhs = Resource.newInstance(0, 0);
+ Resource rhs = Resource.newInstance(0, 0);
+ assertResourcesOperations(clusterResource, lhs, rhs, false, true, false,
+ true, lhs, lhs);
+
+ // For lhs > rhs
+ lhs = Resource.newInstance(1, 1);
+ rhs = Resource.newInstance(0, 0);
+ assertResourcesOperations(clusterResource, lhs, rhs, false, false, true,
+ true, lhs, rhs);
+
+ // For lhs < rhs
+ lhs = Resource.newInstance(0, 0);
+ rhs = Resource.newInstance(1, 1);
+ assertResourcesOperations(clusterResource, lhs, rhs, true, true, false,
+ false, rhs, lhs);
+
+ if (!(resourceCalculator instanceof DominantResourceCalculator)) {
+ return;
+ }
+
+ // Verify two-dimensional resources, i.e. memory and CPU; these checks run
+ // only for the dominant resource calculator.
+ lhs = Resource.newInstance(1, 0);
+ rhs = Resource.newInstance(0, 1);
+ assertResourcesOperations(clusterResource, lhs, rhs, false, true, false,
+ true, lhs, lhs);
+
+ lhs = Resource.newInstance(0, 1);
+ rhs = Resource.newInstance(1, 0);
+ assertResourcesOperations(clusterResource, lhs, rhs, false, true, false,
+ true, lhs, lhs);
+
+ lhs = Resource.newInstance(1, 1);
+ rhs = Resource.newInstance(1, 0);
+ assertResourcesOperations(clusterResource, lhs, rhs, false, false, true,
+ true, lhs, rhs);
+
+ lhs = Resource.newInstance(0, 1);
+ rhs = Resource.newInstance(1, 1);
+ assertResourcesOperations(clusterResource, lhs, rhs, true, true, false,
+ false, rhs, lhs);
+
+ }
+
+
+ private void assertResourcesOperations(Resource clusterResource,
+ Resource lhs, Resource rhs, boolean lessThan, boolean lessThanOrEqual,
+ boolean greaterThan, boolean greaterThanOrEqual, Resource max,
+ Resource min) {
+
+ Assert.assertEquals("Less Than operation is wrongly calculated.", lessThan,
+ Resources.lessThan(resourceCalculator, clusterResource, lhs, rhs));
+
+ Assert.assertEquals(
+ "Less Than Or Equal To operation is wrongly calculated.",
+ lessThanOrEqual, Resources.lessThanOrEqual(resourceCalculator,
+ clusterResource, lhs, rhs));
+
+ Assert.assertEquals("Greater Than operation is wrongly calculated.",
+ greaterThan,
+ Resources.greaterThan(resourceCalculator, clusterResource, lhs, rhs));
+
+ Assert.assertEquals(
+ "Greater Than Or Equal To operation is wrongly calculated.",
+ greaterThanOrEqual, Resources.greaterThanOrEqual(resourceCalculator,
+ clusterResource, lhs, rhs));
+
+ Assert.assertEquals("Max(value) Operation wrongly calculated.", max,
+ Resources.max(resourceCalculator, clusterResource, lhs, rhs));
+
+ Assert.assertEquals("Min(value) operation is wrongly calculated.", min,
+ Resources.min(resourceCalculator, clusterResource, lhs, rhs));
+ }
+
+}
\ No newline at end of file
[2/4] hadoop git commit: YARN-2637. Fixed max-am-resource-percent
calculation in CapacityScheduler when activating applications. Contributed by
Craig Welch (cherry picked from commit
c53420f58364b11fbda1dace7679d45534533382)
Posted by vi...@apache.org.
YARN-2637. Fixed max-am-resource-percent calculation in CapacityScheduler when activating applications. Contributed by Craig Welch
(cherry picked from commit c53420f58364b11fbda1dace7679d45534533382)
(cherry picked from commit 4931600030e13d9332d9a0e588487cb8684c667d)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f1b35ffd
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f1b35ffd
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f1b35ffd
Branch: refs/heads/branch-2.6.1
Commit: f1b35ffd4ca680d76bf82b541357b8e5748f129e
Parents: 3bd9b74
Author: Jian He <ji...@apache.org>
Authored: Tue Jan 13 17:32:07 2015 -0800
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Thu Sep 3 17:40:24 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../dev-support/findbugs-exclude.xml | 1 +
.../resourcemanager/RMActiveServiceContext.java | 4 +-
.../server/resourcemanager/RMContextImpl.java | 32 ++-
.../server/resourcemanager/rmapp/RMApp.java | 3 +
.../server/resourcemanager/rmapp/RMAppImpl.java | 5 +
.../scheduler/capacity/CSQueueUtils.java | 24 --
.../scheduler/capacity/LeafQueue.java | 252 ++++++++++++------
.../scheduler/common/fica/FiCaSchedulerApp.java | 15 ++
.../webapp/CapacitySchedulerPage.java | 4 +-
.../dao/CapacitySchedulerLeafQueueInfo.java | 24 +-
.../applicationsmanager/MockAsm.java | 7 +
.../TestAMRMRPCNodeUpdates.java | 9 +
.../TestCapacitySchedulerPlanFollower.java | 1 +
.../server/resourcemanager/rmapp/MockRMApp.java | 7 +
.../capacity/TestApplicationLimits.java | 256 +++++++++++++------
.../scheduler/capacity/TestLeafQueue.java | 79 +++---
.../scheduler/capacity/TestReservations.java | 34 +--
.../scheduler/fifo/TestFifoScheduler.java | 23 +-
.../webapp/TestRMWebServicesCapacitySched.java | 12 -
20 files changed, 519 insertions(+), 276 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 60ae3d0..a5b270e 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -165,6 +165,9 @@ Release 2.6.1 - UNRELEASED
YARN-3990. AsyncDispatcher may overloaded with RMAppNodeUpdateEvent when
Node is connected/disconnected (Bibin A Chundatt via jlowe)
+ YARN-2637. Fixed max-am-resource-percent calculation in CapacityScheduler
+ when activating applications. (Craig Welch via jianhe)
+
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 45d7294..971acea 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -173,6 +173,7 @@
<Field name="userLimit" />
<Field name="userLimitFactor" />
<Field name="maxAMResourcePerQueuePercent" />
+ <Field name="lastClusterResource" />
</Or>
<Bug pattern="IS2_INCONSISTENT_SYNC" />
</Match>
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
index 3bc2e9b..03fc40e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMActiveServiceContext.java
@@ -117,7 +117,8 @@ public class RMActiveServiceContext {
RMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInRM nmTokenSecretManager,
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager,
- RMApplicationHistoryWriter rmApplicationHistoryWriter) {
+ RMApplicationHistoryWriter rmApplicationHistoryWriter,
+ ResourceScheduler scheduler) {
this();
this.setContainerAllocationExpirer(containerAllocationExpirer);
this.setAMLivelinessMonitor(amLivelinessMonitor);
@@ -128,6 +129,7 @@ public class RMActiveServiceContext {
this.setNMTokenSecretManager(nmTokenSecretManager);
this.setClientToAMTokenSecretManager(clientToAMTokenSecretManager);
this.setRMApplicationHistoryWriter(rmApplicationHistoryWriter);
+ this.setScheduler(scheduler);
RMStateStore nullStore = new NullRMStateStore();
nullStore.setRMDispatcher(rmDispatcher);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 55d7667..32216e5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -84,18 +84,46 @@ public class RMContextImpl implements RMContext {
RMContainerTokenSecretManager containerTokenSecretManager,
NMTokenSecretManagerInRM nmTokenSecretManager,
ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager,
- RMApplicationHistoryWriter rmApplicationHistoryWriter) {
+ RMApplicationHistoryWriter rmApplicationHistoryWriter,
+ ResourceScheduler scheduler) {
this();
this.setDispatcher(rmDispatcher);
setActiveServiceContext(new RMActiveServiceContext(rmDispatcher,
containerAllocationExpirer, amLivelinessMonitor, amFinishingMonitor,
delegationTokenRenewer, appTokenSecretManager,
containerTokenSecretManager, nmTokenSecretManager,
- clientToAMTokenSecretManager, rmApplicationHistoryWriter));
+ clientToAMTokenSecretManager, rmApplicationHistoryWriter,
+ scheduler));
ConfigurationProvider provider = new LocalConfigurationProvider();
setConfigurationProvider(provider);
}
+
+ @VisibleForTesting
+ // helper constructor for tests
+ public RMContextImpl(Dispatcher rmDispatcher,
+ ContainerAllocationExpirer containerAllocationExpirer,
+ AMLivelinessMonitor amLivelinessMonitor,
+ AMLivelinessMonitor amFinishingMonitor,
+ DelegationTokenRenewer delegationTokenRenewer,
+ AMRMTokenSecretManager appTokenSecretManager,
+ RMContainerTokenSecretManager containerTokenSecretManager,
+ NMTokenSecretManagerInRM nmTokenSecretManager,
+ ClientToAMTokenSecretManagerInRM clientToAMTokenSecretManager,
+ RMApplicationHistoryWriter rmApplicationHistoryWriter) {
+ this(
+ rmDispatcher,
+ containerAllocationExpirer,
+ amLivelinessMonitor,
+ amFinishingMonitor,
+ delegationTokenRenewer,
+ appTokenSecretManager,
+ containerTokenSecretManager,
+ nmTokenSecretManager,
+ clientToAMTokenSecretManager,
+ rmApplicationHistoryWriter,
+ null);
+ }
@Override
public Dispatcher getDispatcher() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
index 624aa18..fbcaab9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMApp.java
@@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ReservationId;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.event.EventHandler;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
@@ -239,4 +240,6 @@ public interface RMApp extends EventHandler<RMAppEvent> {
RMAppMetrics getRMAppMetrics();
ReservationId getReservationId();
+
+ ResourceRequest getAMResourceRequest();
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index 751dbe4..19f2193 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -1337,6 +1337,11 @@ public class RMAppImpl implements RMApp, Recoverable {
public ReservationId getReservationId() {
return submissionContext.getReservationID();
}
+
+ @Override
+ public ResourceRequest getAMResourceRequest() {
+ return this.amReq;
+ }
protected Credentials parseCredentials() throws IOException {
Credentials credentials = new Credentials();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
index 0a2fa3a..f458057 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
@@ -109,30 +109,6 @@ class CSQueueUtils {
}
return absoluteMaxCapacityByNodeLabels;
}
-
- public static int computeMaxActiveApplications(
- ResourceCalculator calculator,
- Resource clusterResource, Resource minimumAllocation,
- float maxAMResourcePercent, float absoluteMaxCapacity) {
- return
- Math.max(
- (int)Math.ceil(
- Resources.ratio(
- calculator,
- clusterResource,
- minimumAllocation) *
- maxAMResourcePercent * absoluteMaxCapacity
- ),
- 1);
- }
-
- public static int computeMaxActiveApplicationsPerUser(
- int maxActiveApplications, int userLimit, float userLimitFactor) {
- return Math.max(
- (int)Math.ceil(
- maxActiveApplications * (userLimit / 100.0f) * userLimitFactor),
- 1);
- }
@Lock(CSQueue.class)
public static void updateQueueStatistics(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 6ffc61a..60b5a59 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -87,9 +87,6 @@ public class LeafQueue extends AbstractCSQueue {
protected int maxApplicationsPerUser;
private float maxAMResourcePerQueuePercent;
- private int maxActiveApplications; // Based on absolute max capacity
- private int maxActiveAppsUsingAbsCap; // Based on absolute capacity
- private int maxActiveApplicationsPerUser;
private int nodeLocalityDelay;
@@ -113,9 +110,16 @@ public class LeafQueue extends AbstractCSQueue {
// cache last cluster resource to compute actual capacity
private Resource lastClusterResource = Resources.none();
+ // absolute capacity as a resource (based on cluster resource)
+ private Resource absoluteCapacityResource = Resources.none();
+
private final QueueHeadroomInfo queueHeadroomInfo = new QueueHeadroomInfo();
private volatile float absoluteMaxAvailCapacity;
+
+ // sum of resources used by application masters for applications
+ // running in this queue
+ private final Resource usedAMResources = Resource.newInstance(0, 0);
public LeafQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
@@ -155,19 +159,6 @@ public class LeafQueue extends AbstractCSQueue {
float maxAMResourcePerQueuePercent = cs.getConfiguration()
.getMaximumApplicationMasterResourcePerQueuePercent(getQueuePath());
- int maxActiveApplications =
- CSQueueUtils.computeMaxActiveApplications(
- resourceCalculator,
- cs.getClusterResource(), this.minimumAllocation,
- maxAMResourcePerQueuePercent, absoluteMaxCapacity);
- this.maxActiveAppsUsingAbsCap =
- CSQueueUtils.computeMaxActiveApplications(
- resourceCalculator,
- cs.getClusterResource(), this.minimumAllocation,
- maxAMResourcePerQueuePercent, absoluteCapacity);
- int maxActiveApplicationsPerUser =
- CSQueueUtils.computeMaxActiveApplicationsPerUser(
- maxActiveAppsUsingAbsCap, userLimit, userLimitFactor);
this.queueInfo.setChildQueues(new ArrayList<QueueInfo>());
@@ -179,8 +170,7 @@ public class LeafQueue extends AbstractCSQueue {
setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity,
maximumCapacity, absoluteMaxCapacity, userLimit, userLimitFactor,
maxApplications, maxAMResourcePerQueuePercent, maxApplicationsPerUser,
- maxActiveApplications, maxActiveApplicationsPerUser, state, acls, cs
- .getConfiguration().getNodeLocalityDelay(), accessibleLabels,
+ state, acls, cs.getConfiguration().getNodeLocalityDelay(), accessibleLabels,
defaultLabelExpression, this.capacitiyByNodeLabels,
this.maxCapacityByNodeLabels,
cs.getConfiguration().getReservationContinueLook());
@@ -208,8 +198,7 @@ public class LeafQueue extends AbstractCSQueue {
float maximumCapacity, float absoluteMaxCapacity,
int userLimit, float userLimitFactor,
int maxApplications, float maxAMResourcePerQueuePercent,
- int maxApplicationsPerUser, int maxActiveApplications,
- int maxActiveApplicationsPerUser, QueueState state,
+ int maxApplicationsPerUser, QueueState state,
Map<QueueACL, AccessControlList> acls, int nodeLocalityDelay,
Set<String> labels, String defaultLabelExpression,
Map<String, Float> capacitieByLabel,
@@ -224,6 +213,16 @@ public class LeafQueue extends AbstractCSQueue {
float absCapacity = getParent().getAbsoluteCapacity() * capacity;
CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absCapacity,
absoluteMaxCapacity);
+
+ this.lastClusterResource = clusterResource;
+ updateAbsoluteCapacityResource(clusterResource);
+
+ // Initialize headroom info, also used for calculating application
+ // master resource limits. Since this happens during queue initialization
+ // and all queues may not be realized yet, we'll use (optimistic)
+ // absoluteMaxCapacity (it will be replaced with the more accurate
+ // absoluteMaxAvailCapacity during headroom/userlimit/allocation events)
+ updateHeadroomInfo(clusterResource, absoluteMaxCapacity);
this.absoluteCapacity = absCapacity;
@@ -234,9 +233,6 @@ public class LeafQueue extends AbstractCSQueue {
this.maxAMResourcePerQueuePercent = maxAMResourcePerQueuePercent;
this.maxApplicationsPerUser = maxApplicationsPerUser;
- this.maxActiveApplications = maxActiveApplications;
- this.maxActiveApplicationsPerUser = maxActiveApplicationsPerUser;
-
if (!SchedulerUtils.checkQueueLabelExpression(this.accessibleLabels,
this.defaultLabelExpression)) {
throw new IOException("Invalid default label expression of "
@@ -288,21 +284,6 @@ public class LeafQueue extends AbstractCSQueue {
"maxApplicationsPerUser = " + maxApplicationsPerUser +
" [= (int)(maxApplications * (userLimit / 100.0f) * " +
"userLimitFactor) ]" + "\n" +
- "maxActiveApplications = " + maxActiveApplications +
- " [= max(" +
- "(int)ceil((clusterResourceMemory / minimumAllocation) * " +
- "maxAMResourcePerQueuePercent * absoluteMaxCapacity)," +
- "1) ]" + "\n" +
- "maxActiveAppsUsingAbsCap = " + maxActiveAppsUsingAbsCap +
- " [= max(" +
- "(int)ceil((clusterResourceMemory / minimumAllocation) *" +
- "maxAMResourcePercent * absoluteCapacity)," +
- "1) ]" + "\n" +
- "maxActiveApplicationsPerUser = " + maxActiveApplicationsPerUser +
- " [= max(" +
- "(int)(maxActiveApplications * (userLimit / 100.0f) * " +
- "userLimitFactor)," +
- "1) ]" + "\n" +
"usedCapacity = " + usedCapacity +
" [= usedResourcesMemory / " +
"(clusterResourceMemory * absoluteCapacity)]" + "\n" +
@@ -355,14 +336,6 @@ public class LeafQueue extends AbstractCSQueue {
return maxApplicationsPerUser;
}
- public synchronized int getMaximumActiveApplications() {
- return maxActiveApplications;
- }
-
- public synchronized int getMaximumActiveApplicationsPerUser() {
- return maxActiveApplicationsPerUser;
- }
-
@Override
public ActiveUsersManager getActiveUsersManager() {
return activeUsersManager;
@@ -525,8 +498,6 @@ public class LeafQueue extends AbstractCSQueue {
newlyParsedLeafQueue.maxApplications,
newlyParsedLeafQueue.maxAMResourcePerQueuePercent,
newlyParsedLeafQueue.getMaxApplicationsPerUser(),
- newlyParsedLeafQueue.getMaximumActiveApplications(),
- newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(),
newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls,
newlyParsedLeafQueue.getNodeLocalityDelay(),
newlyParsedLeafQueue.accessibleLabels,
@@ -612,27 +583,115 @@ public class LeafQueue extends AbstractCSQueue {
}
}
+
+ public synchronized Resource getAMResourceLimit() {
+ /*
+ * The limit to the amount of resources which can be consumed by
+ * application masters for applications running in the queue
+ * is calculated by taking the greater of the max resources currently
+ * available to the queue (see absoluteMaxAvailCapacity) and the absolute
+ * resources guaranteed for the queue and multiplying it by the am
+ * resource percent.
+ *
+ * This is to allow a queue to grow its (proportional) application
+ * master resource use up to its max capacity when other queues are
+ * idle but to scale back down to its guaranteed capacity as they
+ * become busy.
+ *
+ */
+ Resource queueMaxCap;
+ synchronized (queueHeadroomInfo) {
+ queueMaxCap = queueHeadroomInfo.getQueueMaxCap();
+ }
+ Resource queueCap = Resources.max(resourceCalculator, lastClusterResource,
+ absoluteCapacityResource, queueMaxCap);
+ return Resources.multiplyAndNormalizeUp(
+ resourceCalculator,
+ queueCap,
+ maxAMResourcePerQueuePercent, minimumAllocation);
+ }
+
+ public synchronized Resource getUserAMResourceLimit() {
+ /*
+ * The user AM resource limit is based on the same approach as the
+ * user limit (as it should represent a subset of that). This means that
+ * it uses the absolute queue capacity instead of the max and is modified
+ * by the user limit and the user limit factor, as is the user limit.
+ *
+ */
+ float effectiveUserLimit = Math.max(userLimit / 100.0f, 1.0f /
+ Math.max(getActiveUsersManager().getNumActiveUsers(), 1));
+
+ return Resources.multiplyAndNormalizeUp(
+ resourceCalculator,
+ absoluteCapacityResource,
+ maxAMResourcePerQueuePercent * effectiveUserLimit *
+ userLimitFactor, minimumAllocation);
+ }
private synchronized void activateApplications() {
+ //limit of allowed resource usage for application masters
+ Resource amLimit = getAMResourceLimit();
+ Resource userAMLimit = getUserAMResourceLimit();
+
for (Iterator<FiCaSchedulerApp> i=pendingApplications.iterator();
i.hasNext(); ) {
FiCaSchedulerApp application = i.next();
- // Check queue limit
- if (getNumActiveApplications() >= getMaximumActiveApplications()) {
- break;
+ // Check am resource limit
+ Resource amIfStarted =
+ Resources.add(application.getAMResource(), usedAMResources);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("application AMResource " + application.getAMResource() +
+ " maxAMResourcePerQueuePercent " + maxAMResourcePerQueuePercent +
+ " amLimit " + amLimit +
+ " lastClusterResource " + lastClusterResource +
+ " amIfStarted " + amIfStarted);
+ }
+
+ if (!Resources.lessThanOrEqual(
+ resourceCalculator, lastClusterResource, amIfStarted, amLimit)) {
+ if (getNumActiveApplications() < 1) {
+ LOG.warn("maximum-am-resource-percent is insufficient to start a" +
+ " single application in queue, it is likely set too low." +
+ " skipping enforcement to allow at least one application to start");
+ } else {
+ LOG.info("not starting application as amIfStarted exceeds amLimit");
+ continue;
+ }
}
- // Check user limit
+ // Check user am resource limit
+
User user = getUser(application.getUser());
- if (user.getActiveApplications() < getMaximumActiveApplicationsPerUser()) {
- user.activateApplication();
- activeApplications.add(application);
- i.remove();
- LOG.info("Application " + application.getApplicationId() +
- " from user: " + application.getUser() +
- " activated in queue: " + getQueueName());
+
+ Resource userAmIfStarted =
+ Resources.add(application.getAMResource(),
+ user.getConsumedAMResources());
+
+ if (!Resources.lessThanOrEqual(
+ resourceCalculator, lastClusterResource, userAmIfStarted,
+ userAMLimit)) {
+ if (getNumActiveApplications() < 1) {
+ LOG.warn("maximum-am-resource-percent is insufficient to start a" +
+ " single application in queue for user, it is likely set too low." +
+ " skipping enforcement to allow at least one application to start");
+ } else {
+ LOG.info("not starting application as amIfStarted exceeds " +
+ "userAmLimit");
+ continue;
+ }
}
+ user.activateApplication();
+ activeApplications.add(application);
+ Resources.addTo(usedAMResources, application.getAMResource());
+ Resources.addTo(user.getConsumedAMResources(),
+ application.getAMResource());
+ i.remove();
+ LOG.info("Application " + application.getApplicationId() +
+ " from user: " + application.getUser() +
+ " activated in queue: " + getQueueName());
}
}
@@ -678,6 +737,10 @@ public class LeafQueue extends AbstractCSQueue {
boolean wasActive = activeApplications.remove(application);
if (!wasActive) {
pendingApplications.remove(application);
+ } else {
+ Resources.subtractFrom(usedAMResources, application.getAMResource());
+ Resources.subtractFrom(user.getConsumedAMResources(),
+ application.getAMResource());
}
applicationAttemptMap.remove(application.getApplicationAttemptId());
@@ -1015,6 +1078,25 @@ public class LeafQueue extends AbstractCSQueue {
return canAssign;
}
+
+ private Resource updateHeadroomInfo(Resource clusterResource,
+ float absoluteMaxAvailCapacity) {
+
+ Resource queueMaxCap =
+ Resources.multiplyAndNormalizeDown(
+ resourceCalculator,
+ clusterResource,
+ absoluteMaxAvailCapacity,
+ minimumAllocation);
+
+ synchronized (queueHeadroomInfo) {
+ queueHeadroomInfo.setQueueMaxCap(queueMaxCap);
+ queueHeadroomInfo.setClusterResource(clusterResource);
+ }
+
+ return queueMaxCap;
+
+ }
@Lock({LeafQueue.class, FiCaSchedulerApp.class})
Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
@@ -1027,18 +1109,9 @@ public class LeafQueue extends AbstractCSQueue {
Resource userLimit =
computeUserLimit(application, clusterResource, required,
queueUser, requestedLabels);
-
- Resource queueMaxCap = // Queue Max-Capacity
- Resources.multiplyAndNormalizeDown(
- resourceCalculator,
- clusterResource,
- absoluteMaxAvailCapacity,
- minimumAllocation);
-
- synchronized (queueHeadroomInfo) {
- queueHeadroomInfo.setQueueMaxCap(queueMaxCap);
- queueHeadroomInfo.setClusterResource(clusterResource);
- }
+
+ Resource queueMaxCap =
+ updateHeadroomInfo(clusterResource, absoluteMaxAvailCapacity);
Resource headroom =
getHeadroom(queueUser, queueMaxCap, clusterResource, userLimit);
@@ -1734,25 +1807,25 @@ public class LeafQueue extends AbstractCSQueue {
" used=" + usedResources + " numContainers=" + numContainers +
" user=" + userName + " user-resources=" + user.getTotalConsumedResources());
}
+
+ private void updateAbsoluteCapacityResource(Resource clusterResource) {
+
+ absoluteCapacityResource = Resources.multiplyAndNormalizeUp(
+ resourceCalculator,
+ clusterResource,
+ absoluteCapacity, minimumAllocation);
+
+ }
@Override
public synchronized void updateClusterResource(Resource clusterResource) {
lastClusterResource = clusterResource;
+ updateAbsoluteCapacityResource(clusterResource);
- // Update queue properties
- maxActiveApplications =
- CSQueueUtils.computeMaxActiveApplications(
- resourceCalculator,
- clusterResource, minimumAllocation,
- maxAMResourcePerQueuePercent, absoluteMaxCapacity);
- maxActiveAppsUsingAbsCap =
- CSQueueUtils.computeMaxActiveApplications(
- resourceCalculator,
- clusterResource, minimumAllocation,
- maxAMResourcePerQueuePercent, absoluteCapacity);
- maxActiveApplicationsPerUser =
- CSQueueUtils.computeMaxActiveApplicationsPerUser(
- maxActiveAppsUsingAbsCap, userLimit, userLimitFactor);
+ // Update headroom info based on the new cluster resource value. Use
+ // absoluteMaxCapacity for now; it will be replaced with
+ // absoluteMaxAvailCapacity during allocation.
+ updateHeadroomInfo(clusterResource, absoluteMaxCapacity);
// Update metrics
CSQueueUtils.updateQueueStatistics(
@@ -1775,6 +1848,7 @@ public class LeafQueue extends AbstractCSQueue {
@VisibleForTesting
public static class User {
Resource consumed = Resources.createResource(0, 0);
+ Resource consumedAMResources = Resources.createResource(0, 0);
Map<String, Resource> consumedByLabel = new HashMap<String, Resource>();
int pendingApplications = 0;
int activeApplications = 0;
@@ -1798,6 +1872,10 @@ public class LeafQueue extends AbstractCSQueue {
public int getActiveApplications() {
return activeApplications;
}
+
+ public Resource getConsumedAMResources() {
+ return consumedAMResources;
+ }
public int getTotalApplications() {
return getPendingApplications() + getActiveApplications();
@@ -1943,6 +2021,10 @@ public class LeafQueue extends AbstractCSQueue {
@Override
public float getAbsActualCapacity() {
+ // TODO: Is this actually used by anything at present?
+ // There is a findbugs warning regarding lastClusterResource (now excluded);
+ // when this is used, verify that the access is thread-safe and remove
+ // the findbugs exclusion if possible.
if (Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
lastClusterResource, Resources.none())) {
return absoluteCapacity;
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 2f9569c..9f97b13 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
/**
* Represents an application attempt from the viewpoint of the FIFO or Capacity
@@ -72,6 +73,20 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
String user, Queue queue, ActiveUsersManager activeUsersManager,
RMContext rmContext) {
super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
+
+ RMApp rmApp = rmContext.getRMApps().get(getApplicationId());
+
+ Resource amResource;
+ if (rmApp == null || rmApp.getAMResourceRequest() == null) {
+ //the rmApp may be undefined (the resource manager checks for this too)
+ //and unmanaged applications do not provide an amResource request
+ //in these cases, provide a default using the scheduler
+ amResource = rmContext.getScheduler().getMinimumResourceCapability();
+ } else {
+ amResource = rmApp.getAMResourceRequest().getCapability();
+ }
+
+ setAMResource(amResource);
}
synchronized public boolean containerCompleted(RMContainer rmContainer,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
index ffead48..0be361a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
@@ -115,8 +115,8 @@ class CapacitySchedulerPage extends RmView {
_("Num Containers:", Integer.toString(lqinfo.getNumContainers())).
_("Max Applications:", Integer.toString(lqinfo.getMaxApplications())).
_("Max Applications Per User:", Integer.toString(lqinfo.getMaxApplicationsPerUser())).
- _("Max Schedulable Applications:", Integer.toString(lqinfo.getMaxActiveApplications())).
- _("Max Schedulable Applications Per User:", Integer.toString(lqinfo.getMaxActiveApplicationsPerUser())).
+ _("Max Application Master Resources:", lqinfo.getAMResourceLimit().toString()).
+ _("Max Application Master Resources Per User:", lqinfo.getUserAMResourceLimit().toString()).
_("Configured Capacity:", percent(lqinfo.getCapacity() / 100)).
_("Configured Max Capacity:", percent(lqinfo.getMaxCapacity() / 100)).
_("Configured Minimum User Limit Percent:", Integer.toString(lqinfo.getUserLimit()) + "%").
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
index d90e963..bb4c749 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
@@ -32,11 +32,11 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
protected int numContainers;
protected int maxApplications;
protected int maxApplicationsPerUser;
- protected int maxActiveApplications;
- protected int maxActiveApplicationsPerUser;
protected int userLimit;
protected UsersInfo users; // To add another level in the XML
protected float userLimitFactor;
+ protected ResourceInfo aMResourceLimit;
+ protected ResourceInfo userAMResourceLimit;
CapacitySchedulerLeafQueueInfo() {
};
@@ -48,11 +48,11 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
numContainers = q.getNumContainers();
maxApplications = q.getMaxApplications();
maxApplicationsPerUser = q.getMaxApplicationsPerUser();
- maxActiveApplications = q.getMaximumActiveApplications();
- maxActiveApplicationsPerUser = q.getMaximumActiveApplicationsPerUser();
userLimit = q.getUserLimit();
users = new UsersInfo(q.getUsers());
userLimitFactor = q.getUserLimitFactor();
+ aMResourceLimit = new ResourceInfo(q.getAMResourceLimit());
+ userAMResourceLimit = new ResourceInfo(q.getUserAMResourceLimit());
}
public int getNumActiveApplications() {
@@ -75,14 +75,6 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
return maxApplicationsPerUser;
}
- public int getMaxActiveApplications() {
- return maxActiveApplications;
- }
-
- public int getMaxActiveApplicationsPerUser() {
- return maxActiveApplicationsPerUser;
- }
-
public int getUserLimit() {
return userLimit;
}
@@ -95,4 +87,12 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
public float getUserLimitFactor() {
return userLimitFactor;
}
+
+ public ResourceInfo getAMResourceLimit() {
+ return aMResourceLimit;
+ }
+
+ public ResourceInfo getUserAMResourceLimit() {
+ return userAMResourceLimit;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
index 62e3e5c..f8d92aa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/MockAsm.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -51,6 +52,7 @@ import com.google.common.collect.Lists;
public abstract class MockAsm extends MockApps {
public static class ApplicationBase implements RMApp {
+ ResourceRequest amReq;
@Override
public String getUser() {
throw new UnsupportedOperationException("Not supported yet.");
@@ -183,6 +185,11 @@ public abstract class MockAsm extends MockApps {
public ReservationId getReservationId() {
throw new UnsupportedOperationException("Not supported yet.");
}
+
+ @Override
+ public ResourceRequest getAMResourceRequest() {
+ return this.amReq;
+ }
}
public static RMApp newApplication(int i) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
index e93d351..f4cb3b3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/applicationsmanager/TestAMRMRPCNodeUpdates.java
@@ -23,6 +23,7 @@ import java.util.List;
import org.junit.Assert;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateRequest;
@@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
@@ -55,6 +57,13 @@ public class TestAMRMRPCNodeUpdates {
dispatcher = new DrainDispatcher();
this.rm = new MockRM() {
@Override
+ public void init(Configuration conf) {
+ conf.set(
+ CapacitySchedulerConfiguration.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
+ "1.0");
+ super.init(conf);
+ }
+ @Override
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
return new SchedulerEventDispatcher(this.scheduler) {
@Override
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
index 4eedd42..f2840b0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestCapacitySchedulerPlanFollower.java
@@ -95,6 +95,7 @@ public class TestCapacitySchedulerPlanFollower {
.thenReturn(null);
Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId) Matchers.any());
when(spyRMContext.getRMApps()).thenReturn(spyApps);
+ when(spyRMContext.getScheduler()).thenReturn(scheduler);
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
index 787b5d7..ec990f9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/MockRMApp.java
@@ -32,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ReservationId;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.YarnApplicationState;
import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationSubmissionContextPBImpl;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
@@ -55,6 +56,7 @@ public class MockRMApp implements RMApp {
StringBuilder diagnostics = new StringBuilder();
RMAppAttempt attempt;
int maxAppAttempts = 1;
+ ResourceRequest amReq;
public MockRMApp(int newid, long time, RMAppState newState) {
finish = time;
@@ -264,4 +266,9 @@ public class MockRMApp implements RMApp {
public ReservationId getReservationId() {
throw new UnsupportedOperationException("Not supported yet.");
}
+
+ @Override
+ public ResourceRequest getAMResourceRequest() {
+ return this.amReq; // NOTE(review): no assignment to amReq is visible in this patch; presumably null unless a test sets the field - confirm
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index 0cd74d0..81a5aad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -28,16 +28,21 @@ import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import org.mockito.Matchers;
+import org.mockito.Mockito;
import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -47,8 +52,10 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -56,6 +63,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
+import org.junit.Ignore;
public class TestApplicationLimits {
@@ -119,8 +127,6 @@ public class TestApplicationLimits {
// Some default values
doReturn(100).when(queue).getMaxApplications();
doReturn(25).when(queue).getMaxApplicationsPerUser();
- doReturn(10).when(queue).getMaximumActiveApplications();
- doReturn(2).when(queue).getMaximumActiveApplicationsPerUser();
}
private static final String A = "a";
@@ -136,10 +142,14 @@ public class TestApplicationLimits {
final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
conf.setCapacity(Q_B, 90);
+ conf.setUserLimit(CapacitySchedulerConfiguration.ROOT + "." + A, 50);
+ conf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + "." + A, 5.0f);
+
LOG.info("Setup top-level queues a and b");
}
- private FiCaSchedulerApp getMockApplication(int appId, String user) {
+ private FiCaSchedulerApp getMockApplication(int appId, String user,
+ Resource amResource) {
FiCaSchedulerApp application = mock(FiCaSchedulerApp.class);
ApplicationAttemptId applicationAttemptId =
TestUtils.getMockApplicationAttemptId(appId, 0);
@@ -147,10 +157,90 @@ public class TestApplicationLimits {
when(application).getApplicationId();
doReturn(applicationAttemptId). when(application).getApplicationAttemptId();
doReturn(user).when(application).getUser();
+ doReturn(amResource).when(application).getAMResource();
return application;
}
@Test
+ public void testAMResourceLimit() throws Exception {
+ final String user_0 = "user_0";
+ final String user_1 = "user_1";
+
+ // Checks app activation against the queue and per-user AM resource limits.
+ // With the default 10% max AM resource share, an 80GB cluster allows 8GB
+ // for AMs at the queue level. The user AM limit is 4GB initially (based on
+ // the queue absolute capacity) while there is only 1 user, and drops to 2GB
+ // (the user limit) once a second user is active.
+ queue.updateClusterResource(Resource.newInstance(80 * GB, 40));
+
+ ActiveUsersManager activeUsersManager = mock(ActiveUsersManager.class);
+ when(queue.getActiveUsersManager()).thenReturn(activeUsersManager);
+
+ assertEquals(Resource.newInstance(8 * GB, 1), queue.getAMResourceLimit());
+ assertEquals(Resource.newInstance(4 * GB, 1),
+ queue.getUserAMResourceLimit());
+
+ // Two apps for user_0 (2GB AM each); both are expected to become active
+ int APPLICATION_ID = 0;
+ FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0,
+ Resource.newInstance(2 * GB, 1));
+ queue.submitApplicationAttempt(app_0, user_0);
+ assertEquals(1, queue.getNumActiveApplications());
+ assertEquals(0, queue.getNumPendingApplications());
+ assertEquals(1, queue.getNumActiveApplications(user_0));
+ assertEquals(0, queue.getNumPendingApplications(user_0));
+
+ when(activeUsersManager.getNumActiveUsers()).thenReturn(1);
+
+ FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0,
+ Resource.newInstance(2 * GB, 1));
+ queue.submitApplicationAttempt(app_1, user_0);
+ assertEquals(2, queue.getNumActiveApplications());
+ assertEquals(0, queue.getNumPendingApplications());
+ assertEquals(2, queue.getNumActiveApplications(user_0));
+ assertEquals(0, queue.getNumPendingApplications(user_0));
+
+ // Both AM limits are unchanged while user_0 is the only active user
+ assertEquals(Resource.newInstance(8 * GB, 1), queue.getAMResourceLimit());
+ assertEquals(Resource.newInstance(4 * GB, 1),
+ queue.getUserAMResourceLimit());
+
+ // One app for user_1 (2GB AM); it is expected to become active
+ FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_1,
+ Resource.newInstance(2 * GB, 1));
+ queue.submitApplicationAttempt(app_2, user_1);
+ assertEquals(3, queue.getNumActiveApplications());
+ assertEquals(0, queue.getNumPendingApplications());
+ assertEquals(1, queue.getNumActiveApplications(user_1));
+ assertEquals(0, queue.getNumPendingApplications(user_1));
+
+ when(activeUsersManager.getNumActiveUsers()).thenReturn(2);
+
+ // Now the user AM resource limit drops to the queue-configured 50%, as
+ // there is another user active; the queue-level limit stays at 8GB
+ assertEquals(Resource.newInstance(8 * GB, 1), queue.getAMResourceLimit());
+ assertEquals(Resource.newInstance(2 * GB, 1),
+ queue.getUserAMResourceLimit());
+
+ // Second user_1 app (2GB AM) cannot start and is expected to stay pending
+ FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_1,
+ Resource.newInstance(2 * GB, 1));
+ queue.submitApplicationAttempt(app_3, user_1);
+ assertEquals(3, queue.getNumActiveApplications());
+ assertEquals(1, queue.getNumPendingApplications());
+ assertEquals(1, queue.getNumActiveApplications(user_1));
+ assertEquals(1, queue.getNumPendingApplications(user_1));
+
+ // Finish app_2 (user_1's active app) so the pending app_3 gets activated
+ queue.finishApplicationAttempt(app_2, A);
+ assertEquals(3, queue.getNumActiveApplications());
+ assertEquals(0, queue.getNumPendingApplications());
+ assertEquals(1, queue.getNumActiveApplications(user_1));
+ assertEquals(0, queue.getNumPendingApplications(user_1));
+
+ }
+
+ @Test
public void testLimitsComputation() throws Exception {
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
@@ -172,7 +262,8 @@ public class TestApplicationLimits {
when(csContext.getRMContext()).thenReturn(rmContext);
// Say cluster has 100 nodes of 16G each
- Resource clusterResource = Resources.createResource(100 * 16 * GB, 100 * 16);
+ Resource clusterResource =
+ Resources.createResource(100 * 16 * GB, 100 * 16);
when(csContext.getClusterResource()).thenReturn(clusterResource);
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
@@ -183,28 +274,14 @@ public class TestApplicationLimits {
LeafQueue queue = (LeafQueue)queues.get(A);
LOG.info("Queue 'A' -" +
- " maxActiveApplications=" + queue.getMaximumActiveApplications() +
- " maxActiveApplicationsPerUser=" +
- queue.getMaximumActiveApplicationsPerUser());
- int expectedMaxActiveApps =
- Math.max(1,
- (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) *
- csConf.
- getMaximumApplicationMasterResourcePerQueuePercent(
- queue.getQueuePath()) *
- queue.getAbsoluteMaximumCapacity()));
- assertEquals(expectedMaxActiveApps,
- queue.getMaximumActiveApplications());
- int expectedMaxActiveAppsUsingAbsCap =
- Math.max(1,
- (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) *
- csConf.getMaximumApplicationMasterResourcePercent() *
- queue.getAbsoluteCapacity()));
- assertEquals(
- (int)Math.ceil(
- expectedMaxActiveAppsUsingAbsCap * (queue.getUserLimit() / 100.0f) *
- queue.getUserLimitFactor()),
- queue.getMaximumActiveApplicationsPerUser());
+ " aMResourceLimit=" + queue.getAMResourceLimit() +
+ " UserAMResourceLimit=" +
+ queue.getUserAMResourceLimit());
+
+ assertEquals(queue.getAMResourceLimit(), Resource.newInstance(160*GB, 1));
+ assertEquals(queue.getUserAMResourceLimit(),
+ Resource.newInstance(80*GB, 1));
+
assertEquals(
(int)(clusterResource.getMemory() * queue.getAbsoluteCapacity()),
queue.getMetrics().getAvailableMB()
@@ -213,24 +290,11 @@ public class TestApplicationLimits {
// Add some nodes to the cluster & test new limits
clusterResource = Resources.createResource(120 * 16 * GB);
root.updateClusterResource(clusterResource);
- expectedMaxActiveApps =
- Math.max(1,
- (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) *
- csConf.
- getMaximumApplicationMasterResourcePerQueuePercent(
- queue.getQueuePath()) *
- queue.getAbsoluteMaximumCapacity()));
- assertEquals(expectedMaxActiveApps,
- queue.getMaximumActiveApplications());
- expectedMaxActiveAppsUsingAbsCap =
- Math.max(1,
- (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) *
- csConf.getMaximumApplicationMasterResourcePercent() *
- queue.getAbsoluteCapacity()));
- assertEquals(
- (int)Math.ceil(expectedMaxActiveAppsUsingAbsCap *
- (queue.getUserLimit() / 100.0f) * queue.getUserLimitFactor()),
- queue.getMaximumActiveApplicationsPerUser());
+
+ assertEquals(queue.getAMResourceLimit(), Resource.newInstance(192*GB, 1));
+ assertEquals(queue.getUserAMResourceLimit(),
+ Resource.newInstance(96*GB, 1));
+
assertEquals(
(int)(clusterResource.getMemory() * queue.getAbsoluteCapacity()),
queue.getMetrics().getAvailableMB()
@@ -271,18 +335,15 @@ public class TestApplicationLimits {
clusterResource = Resources.createResource(100 * 16 * GB);
queue = (LeafQueue)queues.get(A);
- expectedMaxActiveApps =
- Math.max(1,
- (int)Math.ceil(((float)clusterResource.getMemory() / (1*GB)) *
- csConf.
- getMaximumApplicationMasterResourcePerQueuePercent(
- queue.getQueuePath()) *
- queue.getAbsoluteMaximumCapacity()));
assertEquals((long) 0.5,
- (long) csConf.getMaximumApplicationMasterResourcePerQueuePercent(queue.getQueuePath()));
- assertEquals(expectedMaxActiveApps,
- queue.getMaximumActiveApplications());
+ (long) csConf.getMaximumApplicationMasterResourcePerQueuePercent(
+ queue.getQueuePath())
+ );
+
+ assertEquals(queue.getAMResourceLimit(), Resource.newInstance(800*GB, 1));
+ assertEquals(queue.getUserAMResourceLimit(),
+ Resource.newInstance(400*GB, 1));
// Change the per-queue max applications.
csConf.setInt(
@@ -308,10 +369,16 @@ public class TestApplicationLimits {
public void testActiveApplicationLimits() throws Exception {
final String user_0 = "user_0";
final String user_1 = "user_1";
+ final String user_2 = "user_2";
+
+ assertEquals(Resource.newInstance(16 * GB, 1), queue.getAMResourceLimit());
+ assertEquals(Resource.newInstance(8 * GB, 1),
+ queue.getUserAMResourceLimit());
int APPLICATION_ID = 0;
// Submit first application
- FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0);
+ FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0,
+ Resources.createResource(4 * GB, 0));
queue.submitApplicationAttempt(app_0, user_0);
assertEquals(1, queue.getNumActiveApplications());
assertEquals(0, queue.getNumPendingApplications());
@@ -319,15 +386,17 @@ public class TestApplicationLimits {
assertEquals(0, queue.getNumPendingApplications(user_0));
// Submit second application
- FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0);
+ FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0,
+ Resources.createResource(4 * GB, 0));
queue.submitApplicationAttempt(app_1, user_0);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(0, queue.getNumPendingApplications());
assertEquals(2, queue.getNumActiveApplications(user_0));
assertEquals(0, queue.getNumPendingApplications(user_0));
- // Submit third application, should remain pending
- FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0);
+ // Submit third application, should remain pending due to user AM limit
+ FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0,
+ Resources.createResource(4 * GB, 0));
queue.submitApplicationAttempt(app_2, user_0);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(1, queue.getNumPendingApplications());
@@ -342,18 +411,17 @@ public class TestApplicationLimits {
assertEquals(0, queue.getNumPendingApplications(user_0));
// Submit another one for user_0
- FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0);
+ FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0,
+ Resources.createResource(4 * GB, 0));
queue.submitApplicationAttempt(app_3, user_0);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(1, queue.getNumPendingApplications());
assertEquals(2, queue.getNumActiveApplications(user_0));
assertEquals(1, queue.getNumPendingApplications(user_0));
- // Change queue limit to be smaller so 2 users can fill it up
- doReturn(3).when(queue).getMaximumActiveApplications();
-
// Submit first app for user_1
- FiCaSchedulerApp app_4 = getMockApplication(APPLICATION_ID++, user_1);
+ FiCaSchedulerApp app_4 = getMockApplication(APPLICATION_ID++, user_1,
+ Resources.createResource(8 * GB, 0));
queue.submitApplicationAttempt(app_4, user_1);
assertEquals(3, queue.getNumActiveApplications());
assertEquals(1, queue.getNumPendingApplications());
@@ -362,15 +430,17 @@ public class TestApplicationLimits {
assertEquals(1, queue.getNumActiveApplications(user_1));
assertEquals(0, queue.getNumPendingApplications(user_1));
- // Submit second app for user_1, should block due to queue-limit
- FiCaSchedulerApp app_5 = getMockApplication(APPLICATION_ID++, user_1);
- queue.submitApplicationAttempt(app_5, user_1);
+ // Submit first app for user_2, should block due to queue AM limit
+ FiCaSchedulerApp app_5 = getMockApplication(APPLICATION_ID++, user_2,
+ Resources.createResource(8 * GB, 0));
+ queue.submitApplicationAttempt(app_5, user_2);
assertEquals(3, queue.getNumActiveApplications());
assertEquals(2, queue.getNumPendingApplications());
assertEquals(2, queue.getNumActiveApplications(user_0));
assertEquals(1, queue.getNumPendingApplications(user_0));
assertEquals(1, queue.getNumActiveApplications(user_1));
- assertEquals(1, queue.getNumPendingApplications(user_1));
+ assertEquals(0, queue.getNumPendingApplications(user_1));
+ assertEquals(1, queue.getNumPendingApplications(user_2));
// Now finish one app of user_1 so app_5 should be activated
queue.finishApplicationAttempt(app_4, A);
@@ -378,21 +448,22 @@ public class TestApplicationLimits {
assertEquals(1, queue.getNumPendingApplications());
assertEquals(2, queue.getNumActiveApplications(user_0));
assertEquals(1, queue.getNumPendingApplications(user_0));
- assertEquals(1, queue.getNumActiveApplications(user_1));
+ assertEquals(0, queue.getNumActiveApplications(user_1));
assertEquals(0, queue.getNumPendingApplications(user_1));
+ assertEquals(1, queue.getNumActiveApplications(user_2));
+ assertEquals(0, queue.getNumPendingApplications(user_2));
+
}
-
+
@Test
public void testActiveLimitsWithKilledApps() throws Exception {
final String user_0 = "user_0";
int APPLICATION_ID = 0;
- // set max active to 2
- doReturn(2).when(queue).getMaximumActiveApplications();
-
// Submit first application
- FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0);
+ FiCaSchedulerApp app_0 = getMockApplication(APPLICATION_ID++, user_0,
+ Resources.createResource(4 * GB, 0));
queue.submitApplicationAttempt(app_0, user_0);
assertEquals(1, queue.getNumActiveApplications());
assertEquals(0, queue.getNumPendingApplications());
@@ -401,7 +472,8 @@ public class TestApplicationLimits {
assertTrue(queue.activeApplications.contains(app_0));
// Submit second application
- FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0);
+ FiCaSchedulerApp app_1 = getMockApplication(APPLICATION_ID++, user_0,
+ Resources.createResource(4 * GB, 0));
queue.submitApplicationAttempt(app_1, user_0);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(0, queue.getNumPendingApplications());
@@ -410,7 +482,8 @@ public class TestApplicationLimits {
assertTrue(queue.activeApplications.contains(app_1));
// Submit third application, should remain pending
- FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0);
+ FiCaSchedulerApp app_2 = getMockApplication(APPLICATION_ID++, user_0,
+ Resources.createResource(4 * GB, 0));
queue.submitApplicationAttempt(app_2, user_0);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(1, queue.getNumPendingApplications());
@@ -419,7 +492,8 @@ public class TestApplicationLimits {
assertTrue(queue.pendingApplications.contains(app_2));
// Submit fourth application, should remain pending
- FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0);
+ FiCaSchedulerApp app_3 = getMockApplication(APPLICATION_ID++, user_0,
+ Resources.createResource(4 * GB, 0));
queue.submitApplicationAttempt(app_3, user_0);
assertEquals(2, queue.getNumActiveApplications());
assertEquals(2, queue.getNumPendingApplications());
@@ -506,6 +580,18 @@ public class TestApplicationLimits {
RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
RMContext rmContext = TestUtils.getMockRMContext();
+ RMContext spyRMContext = spy(rmContext);
+
+ ConcurrentMap<ApplicationId, RMApp> spyApps =
+ spy(new ConcurrentHashMap<ApplicationId, RMApp>());
+ RMApp rmApp = mock(RMApp.class);
+ ResourceRequest amResourceRequest = mock(ResourceRequest.class);
+ Resource amResource = Resources.createResource(0, 0);
+ when(amResourceRequest.getCapability()).thenReturn(amResource);
+ when(rmApp.getAMResourceRequest()).thenReturn(amResourceRequest);
+ Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId)Matchers.any());
+ when(spyRMContext.getRMApps()).thenReturn(spyApps);
+
Priority priority_1 = TestUtils.createMockPriority(1);
@@ -513,9 +599,9 @@ public class TestApplicationLimits {
// and check headroom
final ApplicationAttemptId appAttemptId_0_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
- FiCaSchedulerApp app_0_0 =
- spy(new FiCaSchedulerApp(appAttemptId_0_0, user_0, queue,
- queue.getActiveUsersManager(), rmContext));
+ FiCaSchedulerApp app_0_0 = new FiCaSchedulerApp(
+ appAttemptId_0_0, user_0, queue,
+ queue.getActiveUsersManager(), spyRMContext);
queue.submitApplicationAttempt(app_0_0, user_0);
List<ResourceRequest> app_0_0_requests = new ArrayList<ResourceRequest>();
@@ -532,9 +618,9 @@ public class TestApplicationLimits {
// Submit second application from user_0, check headroom
final ApplicationAttemptId appAttemptId_0_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
- FiCaSchedulerApp app_0_1 =
- spy(new FiCaSchedulerApp(appAttemptId_0_1, user_0, queue,
- queue.getActiveUsersManager(), rmContext));
+ FiCaSchedulerApp app_0_1 = new FiCaSchedulerApp(
+ appAttemptId_0_1, user_0, queue,
+ queue.getActiveUsersManager(), spyRMContext);
queue.submitApplicationAttempt(app_0_1, user_0);
List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
@@ -551,9 +637,9 @@ public class TestApplicationLimits {
// Submit first application from user_1, check for new headroom
final ApplicationAttemptId appAttemptId_1_0 =
TestUtils.getMockApplicationAttemptId(2, 0);
- FiCaSchedulerApp app_1_0 =
- spy(new FiCaSchedulerApp(appAttemptId_1_0, user_1, queue,
- queue.getActiveUsersManager(), rmContext));
+ FiCaSchedulerApp app_1_0 = new FiCaSchedulerApp(
+ appAttemptId_1_0, user_1, queue,
+ queue.getActiveUsersManager(), spyRMContext);
queue.submitApplicationAttempt(app_1_0, user_1);
List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index fb7bb2c..ead5719 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -101,6 +101,7 @@ public class TestLeafQueue {
RMContext rmContext;
RMContext spyRMContext;
+ ResourceRequest amResourceRequest;
CapacityScheduler cs;
CapacitySchedulerConfiguration csConf;
CapacitySchedulerContext csContext;
@@ -124,6 +125,10 @@ public class TestLeafQueue {
spy(new ConcurrentHashMap<ApplicationId, RMApp>());
RMApp rmApp = mock(RMApp.class);
when(rmApp.getRMAppAttempt((ApplicationAttemptId)Matchers.any())).thenReturn(null);
+ amResourceRequest = mock(ResourceRequest.class);
+ when(amResourceRequest.getCapability()).thenReturn(
+ Resources.createResource(0, 0));
+ when(rmApp.getAMResourceRequest()).thenReturn(amResourceRequest);
Mockito.doReturn(rmApp).when(spyApps).get((ApplicationId)Matchers.any());
when(spyRMContext.getRMApps()).thenReturn(spyApps);
@@ -265,26 +270,37 @@ public class TestLeafQueue {
@Test
public void testInitializeQueue() throws Exception {
- final float epsilon = 1e-5f;
- //can add more sturdy test with 3-layer queues
- //once MAPREDUCE:3410 is resolved
- LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
- assertEquals(0.085, a.getCapacity(), epsilon);
- assertEquals(0.085, a.getAbsoluteCapacity(), epsilon);
- assertEquals(0.2, a.getMaximumCapacity(), epsilon);
- assertEquals(0.2, a.getAbsoluteMaximumCapacity(), epsilon);
+ final float epsilon = 1e-5f;
+ //can add more sturdy test with 3-layer queues
+ //once MAPREDUCE:3410 is resolved
+ LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+ assertEquals(0.085, a.getCapacity(), epsilon);
+ assertEquals(0.085, a.getAbsoluteCapacity(), epsilon);
+ assertEquals(0.2, a.getMaximumCapacity(), epsilon);
+ assertEquals(0.2, a.getAbsoluteMaximumCapacity(), epsilon);
+
+ LeafQueue b = stubLeafQueue((LeafQueue)queues.get(B));
+ assertEquals(0.80, b.getCapacity(), epsilon);
+ assertEquals(0.80, b.getAbsoluteCapacity(), epsilon);
+ assertEquals(0.99, b.getMaximumCapacity(), epsilon);
+ assertEquals(0.99, b.getAbsoluteMaximumCapacity(), epsilon);
+
+ ParentQueue c = (ParentQueue)queues.get(C);
+ assertEquals(0.015, c.getCapacity(), epsilon);
+ assertEquals(0.015, c.getAbsoluteCapacity(), epsilon);
+ assertEquals(0.1, c.getMaximumCapacity(), epsilon);
+ assertEquals(0.1, c.getAbsoluteMaximumCapacity(), epsilon);
+
+ //Verify the value for getAMResourceLimit for queues with < .1 maxcap
+ Resource clusterResource = Resource.newInstance(50 * GB, 50);
- LeafQueue b = stubLeafQueue((LeafQueue)queues.get(B));
- assertEquals(0.80, b.getCapacity(), epsilon);
- assertEquals(0.80, b.getAbsoluteCapacity(), epsilon);
- assertEquals(0.99, b.getMaximumCapacity(), epsilon);
- assertEquals(0.99, b.getAbsoluteMaximumCapacity(), epsilon);
-
- ParentQueue c = (ParentQueue)queues.get(C);
- assertEquals(0.015, c.getCapacity(), epsilon);
- assertEquals(0.015, c.getAbsoluteCapacity(), epsilon);
- assertEquals(0.1, c.getMaximumCapacity(), epsilon);
- assertEquals(0.1, c.getAbsoluteMaximumCapacity(), epsilon);
+ a.updateClusterResource(clusterResource);
+ assertEquals(Resource.newInstance(1 * GB, 1),
+ a.getAMResourceLimit());
+
+ b.updateClusterResource(clusterResource);
+ assertEquals(Resource.newInstance(5 * GB, 1),
+ b.getAMResourceLimit());
}
@Test
@@ -679,7 +695,7 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, qb,
- qb.getActiveUsersManager(), rmContext);
+ qb.getActiveUsersManager(), spyRMContext);
qb.submitApplicationAttempt(app_0, user_0);
Priority u0Priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
@@ -702,7 +718,7 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_1, qb,
- qb.getActiveUsersManager(), rmContext);
+ qb.getActiveUsersManager(), spyRMContext);
Priority u1Priority = TestUtils.createMockPriority(2);
app_2.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
@@ -736,12 +752,12 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, qb,
- qb.getActiveUsersManager(), rmContext);
+ qb.getActiveUsersManager(), spyRMContext);
final ApplicationAttemptId appAttemptId_3 =
TestUtils.getMockApplicationAttemptId(3, 0);
FiCaSchedulerApp app_3 =
new FiCaSchedulerApp(appAttemptId_3, user_1, qb,
- qb.getActiveUsersManager(), rmContext);
+ qb.getActiveUsersManager(), spyRMContext);
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
u0Priority, recordFactory)));
@@ -764,7 +780,7 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(4, 0);
FiCaSchedulerApp app_4 =
new FiCaSchedulerApp(appAttemptId_4, user_0, qb,
- qb.getActiveUsersManager(), rmContext);
+ qb.getActiveUsersManager(), spyRMContext);
qb.submitApplicationAttempt(app_4, user_0);
app_4.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 6*GB, 1, true,
@@ -980,7 +996,6 @@ public class TestLeafQueue {
assertEquals(0*GB, app_1.getHeadroom().getMemory());
// Check headroom for app_2
- LOG.info("here");
app_1.updateResourceRequests(Collections.singletonList( // unset
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true,
priority, recordFactory)));
@@ -1904,6 +1919,9 @@ public class TestLeafQueue {
// Users
final String user_e = "user_e";
+
+ when(amResourceRequest.getCapability()).thenReturn(
+ Resources.createResource(1 * GB, 0));
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
@@ -1942,7 +1960,7 @@ public class TestLeafQueue {
newQueues, queues,
TestUtils.spyHook);
queues = newQueues;
- root.reinitialize(newRoot, cs.getClusterResource());
+ root.reinitialize(newRoot, csContext.getClusterResource());
// after reinitialization
assertEquals(3, e.activeApplications.size());
@@ -1982,6 +2000,9 @@ public class TestLeafQueue {
// Users
final String user_e = "user_e";
+
+ when(amResourceRequest.getCapability()).thenReturn(
+ Resources.createResource(1 * GB, 0));
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
@@ -2291,20 +2312,20 @@ public class TestLeafQueue {
csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + A, 80);
LeafQueue a = new LeafQueue(csContext, A, root, null);
assertEquals(0.1f, a.getMaxAMResourcePerQueuePercent(), 1e-3f);
- assertEquals(160, a.getMaximumActiveApplications());
+ assertEquals(a.getAMResourceLimit(), Resources.createResource(160 * GB, 1));
csConf.setFloat(CapacitySchedulerConfiguration.
MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, 0.2f);
LeafQueue newA = new LeafQueue(csContext, A, root, null);
a.reinitialize(newA, clusterResource);
assertEquals(0.2f, a.getMaxAMResourcePerQueuePercent(), 1e-3f);
- assertEquals(320, a.getMaximumActiveApplications());
+ assertEquals(a.getAMResourceLimit(), Resources.createResource(320 * GB, 1));
Resource newClusterResource = Resources.createResource(100 * 20 * GB,
100 * 32);
a.updateClusterResource(newClusterResource);
// 100 * 20 * 0.2 = 400
- assertEquals(400, a.getMaximumActiveApplications());
+ assertEquals(a.getAMResourceLimit(), Resources.createResource(400 * GB, 1));
}
@Test
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f1b35ffd/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index 2a49545..985609e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -77,6 +77,7 @@ public class TestReservations {
.getRecordFactory(null);
RMContext rmContext;
+ RMContext spyRMContext;
CapacityScheduler cs;
// CapacitySchedulerConfiguration csConf;
CapacitySchedulerContext csContext;
@@ -132,7 +133,10 @@ public class TestReservations {
root = CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook);
- cs.setRMContext(rmContext);
+ spyRMContext = spy(rmContext);
+ when(spyRMContext.getScheduler()).thenReturn(cs);
+
+ cs.setRMContext(spyRMContext);
cs.init(csConf);
cs.start();
}
@@ -212,14 +216,14 @@ public class TestReservations {
final ApplicationAttemptId appAttemptId_0 = TestUtils
.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
- mock(ActiveUsersManager.class), rmContext);
+ mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 = TestUtils
.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
- mock(ActiveUsersManager.class), rmContext);
+ mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_1, user_0);
// Setup some nodes
@@ -361,14 +365,14 @@ public class TestReservations {
final ApplicationAttemptId appAttemptId_0 = TestUtils
.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
- mock(ActiveUsersManager.class), rmContext);
+ mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 = TestUtils
.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
- mock(ActiveUsersManager.class), rmContext);
+ mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_1, user_0);
// Setup some nodes
@@ -506,14 +510,14 @@ public class TestReservations {
final ApplicationAttemptId appAttemptId_0 = TestUtils
.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
- mock(ActiveUsersManager.class), rmContext);
+ mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 = TestUtils
.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
- mock(ActiveUsersManager.class), rmContext);
+ mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_1, user_0);
// Setup some nodes
@@ -618,7 +622,7 @@ public class TestReservations {
.getMockApplicationAttemptId(0, 0);
LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
- mock(ActiveUsersManager.class), rmContext);
+ mock(ActiveUsersManager.class), spyRMContext);
String host_0 = "host_0";
FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0,
@@ -685,7 +689,7 @@ public class TestReservations {
.getMockApplicationAttemptId(0, 0);
LeafQueue a = stubLeafQueue((LeafQueue) queues.get(A));
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
- mock(ActiveUsersManager.class), rmContext);
+ mock(ActiveUsersManager.class), spyRMContext);
String host_1 = "host_1";
FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0,
@@ -742,14 +746,14 @@ public class TestReservations {
final ApplicationAttemptId appAttemptId_0 = TestUtils
.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
- mock(ActiveUsersManager.class), rmContext);
+ mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 = TestUtils
.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
- mock(ActiveUsersManager.class), rmContext);
+ mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_1, user_0);
// Setup some nodes
@@ -916,14 +920,14 @@ public class TestReservations {
final ApplicationAttemptId appAttemptId_0 = TestUtils
.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
- mock(ActiveUsersManager.class), rmContext);
+ mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 = TestUtils
.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
- mock(ActiveUsersManager.class), rmContext);
+ mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_1, user_0);
// Setup some nodes
@@ -1042,14 +1046,14 @@ public class TestReservations {
final ApplicationAttemptId appAttemptId_0 = TestUtils
.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 = new FiCaSchedulerApp(appAttemptId_0, user_0, a,
- mock(ActiveUsersManager.class), rmContext);
+ mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 = TestUtils
.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 = new FiCaSchedulerApp(appAttemptId_1, user_0, a,
- mock(ActiveUsersManager.class), rmContext);
+ mock(ActiveUsersManager.class), spyRMContext);
a.submitApplicationAttempt(app_1, user_0);
// Setup some nodes