You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by tg...@apache.org on 2015/04/23 16:57:31 UTC
hadoop git commit: YARN-3434. Interaction between reservations and
userlimit can result in significant ULF violation (cherry picked from commit
189a63a719c63b67a1783a280bfc2f72dcb55277)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 889b92fa4 -> 1cd2fcf25
YARN-3434. Interaction between reservations and userlimit can result in significant ULF violation
(cherry picked from commit 189a63a719c63b67a1783a280bfc2f72dcb55277)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/1cd2fcf2
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/1cd2fcf2
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/1cd2fcf2
Branch: refs/heads/branch-2
Commit: 1cd2fcf25dc614c0567e6da776fef737640e4293
Parents: 889b92f
Author: tgraves <tg...@apache.org>
Authored: Thu Apr 23 14:39:25 2015 +0000
Committer: tgraves <tg...@apache.org>
Committed: Thu Apr 23 14:49:24 2015 +0000
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../scheduler/ResourceLimits.java | 28 +++-
.../scheduler/capacity/AbstractCSQueue.java | 94 +++++------
.../scheduler/capacity/LeafQueue.java | 162 ++++++++-----------
.../scheduler/capacity/TestReservations.java | 65 +++++---
5 files changed, 186 insertions(+), 166 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cd2fcf2/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 8b09926..261e052 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -204,6 +204,9 @@ Release 2.8.0 - UNRELEASED
YARN-3495. Confusing log generated by FairScheduler.
(Brahma Reddy Battula via ozawa)
+ YARN-3434. Interaction between reservations and userlimit can result in
+ significant ULF violation (tgraves)
+
Release 2.7.1 - UNRELEASED
INCOMPATIBLE CHANGES
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cd2fcf2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
index 12333e8..8074794 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
@@ -19,22 +19,44 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.Resources;
/**
* Resource limits for queues/applications, this means max overall (please note
* that, it's not "extra") resource you can get.
*/
public class ResourceLimits {
+ volatile Resource limit;
+
+ // This is a special limit that goes with the RESERVE_CONT_LOOK_ALL_NODES
+ // config. This limit indicates how much we need to unreserve to allocate
+ // another container.
+ private volatile Resource amountNeededUnreserve;
+
public ResourceLimits(Resource limit) {
+ this.amountNeededUnreserve = Resources.none();
this.limit = limit;
}
-
- volatile Resource limit;
+
+ public ResourceLimits(Resource limit, Resource amountNeededUnreserve) {
+ this.amountNeededUnreserve = amountNeededUnreserve;
+ this.limit = limit;
+ }
+
public Resource getLimit() {
return limit;
}
-
+
+ public Resource getAmountNeededUnreserve() {
+ return amountNeededUnreserve;
+ }
+
public void setLimit(Resource limit) {
this.limit = limit;
}
+
+ public void setAmountNeededUnreserve(Resource amountNeededUnreserve) {
+ this.amountNeededUnreserve = amountNeededUnreserve;
+ }
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cd2fcf2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 9233e01..47cea19 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -85,7 +85,7 @@ public abstract class AbstractCSQueue implements CSQueue {
// Track capacities like used-capcity/abs-used-capacity/capacity/abs-capacity,
// etc.
QueueCapacities queueCapacities;
-
+
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
protected CapacitySchedulerContext csContext;
@@ -473,55 +473,55 @@ public abstract class AbstractCSQueue implements CSQueue {
getCurrentLimitResource(nodePartition, clusterResource,
currentResourceLimits, schedulingMode);
- // if reservation continous looking enabled, check to see if could we
- // potentially use this node instead of a reserved node if the application
- // has reserved containers.
- // TODO, now only consider reservation cases when the node has no label
- if (this.reservationsContinueLooking
- && nodePartition.equals(RMNodeLabelsManager.NO_LABEL)
- && Resources.greaterThan(resourceCalculator, clusterResource,
- resourceCouldBeUnreserved, Resources.none())) {
- // resource-without-reserved = used - reserved
- Resource newTotalWithoutReservedResource =
- Resources.subtract(newTotalResource, resourceCouldBeUnreserved);
-
- // when total-used-without-reserved-resource < currentLimit, we still
- // have chance to allocate on this node by unreserving some containers
- if (Resources.lessThan(resourceCalculator, clusterResource,
- newTotalWithoutReservedResource, currentLimitResource)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("try to use reserved: " + getQueueName()
- + " usedResources: " + queueUsage.getUsed()
- + ", clusterResources: " + clusterResource
- + ", reservedResources: " + resourceCouldBeUnreserved
- + ", capacity-without-reserved: "
- + newTotalWithoutReservedResource + ", maxLimitCapacity: "
- + currentLimitResource);
- }
- return true;
- }
- }
-
- // Check if we over current-resource-limit computed.
if (Resources.greaterThan(resourceCalculator, clusterResource,
newTotalResource, currentLimitResource)) {
- return false;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug(getQueueName()
- + "Check assign to queue, nodePartition="
- + nodePartition
- + " usedResources: "
- + queueUsage.getUsed(nodePartition)
- + " clusterResources: "
- + clusterResource
- + " currentUsedCapacity "
- + Resources.divide(resourceCalculator, clusterResource,
- queueUsage.getUsed(nodePartition),
- labelManager.getResourceByLabel(nodePartition, clusterResource))
- + " max-capacity: "
- + queueCapacities.getAbsoluteMaximumCapacity(nodePartition) + ")");
+ // if reservations-continue-looking is enabled, check to see if we could
+ // potentially use this node instead of a reserved node if the application
+ // has reserved containers.
+ // TODO, now only consider reservation cases when the node has no label
+ if (this.reservationsContinueLooking
+ && nodePartition.equals(RMNodeLabelsManager.NO_LABEL)
+ && Resources.greaterThan(resourceCalculator, clusterResource,
+ resourceCouldBeUnreserved, Resources.none())) {
+ // resource-without-reserved = used - reserved
+ Resource newTotalWithoutReservedResource =
+ Resources.subtract(newTotalResource, resourceCouldBeUnreserved);
+
+ // when total-used-without-reserved-resource < currentLimit, we still
+ // have chance to allocate on this node by unreserving some containers
+ if (Resources.lessThan(resourceCalculator, clusterResource,
+ newTotalWithoutReservedResource, currentLimitResource)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("try to use reserved: " + getQueueName()
+ + " usedResources: " + queueUsage.getUsed()
+ + ", clusterResources: " + clusterResource
+ + ", reservedResources: " + resourceCouldBeUnreserved
+ + ", capacity-without-reserved: "
+ + newTotalWithoutReservedResource + ", maxLimitCapacity: "
+ + currentLimitResource);
+ }
+ currentResourceLimits.setAmountNeededUnreserve(Resources.subtract(newTotalResource,
+ currentLimitResource));
+ return true;
+ }
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getQueueName()
+ + "Check assign to queue, nodePartition="
+ + nodePartition
+ + " usedResources: "
+ + queueUsage.getUsed(nodePartition)
+ + " clusterResources: "
+ + clusterResource
+ + " currentUsedCapacity "
+ + Resources.divide(resourceCalculator, clusterResource,
+ queueUsage.getUsed(nodePartition),
+ labelManager.getResourceByLabel(nodePartition, clusterResource))
+ + " max-capacity: "
+ + queueCapacities.getAbsoluteMaximumCapacity(nodePartition) + ")");
+ }
+ return false;
}
return true;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cd2fcf2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 69e7e53..22aafaa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -119,8 +119,8 @@ public class LeafQueue extends AbstractCSQueue {
private final QueueResourceLimitsInfo queueResourceLimitsInfo =
new QueueResourceLimitsInfo();
- private volatile ResourceLimits currentResourceLimits = null;
-
+ private volatile ResourceLimits cachedResourceLimitsForHeadroom = null;
+
private OrderingPolicy<FiCaSchedulerApp>
orderingPolicy = new FifoOrderingPolicy<FiCaSchedulerApp>();
@@ -151,7 +151,7 @@ public class LeafQueue extends AbstractCSQueue {
this.lastClusterResource = clusterResource;
updateAbsoluteCapacityResource(clusterResource);
- this.currentResourceLimits = new ResourceLimits(clusterResource);
+ this.cachedResourceLimitsForHeadroom = new ResourceLimits(clusterResource);
// Initialize headroom info, also used for calculating application
// master resource limits. Since this happens during queue initialization
@@ -715,14 +715,14 @@ public class LeafQueue extends AbstractCSQueue {
activateApplications();
LOG.info("Application removed -" +
- " appId: " + application.getApplicationId() +
- " user: " + application.getUser() +
+ " appId: " + application.getApplicationId() +
+ " user: " + application.getUser() +
" queue: " + getQueueName() +
" #user-pending-applications: " + user.getPendingApplications() +
" #user-active-applications: " + user.getActiveApplications() +
" #queue-pending-applications: " + getNumPendingApplications() +
" #queue-active-applications: " + getNumActiveApplications()
- );
+ );
}
private synchronized FiCaSchedulerApp getApplication(
@@ -854,18 +854,18 @@ public class LeafQueue extends AbstractCSQueue {
// before all higher priority ones are serviced.
Resource userLimit =
computeUserLimitAndSetHeadroom(application, clusterResource,
- required, node.getPartition(), schedulingMode);
-
+ required, node.getPartition(), schedulingMode);
+
// Check queue max-capacity limit
if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
- this.currentResourceLimits, required,
+ currentResourceLimits, required,
application.getCurrentReservation(), schedulingMode)) {
return NULL_ASSIGNMENT;
}
// Check user limit
if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
- application, true, node.getPartition())) {
+ application, node.getPartition(), currentResourceLimits)) {
break;
}
@@ -906,9 +906,9 @@ public class LeafQueue extends AbstractCSQueue {
}
// Try to schedule
- CSAssignment assignment =
- assignContainersOnNode(clusterResource, node, application, priority,
- null, schedulingMode);
+ CSAssignment assignment =
+ assignContainersOnNode(clusterResource, node, application, priority,
+ null, schedulingMode, currentResourceLimits);
// Did the application skip this node?
if (assignment.getSkipped()) {
@@ -975,7 +975,7 @@ public class LeafQueue extends AbstractCSQueue {
// Try to assign if we have sufficient resources
CSAssignment tmp =
assignContainersOnNode(clusterResource, node, application, priority,
- rmContainer, schedulingMode);
+ rmContainer, schedulingMode, new ResourceLimits(Resources.none()));
// Doesn't matter... since it's already charged for at time of reservation
// "re-reservation" is *free*
@@ -1026,7 +1026,7 @@ public class LeafQueue extends AbstractCSQueue {
private void setQueueResourceLimitsInfo(
Resource clusterResource) {
synchronized (queueResourceLimitsInfo) {
- queueResourceLimitsInfo.setQueueCurrentLimit(currentResourceLimits
+ queueResourceLimitsInfo.setQueueCurrentLimit(cachedResourceLimitsForHeadroom
.getLimit());
queueResourceLimitsInfo.setClusterResource(clusterResource);
}
@@ -1048,13 +1048,13 @@ public class LeafQueue extends AbstractCSQueue {
setQueueResourceLimitsInfo(clusterResource);
Resource headroom =
- getHeadroom(queueUser, currentResourceLimits.getLimit(),
+ getHeadroom(queueUser, cachedResourceLimitsForHeadroom.getLimit(),
clusterResource, userLimit);
if (LOG.isDebugEnabled()) {
LOG.debug("Headroom calculation for user " + user + ": " +
" userLimit=" + userLimit +
- " queueMaxAvailRes=" + currentResourceLimits.getLimit() +
+ " queueMaxAvailRes=" + cachedResourceLimitsForHeadroom.getLimit() +
" consumed=" + queueUser.getUsed() +
" headroom=" + headroom);
}
@@ -1169,7 +1169,7 @@ public class LeafQueue extends AbstractCSQueue {
@Private
protected synchronized boolean canAssignToUser(Resource clusterResource,
String userName, Resource limit, FiCaSchedulerApp application,
- boolean checkReservations, String nodePartition) {
+ String nodePartition, ResourceLimits currentResoureLimits) {
User user = getUser(userName);
// Note: We aren't considering the current request since there is a fixed
@@ -1180,13 +1180,13 @@ public class LeafQueue extends AbstractCSQueue {
limit)) {
// if enabled, check to see if could we potentially use this node instead
// of a reserved node if the application has reserved containers
- if (this.reservationsContinueLooking && checkReservations
- && nodePartition.equals(CommonNodeLabelsManager.NO_LABEL)) {
+ if (this.reservationsContinueLooking &&
+ nodePartition.equals(CommonNodeLabelsManager.NO_LABEL)) {
if (Resources.lessThanOrEqual(
resourceCalculator,
clusterResource,
- Resources.subtract(user.getUsed(),
- application.getCurrentReservation()), limit)) {
+ Resources.subtract(user.getUsed(),application.getCurrentReservation()),
+ limit)) {
if (LOG.isDebugEnabled()) {
LOG.debug("User " + userName + " in queue " + getQueueName()
@@ -1194,6 +1194,13 @@ public class LeafQueue extends AbstractCSQueue {
+ user.getUsed() + " reserved: "
+ application.getCurrentReservation() + " limit: " + limit);
}
+ Resource amountNeededToUnreserve = Resources.subtract(user.getUsed(nodePartition), limit);
+ // we can only acquire a new container if we unreserve first since we ignored the
+ // user limit. Choose the max of the amount needed to get under the user limit and
+ // what was previously set by the queue max-capacity check.
+ currentResoureLimits.setAmountNeededUnreserve(
+ Resources.max(resourceCalculator, clusterResource,
+ currentResoureLimits.getAmountNeededUnreserve(), amountNeededToUnreserve));
return true;
}
}
@@ -1240,7 +1247,8 @@ public class LeafQueue extends AbstractCSQueue {
private CSAssignment assignContainersOnNode(Resource clusterResource,
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
- RMContainer reservedContainer, SchedulingMode schedulingMode) {
+ RMContainer reservedContainer, SchedulingMode schedulingMode,
+ ResourceLimits currentResoureLimits) {
CSAssignment assigned;
@@ -1254,7 +1262,7 @@ public class LeafQueue extends AbstractCSQueue {
assigned =
assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
node, application, priority, reservedContainer,
- allocatedContainer, schedulingMode);
+ allocatedContainer, schedulingMode, currentResoureLimits);
if (Resources.greaterThan(resourceCalculator, clusterResource,
assigned.getResource(), Resources.none())) {
@@ -1283,7 +1291,7 @@ public class LeafQueue extends AbstractCSQueue {
assigned =
assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
node, application, priority, reservedContainer,
- allocatedContainer, schedulingMode);
+ allocatedContainer, schedulingMode, currentResoureLimits);
if (Resources.greaterThan(resourceCalculator, clusterResource,
assigned.getResource(), Resources.none())) {
@@ -1312,7 +1320,7 @@ public class LeafQueue extends AbstractCSQueue {
assigned =
assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
node, application, priority, reservedContainer,
- allocatedContainer, schedulingMode);
+ allocatedContainer, schedulingMode, currentResoureLimits);
// update locality statistics
if (allocatedContainer.getValue() != null) {
@@ -1324,19 +1332,11 @@ public class LeafQueue extends AbstractCSQueue {
return SKIP_ASSIGNMENT;
}
-
- private Resource getMinimumResourceNeedUnreserved(Resource askedResource) {
- // First we need to get minimum resource we need unreserve
- // minimum-resource-need-unreserve = used + asked - limit
- return Resources.subtract(
- Resources.add(queueUsage.getUsed(), askedResource),
- currentResourceLimits.getLimit());
- }
@Private
protected boolean findNodeToUnreserve(Resource clusterResource,
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
- Resource askedResource, Resource minimumUnreservedResource) {
+ Resource minimumUnreservedResource) {
// need to unreserve some other container first
NodeId idToUnreserve =
application.getNodeIdToUnreserve(priority, minimumUnreservedResource,
@@ -1357,7 +1357,7 @@ public class LeafQueue extends AbstractCSQueue {
LOG.debug("unreserving for app: " + application.getApplicationId()
+ " on nodeId: " + idToUnreserve
+ " in order to replace reserved application and place it on node: "
- + node.getNodeID() + " needing: " + askedResource);
+ + node.getNodeID() + " needing: " + minimumUnreservedResource);
}
// headroom
@@ -1376,47 +1376,16 @@ public class LeafQueue extends AbstractCSQueue {
return true;
}
- @Private
- protected boolean checkLimitsToReserve(Resource clusterResource,
- FiCaSchedulerApp application, Resource capability, String nodePartition,
- SchedulingMode schedulingMode) {
- // we can't reserve if we got here based on the limit
- // checks assuming we could unreserve!!!
- Resource userLimit = computeUserLimitAndSetHeadroom(application,
- clusterResource, capability, nodePartition, schedulingMode);
-
- // Check queue max-capacity limit,
- // TODO: Consider reservation on labels
- if (!canAssignToThisQueue(clusterResource, RMNodeLabelsManager.NO_LABEL,
- this.currentResourceLimits, capability, Resources.none(), schedulingMode)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("was going to reserve but hit queue limit");
- }
- return false;
- }
-
- // Check user limit
- if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
- application, false, nodePartition)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("was going to reserve but hit user limit");
- }
- return false;
- }
- return true;
- }
-
-
private CSAssignment assignNodeLocalContainers(Resource clusterResource,
ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer, MutableObject allocatedContainer,
- SchedulingMode schedulingMode) {
+ SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
if (canAssign(application, priority, node, NodeType.NODE_LOCAL,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority,
nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
- allocatedContainer, schedulingMode);
+ allocatedContainer, schedulingMode, currentResoureLimits);
}
return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
@@ -1426,12 +1395,12 @@ public class LeafQueue extends AbstractCSQueue {
ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer, MutableObject allocatedContainer,
- SchedulingMode schedulingMode) {
+ SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
if (canAssign(application, priority, node, NodeType.RACK_LOCAL,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority,
rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
- allocatedContainer, schedulingMode);
+ allocatedContainer, schedulingMode, currentResoureLimits);
}
return new CSAssignment(Resources.none(), NodeType.RACK_LOCAL);
@@ -1441,12 +1410,12 @@ public class LeafQueue extends AbstractCSQueue {
ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
RMContainer reservedContainer, MutableObject allocatedContainer,
- SchedulingMode schedulingMode) {
+ SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority,
offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
- allocatedContainer, schedulingMode);
+ allocatedContainer, schedulingMode, currentResoureLimits);
}
return new CSAssignment(Resources.none(), NodeType.OFF_SWITCH);
@@ -1529,7 +1498,8 @@ public class LeafQueue extends AbstractCSQueue {
private CSAssignment assignContainer(Resource clusterResource, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
ResourceRequest request, NodeType type, RMContainer rmContainer,
- MutableObject createdContainer, SchedulingMode schedulingMode) {
+ MutableObject createdContainer, SchedulingMode schedulingMode,
+ ResourceLimits currentResoureLimits) {
if (LOG.isDebugEnabled()) {
LOG.debug("assignContainers: node=" + node.getNodeName()
+ " application=" + application.getApplicationId()
@@ -1573,13 +1543,17 @@ public class LeafQueue extends AbstractCSQueue {
LOG.warn("Couldn't get container for allocation!");
return new CSAssignment(Resources.none(), type);
}
-
+
boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer(
application, priority, capability);
// Can we allocate a container on this node?
int availableContainers =
resourceCalculator.computeAvailableContainers(available, capability);
+
+ boolean needToUnreserve = Resources.greaterThan(resourceCalculator,clusterResource,
+ currentResoureLimits.getAmountNeededUnreserve(), Resources.none());
+
if (availableContainers > 0) {
// Allocate...
@@ -1588,20 +1562,24 @@ public class LeafQueue extends AbstractCSQueue {
unreserve(application, priority, node, rmContainer);
} else if (this.reservationsContinueLooking && node.getLabels().isEmpty()) {
// when reservationsContinueLooking is set, we may need to unreserve
- // some containers to meet this queue and its parents' resource limits
+ // some containers to meet this queue, its parents', or the users' resource limits.
// TODO, need change here when we want to support continuous reservation
// looking for labeled partitions.
- Resource minimumUnreservedResource =
- getMinimumResourceNeedUnreserved(capability);
- if (!shouldAllocOrReserveNewContainer
- || Resources.greaterThan(resourceCalculator, clusterResource,
- minimumUnreservedResource, Resources.none())) {
+ if (!shouldAllocOrReserveNewContainer || needToUnreserve) {
+ // If we shouldn't allocate/reserve a new container then we should unreserve one of the
+ // same size we are asking for, since currentResoureLimits.getAmountNeededUnreserve()
+ // could be zero. If the limit was hit then use the amount we need to unreserve to be
+ // under the limit.
+ Resource amountToUnreserve = capability;
+ if (needToUnreserve) {
+ amountToUnreserve = currentResoureLimits.getAmountNeededUnreserve();
+ }
boolean containerUnreserved =
findNodeToUnreserve(clusterResource, node, application, priority,
- capability, minimumUnreservedResource);
+ amountToUnreserve);
// When (minimum-unreserved-resource > 0 OR we cannot allocate new/reserved
// container (That means we *have to* unreserve some resource to
- // continue)). If we failed to unreserve some resource,
+ // continue)). If we failed to unreserve some resource, we can't continue.
if (!containerUnreserved) {
return new CSAssignment(Resources.none(), type);
}
@@ -1642,13 +1620,13 @@ public class LeafQueue extends AbstractCSQueue {
if (shouldAllocOrReserveNewContainer || rmContainer != null) {
if (reservationsContinueLooking && rmContainer == null) {
- // we could possibly ignoring parent queue capacity limits when
- // reservationsContinueLooking is set.
- // If we're trying to reserve a container here, not container will be
- // unreserved for reserving the new one. Check limits again before
- // reserve the new container
- if (!checkLimitsToReserve(clusterResource,
- application, capability, node.getPartition(), schedulingMode)) {
+ // we could possibly be ignoring queue capacity or user limits when
+ // reservationsContinueLooking is set. Make sure we didn't need to unreserve
+ // one.
+ if (needToUnreserve) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("we needed to unreserve to be able to allocate");
+ }
return new CSAssignment(Resources.none(), type);
}
}
@@ -1811,14 +1789,14 @@ public class LeafQueue extends AbstractCSQueue {
// Even if ParentQueue will set limits respect child's max queue capacity,
// but when allocating reserved container, CapacityScheduler doesn't do
// this. So need cap limits by queue's max capacity here.
- this.currentResourceLimits = currentResourceLimits;
+ this.cachedResourceLimitsForHeadroom = new ResourceLimits(currentResourceLimits.getLimit());
Resource queueMaxResource =
Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager
.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource),
queueCapacities
.getAbsoluteMaximumCapacity(RMNodeLabelsManager.NO_LABEL),
minimumAllocation);
- this.currentResourceLimits.setLimit(Resources.min(resourceCalculator,
+ this.cachedResourceLimitsForHeadroom.setLimit(Resources.min(resourceCalculator,
clusterResource, queueMaxResource, currentResourceLimits.getLimit()));
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/1cd2fcf2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index fc546ee..44845cf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -748,14 +748,14 @@ public class TestReservations {
// nothing reserved
boolean res = a.findNodeToUnreserve(csContext.getClusterResource(),
- node_1, app_0, priorityMap, capability, capability);
+ node_1, app_0, priorityMap, capability);
assertFalse(res);
// reserved but scheduler doesn't know about that node.
app_0.reserve(node_1, priorityMap, rmContainer, container);
node_1.reserveResource(app_0, priorityMap, rmContainer);
res = a.findNodeToUnreserve(csContext.getClusterResource(), node_1, app_0,
- priorityMap, capability, capability);
+ priorityMap, capability);
assertFalse(res);
}
@@ -858,12 +858,13 @@ public class TestReservations {
// allocate to queue so that the potential new capacity is greater then
// absoluteMaxCapacity
Resource capability = Resources.createResource(32 * GB, 0);
+ ResourceLimits limits = new ResourceLimits(clusterResource);
boolean res =
a.canAssignToThisQueue(clusterResource,
- RMNodeLabelsManager.NO_LABEL, new ResourceLimits(
- clusterResource), capability, Resources.none(),
+ RMNodeLabelsManager.NO_LABEL, limits, capability, Resources.none(),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertFalse(res);
+ assertEquals(limits.getAmountNeededUnreserve(), Resources.none());
// now add in reservations and make sure it continues if config set
// allocate to queue so that the potential new capacity is greater then
@@ -880,38 +881,43 @@ public class TestReservations {
assertEquals(3 * GB, node_1.getUsedResource().getMemory());
capability = Resources.createResource(5 * GB, 0);
+ limits = new ResourceLimits(clusterResource);
res =
a.canAssignToThisQueue(clusterResource,
- RMNodeLabelsManager.NO_LABEL, new ResourceLimits(
- clusterResource), capability, Resources.createResource(5 * GB),
+ RMNodeLabelsManager.NO_LABEL, limits, capability, Resources.createResource(5 * GB),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertTrue(res);
+ // 16GB total, 13GB consumed (8 allocated, 5 reserved). asking for 5GB so we would have to
+ // unreserve 2GB to get the total 5GB needed.
+ // also note vcore checks not enabled
+ assertEquals(Resources.createResource(2 * GB, 3), limits.getAmountNeededUnreserve());
// tell to not check reservations
+ limits = new ResourceLimits(clusterResource);
res =
a.canAssignToThisQueue(clusterResource,
- RMNodeLabelsManager.NO_LABEL, new ResourceLimits(
- clusterResource), capability, Resources.none(),
+ RMNodeLabelsManager.NO_LABEL,limits, capability, Resources.none(),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertFalse(res);
+ assertEquals(Resources.none(), limits.getAmountNeededUnreserve());
refreshQueuesTurnOffReservationsContLook(a, csConf);
- // should return false no matter what checkReservations is passed
- // in since feature is off
+ // should return false since reservations continue look is off.
+ limits = new ResourceLimits(clusterResource);
res =
a.canAssignToThisQueue(clusterResource,
- RMNodeLabelsManager.NO_LABEL, new ResourceLimits(
- clusterResource), capability, Resources.none(),
+ RMNodeLabelsManager.NO_LABEL, limits, capability, Resources.none(),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertFalse(res);
-
+ assertEquals(limits.getAmountNeededUnreserve(), Resources.none());
+ limits = new ResourceLimits(clusterResource);
res =
a.canAssignToThisQueue(clusterResource,
- RMNodeLabelsManager.NO_LABEL, new ResourceLimits(
- clusterResource), capability, Resources.createResource(5 * GB),
+ RMNodeLabelsManager.NO_LABEL, limits, capability, Resources.createResource(5 * GB),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
assertFalse(res);
+ assertEquals(Resources.none(), limits.getAmountNeededUnreserve());
}
public void refreshQueuesTurnOffReservationsContLook(LeafQueue a,
@@ -1059,22 +1065,33 @@ public class TestReservations {
assertEquals(5 * GB, node_0.getUsedResource().getMemory());
assertEquals(3 * GB, node_1.getUsedResource().getMemory());
- // set limit so subtrace reservations it can continue
- Resource limit = Resources.createResource(12 * GB, 0);
- boolean res = a.canAssignToUser(clusterResource, user_0, limit, app_0,
- true, "");
+ // not over the limit
+ Resource limit = Resources.createResource(14 * GB, 0);
+ ResourceLimits userResourceLimits = new ResourceLimits(clusterResource);
+ boolean res = a.canAssignToUser(clusterResource, user_0, limit, app_0, "", userResourceLimits);
assertTrue(res);
+ assertEquals(Resources.none(), userResourceLimits.getAmountNeededUnreserve());
- // tell it not to check for reservations and should fail as already over
- // limit
- res = a.canAssignToUser(clusterResource, user_0, limit, app_0, false, "");
- assertFalse(res);
+
+ // set limit so it subtracts reservations and it can continue
+ limit = Resources.createResource(12 * GB, 0);
+ userResourceLimits = new ResourceLimits(clusterResource);
+ res = a.canAssignToUser(clusterResource, user_0, limit, app_0,
+ "", userResourceLimits);
+ assertTrue(res);
+ // limit set to 12GB, we are using 13GB (8 allocated, 5 reserved), to get under limit
+ // we need to unreserve 1GB
+ // also note vcore checks not enabled
+ assertEquals(Resources.createResource(1 * GB, 4),
+ userResourceLimits.getAmountNeededUnreserve());
refreshQueuesTurnOffReservationsContLook(a, csConf);
+ userResourceLimits = new ResourceLimits(clusterResource);
// should now return false since feature off
- res = a.canAssignToUser(clusterResource, user_0, limit, app_0, true, "");
+ res = a.canAssignToUser(clusterResource, user_0, limit, app_0, "", userResourceLimits);
assertFalse(res);
+ assertEquals(Resources.none(), userResourceLimits.getAmountNeededUnreserve());
}
@Test