You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2014/10/07 00:51:46 UTC
git commit: YARN-2644. Fixed CapacityScheduler to return up-to-date
headroom when AM allocates. Contributed by Craig Welch (cherry picked from
commit 519e5a7dd2bd540105434ec3c8939b68f6c024f8)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 eeb39dc21 -> 5c33e9122
YARN-2644. Fixed CapacityScheduler to return up-to-date headroom when AM allocates. Contributed by Craig Welch
(cherry picked from commit 519e5a7dd2bd540105434ec3c8939b68f6c024f8)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/5c33e912
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/5c33e912
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/5c33e912
Branch: refs/heads/branch-2
Commit: 5c33e9122900ee2b5d90eff39ac924a7217e74e3
Parents: eeb39dc
Author: Jian He <ji...@apache.org>
Authored: Mon Oct 6 15:47:48 2014 -0700
Committer: Jian He <ji...@apache.org>
Committed: Mon Oct 6 15:51:38 2014 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../capacity/CapacityHeadroomProvider.java | 65 +++++++++++++
.../scheduler/capacity/CapacityScheduler.java | 4 +
.../scheduler/capacity/LeafQueue.java | 74 ++++++++++++---
.../scheduler/common/fica/FiCaSchedulerApp.java | 28 ++++++
.../capacity/TestApplicationLimits.java | 18 ++--
.../scheduler/capacity/TestLeafQueue.java | 98 +++++++++++++++++++-
7 files changed, 263 insertions(+), 27 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c33e912/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 8f97018..19a56d9 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -552,6 +552,9 @@ Release 2.6.0 - UNRELEASED
YARN-2611. Fixing jenkins findbugs warning and TestRMWebServicesCapacitySched
for branch YARN-1051. (Subru Krishnan and Carlo Curino via subru)
+ YARN-2644. Fixed CapacityScheduler to return up-to-date headroom when
+ AM allocates. (Craig Welch via jianhe)
+
Release 2.5.1 - 2014-09-05
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c33e912/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
new file mode 100644
index 0000000..f79d195
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.api.records.Resource;
+
+public class CapacityHeadroomProvider {
+
+ LeafQueue.User user;
+ LeafQueue queue;
+ FiCaSchedulerApp application;
+ Resource required;
+ LeafQueue.QueueHeadroomInfo queueHeadroomInfo;
+
+ public CapacityHeadroomProvider(
+ LeafQueue.User user,
+ LeafQueue queue,
+ FiCaSchedulerApp application,
+ Resource required,
+ LeafQueue.QueueHeadroomInfo queueHeadroomInfo) {
+
+ this.user = user;
+ this.queue = queue;
+ this.application = application;
+ this.required = required;
+ this.queueHeadroomInfo = queueHeadroomInfo;
+
+ }
+
+ public Resource getHeadroom() {
+
+ Resource queueMaxCap;
+ Resource clusterResource;
+ synchronized (queueHeadroomInfo) {
+ queueMaxCap = queueHeadroomInfo.getQueueMaxCap();
+ clusterResource = queueHeadroomInfo.getClusterResource();
+ }
+ Resource headroom = queue.getHeadroom(user, queueMaxCap,
+ clusterResource, application, required);
+
+ // Corner case to deal with applications being slightly over-limit
+ if (headroom.getMemory() < 0) {
+ headroom.setMemory(0);
+ }
+ return headroom;
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c33e912/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 6a3c7dc..02f27b8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -469,6 +469,10 @@ public class CapacityScheduler extends
// Re-configure queues
root.reinitialize(newRoot, clusterResource);
initializeQueueMappings();
+
+ // Re-calculate headroom for active applications
+ root.updateClusterResource(clusterResource);
+
}
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c33e912/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index f0cff71..57f0907 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -132,6 +132,8 @@ public class LeafQueue implements CSQueue {
private boolean reservationsContinueLooking;
+ private final QueueHeadroomInfo queueHeadroomInfo = new QueueHeadroomInfo();
+
public LeafQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) {
this.scheduler = cs;
@@ -970,6 +972,22 @@ public class LeafQueue implements CSQueue {
// "re-reservation" is *free*
return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
}
+
+ protected Resource getHeadroom(User user, Resource queueMaxCap,
+ Resource clusterResource, FiCaSchedulerApp application, Resource required) {
+ return getHeadroom(user, queueMaxCap, clusterResource,
+ computeUserLimit(application, clusterResource, required, user));
+ }
+
+ private Resource getHeadroom(User user, Resource queueMaxCap,
+ Resource clusterResource, Resource userLimit) {
+ Resource headroom =
+ Resources.subtract(
+ Resources.min(resourceCalculator, clusterResource,
+ userLimit, queueMaxCap),
+ user.getConsumedResources());
+ return headroom;
+ }
@Private
@@ -1038,12 +1056,14 @@ public class LeafQueue implements CSQueue {
String user = application.getUser();
+ User queueUser = getUser(user);
+
/**
* Headroom is min((userLimit, queue-max-cap) - consumed)
*/
Resource userLimit = // User limit
- computeUserLimit(application, clusterResource, required);
+ computeUserLimit(application, clusterResource, required, queueUser);
//Max avail capacity needs to take into account usage by ancestor-siblings
//which are greater than their base capacity, so we are interested in "max avail"
@@ -1057,23 +1077,27 @@ public class LeafQueue implements CSQueue {
clusterResource,
absoluteMaxAvailCapacity,
minimumAllocation);
+
+ synchronized (queueHeadroomInfo) {
+ queueHeadroomInfo.setQueueMaxCap(queueMaxCap);
+ queueHeadroomInfo.setClusterResource(clusterResource);
+ }
- Resource userConsumed = getUser(user).getConsumedResources();
- Resource headroom =
- Resources.subtract(
- Resources.min(resourceCalculator, clusterResource,
- userLimit, queueMaxCap),
- userConsumed);
+ Resource headroom = getHeadroom(queueUser, queueMaxCap, clusterResource, userLimit);
if (LOG.isDebugEnabled()) {
LOG.debug("Headroom calculation for user " + user + ": " +
" userLimit=" + userLimit +
" queueMaxCap=" + queueMaxCap +
- " consumed=" + userConsumed +
+ " consumed=" + queueUser.getConsumedResources() +
" headroom=" + headroom);
}
- application.setHeadroom(headroom);
+ CapacityHeadroomProvider headroomProvider = new CapacityHeadroomProvider(
+ queueUser, this, application, required, queueHeadroomInfo);
+
+ application.setHeadroomProvider(headroomProvider);
+
metrics.setAvailableResourcesToUser(user, headroom);
return userLimit;
@@ -1081,7 +1105,7 @@ public class LeafQueue implements CSQueue {
@Lock(NoLock.class)
private Resource computeUserLimit(FiCaSchedulerApp application,
- Resource clusterResource, Resource required) {
+ Resource clusterResource, Resource required, User user) {
// What is our current capacity?
// * It is equal to the max(required, queue-capacity) if
// we're running below capacity. The 'max' ensures that jobs in queues
@@ -1138,7 +1162,7 @@ public class LeafQueue implements CSQueue {
" userLimit=" + userLimit +
" userLimitFactor=" + userLimitFactor +
" required: " + required +
- " consumed: " + getUser(userName).getConsumedResources() +
+ " consumed: " + user.getConsumedResources() +
" limit: " + limit +
" queueCapacity: " + queueCapacity +
" qconsumed: " + usedResources +
@@ -1687,9 +1711,6 @@ public class LeafQueue implements CSQueue {
String userName = application.getUser();
User user = getUser(userName);
user.assignContainer(resource);
- // Note this is a bit unconventional since it gets the object and modifies it here
- // rather then using set routine
- Resources.subtractFrom(application.getHeadroom(), resource); // headroom
metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
if (LOG.isDebugEnabled()) {
@@ -1896,4 +1917,29 @@ public class LeafQueue implements CSQueue {
public void setMaxApplications(int maxApplications) {
this.maxApplications = maxApplications;
}
+
+ /*
+ * Holds shared values used by all applications in
+ * the queue to calculate headroom on demand
+ */
+ static class QueueHeadroomInfo {
+ private Resource queueMaxCap;
+ private Resource clusterResource;
+
+ public void setQueueMaxCap(Resource queueMaxCap) {
+ this.queueMaxCap = queueMaxCap;
+ }
+
+ public Resource getQueueMaxCap() {
+ return queueMaxCap;
+ }
+
+ public void setClusterResource(Resource clusterResource) {
+ this.clusterResource = clusterResource;
+ }
+
+ public Resource getClusterResource() {
+ return clusterResource;
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c33e912/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index dc0d0f0..2f9569c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -51,6 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
/**
* Represents an application attempt from the viewpoint of the FIFO or Capacity
@@ -64,6 +65,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
private final Set<ContainerId> containersToPreempt =
new HashSet<ContainerId>();
+
+ private CapacityHeadroomProvider headroomProvider;
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
String user, Queue queue, ActiveUsersManager activeUsersManager,
@@ -280,6 +283,31 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
}
return null;
}
+
+ public synchronized void setHeadroomProvider(
+ CapacityHeadroomProvider headroomProvider) {
+ this.headroomProvider = headroomProvider;
+ }
+
+ public synchronized CapacityHeadroomProvider getHeadroomProvider() {
+ return headroomProvider;
+ }
+
+ @Override
+ public synchronized Resource getHeadroom() {
+ if (headroomProvider != null) {
+ return headroomProvider.getHeadroom();
+ }
+ return super.getHeadroom();
+ }
+
+ @Override
+ public synchronized void transferStateFromPreviousAttempt(
+ SchedulerApplicationAttempt appAttempt) {
+ super.transferStateFromPreviousAttempt(appAttempt);
+ this.headroomProvider =
+ ((FiCaSchedulerApp) appAttempt).getHeadroomProvider();
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c33e912/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index ff8e873..b922c02 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -518,7 +518,7 @@ public class TestApplicationLimits {
// Schedule to compute
queue.assignContainers(clusterResource, node_0, false);
Resource expectedHeadroom = Resources.createResource(10*16*GB, 1);
- verify(app_0_0).setHeadroom(eq(expectedHeadroom));
+ assertEquals(expectedHeadroom, app_0_0.getHeadroom());
// Submit second application from user_0, check headroom
final ApplicationAttemptId appAttemptId_0_1 =
@@ -536,8 +536,8 @@ public class TestApplicationLimits {
// Schedule to compute
queue.assignContainers(clusterResource, node_0, false); // Schedule to compute
- verify(app_0_0, times(2)).setHeadroom(eq(expectedHeadroom));
- verify(app_0_1).setHeadroom(eq(expectedHeadroom));// no change
+ assertEquals(expectedHeadroom, app_0_0.getHeadroom());
+ assertEquals(expectedHeadroom, app_0_1.getHeadroom());// no change
// Submit first application from user_1, check for new headroom
final ApplicationAttemptId appAttemptId_1_0 =
@@ -556,17 +556,17 @@ public class TestApplicationLimits {
// Schedule to compute
queue.assignContainers(clusterResource, node_0, false); // Schedule to compute
expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes
- verify(app_0_0).setHeadroom(eq(expectedHeadroom));
- verify(app_0_1).setHeadroom(eq(expectedHeadroom));
- verify(app_1_0).setHeadroom(eq(expectedHeadroom));
+ assertEquals(expectedHeadroom, app_0_0.getHeadroom());
+ assertEquals(expectedHeadroom, app_0_1.getHeadroom());
+ assertEquals(expectedHeadroom, app_1_0.getHeadroom());
// Now reduce cluster size and check for the smaller headroom
clusterResource = Resources.createResource(90*16*GB);
queue.assignContainers(clusterResource, node_0, false); // Schedule to compute
expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes
- verify(app_0_0).setHeadroom(eq(expectedHeadroom));
- verify(app_0_1).setHeadroom(eq(expectedHeadroom));
- verify(app_1_0).setHeadroom(eq(expectedHeadroom));
+ assertEquals(expectedHeadroom, app_0_0.getHeadroom());
+ assertEquals(expectedHeadroom, app_0_1.getHeadroom());
+ assertEquals(expectedHeadroom, app_1_0.getHeadroom());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/5c33e912/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 092ff83..9e06c52 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -640,6 +640,94 @@ public class TestLeafQueue {
}
@Test
+ public void testUserHeadroomMultiApp() throws Exception {
+ // Mock the queue
+ LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+ //unset maxCapacity
+ a.setMaxCapacity(1.0f);
+
+ // Users
+ final String user_0 = "user_0";
+ final String user_1 = "user_1";
+
+ // Submit applications
+ final ApplicationAttemptId appAttemptId_0 =
+ TestUtils.getMockApplicationAttemptId(0, 0);
+ FiCaSchedulerApp app_0 =
+ new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+ a.getActiveUsersManager(), spyRMContext);
+ a.submitApplicationAttempt(app_0, user_0);
+
+ final ApplicationAttemptId appAttemptId_1 =
+ TestUtils.getMockApplicationAttemptId(1, 0);
+ FiCaSchedulerApp app_1 =
+ new FiCaSchedulerApp(appAttemptId_1, user_0, a,
+ a.getActiveUsersManager(), spyRMContext);
+ a.submitApplicationAttempt(app_1, user_0); // same user
+
+ final ApplicationAttemptId appAttemptId_2 =
+ TestUtils.getMockApplicationAttemptId(2, 0);
+ FiCaSchedulerApp app_2 =
+ new FiCaSchedulerApp(appAttemptId_2, user_1, a,
+ a.getActiveUsersManager(), spyRMContext);
+ a.submitApplicationAttempt(app_2, user_1);
+
+ // Setup some nodes
+ String host_0 = "127.0.0.1";
+ FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK,
+ 0, 16*GB);
+ String host_1 = "127.0.0.2";
+ FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK,
+ 0, 16*GB);
+
+ final int numNodes = 2;
+ Resource clusterResource = Resources.createResource(numNodes * (16*GB), 1);
+ when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+ Priority priority = TestUtils.createMockPriority(1);
+
+ app_0.updateResourceRequests(Collections.singletonList(
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
+ priority, recordFactory)));
+
+ a.assignContainers(clusterResource, node_0, false);
+ assertEquals(1*GB, a.getUsedResources().getMemory());
+ assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
+ assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
+ //Now, headroom is the same for all apps for a given user + queue combo
+ //and a change to any app's headroom is reflected for all the user's apps
+ //once those apps are active/have themselves calculated headroom for
+ //allocation at least one time
+ assertEquals(2*GB, app_0.getHeadroom().getMemory());
+ assertEquals(0*GB, app_1.getHeadroom().getMemory());//not yet active
+ assertEquals(0*GB, app_2.getHeadroom().getMemory());//not yet active
+
+ app_1.updateResourceRequests(Collections.singletonList(
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
+ priority, recordFactory)));
+
+ a.assignContainers(clusterResource, node_0, false);
+ assertEquals(2*GB, a.getUsedResources().getMemory());
+ assertEquals(1*GB, app_0.getCurrentConsumption().getMemory());
+ assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
+ assertEquals(1*GB, app_0.getHeadroom().getMemory());
+ assertEquals(1*GB, app_1.getHeadroom().getMemory());//now active
+ assertEquals(0*GB, app_2.getHeadroom().getMemory());//not yet active
+
+ //Complete container and verify that headroom is updated, for both apps
+ //for the user
+ RMContainer rmContainer = app_0.getLiveContainers().iterator().next();
+ a.completedContainer(clusterResource, app_0, node_0, rmContainer,
+ ContainerStatus.newInstance(rmContainer.getContainerId(),
+ ContainerState.COMPLETE, "",
+ ContainerExitStatus.KILLED_BY_RESOURCEMANAGER),
+ RMContainerEventType.KILL, null, true);
+
+ assertEquals(2*GB, app_0.getHeadroom().getMemory());
+ assertEquals(2*GB, app_1.getHeadroom().getMemory());
+ }
+
+ @Test
public void testHeadroomWithMaxCap() throws Exception {
// Mock the queue
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
@@ -710,16 +798,18 @@ public class TestLeafQueue {
assertEquals(2*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemory());
- assertEquals(0*GB, app_0.getHeadroom().getMemory()); // User limit = 2G
- assertEquals(0*GB, app_1.getHeadroom().getMemory()); // User limit = 2G
+ assertEquals(2*GB, app_0.getHeadroom().getMemory());
+ // User limit = 4G, 2 in use
+ assertEquals(0*GB, app_1.getHeadroom().getMemory());
+ // the application is not yet active
// Again one to user_0 since he hasn't exceeded user limit yet
a.assignContainers(clusterResource, node_0, false);
assertEquals(3*GB, a.getUsedResources().getMemory());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemory());
- assertEquals(0*GB, app_0.getHeadroom().getMemory()); // 3G - 2G
- assertEquals(0*GB, app_1.getHeadroom().getMemory()); // 3G - 2G
+ assertEquals(1*GB, app_0.getHeadroom().getMemory()); // 4G - 3G
+ assertEquals(1*GB, app_1.getHeadroom().getMemory()); // 4G - 3G
// Submit requests for app_1 and set max-cap
a.setMaxCapacity(.1f);