You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cm...@apache.org on 2015/03/17 21:26:03 UTC
[46/50] [abbrv] hadoop git commit: YARN-3243. CapacityScheduler
should pass headroom from parent to children to make sure ParentQueue obey
its capacity limits. Contributed by Wangda Tan.
YARN-3243. CapacityScheduler should pass headroom from parent to children to make sure ParentQueue obey its capacity limits. Contributed by Wangda Tan.
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/487374b7
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/487374b7
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/487374b7
Branch: refs/heads/HDFS-7836
Commit: 487374b7fe0c92fc7eb1406c568952722b5d5b15
Parents: a89b087
Author: Jian He <ji...@apache.org>
Authored: Tue Mar 17 10:22:15 2015 -0700
Committer: Jian He <ji...@apache.org>
Committed: Tue Mar 17 10:24:23 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../scheduler/capacity/AbstractCSQueue.java | 112 ++++++-
.../scheduler/capacity/CSQueue.java | 4 +-
.../scheduler/capacity/CapacityScheduler.java | 33 ++-
.../scheduler/capacity/LeafQueue.java | 292 +++++++------------
.../scheduler/capacity/ParentQueue.java | 140 +++------
.../scheduler/common/fica/FiCaSchedulerApp.java | 16 +-
.../capacity/TestApplicationLimits.java | 8 +-
.../capacity/TestCapacityScheduler.java | 59 ++++
.../scheduler/capacity/TestChildQueueOrder.java | 25 +-
.../scheduler/capacity/TestLeafQueue.java | 142 ++++-----
.../scheduler/capacity/TestParentQueue.java | 97 +++---
.../scheduler/capacity/TestReservations.java | 147 +++++-----
13 files changed, 561 insertions(+), 517 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 82934ad..f5b72d7 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -56,6 +56,9 @@ Release 2.8.0 - UNRELEASED
IMPROVEMENTS
+ YARN-3243. CapacityScheduler should pass headroom from parent to children
+ to make sure ParentQueue obey its capacity limits. (Wangda Tan via jianhe)
+
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not
http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index d800709..4e53060 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -20,10 +20,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
@@ -34,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.security.PrivilegedEntity;
import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
@@ -49,6 +53,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.collect.Sets;
public abstract class AbstractCSQueue implements CSQueue {
+ private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class);
CSQueue parent;
final String queueName;
@@ -406,21 +411,102 @@ public abstract class AbstractCSQueue implements CSQueue {
parentQ.getPreemptionDisabled());
}
- protected Resource getCurrentResourceLimit(Resource clusterResource,
- ResourceLimits currentResourceLimits) {
+ private Resource getCurrentLimitResource(String nodeLabel,
+ Resource clusterResource, ResourceLimits currentResourceLimits) {
/*
- * Queue's max available resource = min(my.max, my.limit)
- * my.limit is set by my parent, considered used resource of my siblings
+ * Current limit resource: For labeled resource: limit = queue-max-resource
+ * (TODO, this part need update when we support labeled-limit) For
+ * non-labeled resource: limit = min(queue-max-resource,
+ * limit-set-by-parent)
*/
Resource queueMaxResource =
- Resources.multiplyAndNormalizeDown(resourceCalculator, clusterResource,
- queueCapacities.getAbsoluteMaximumCapacity(), minimumAllocation);
- Resource queueCurrentResourceLimit =
- Resources.min(resourceCalculator, clusterResource, queueMaxResource,
- currentResourceLimits.getLimit());
- queueCurrentResourceLimit =
- Resources.roundDown(resourceCalculator, queueCurrentResourceLimit,
- minimumAllocation);
- return queueCurrentResourceLimit;
+ Resources.multiplyAndNormalizeDown(resourceCalculator,
+ labelManager.getResourceByLabel(nodeLabel, clusterResource),
+ queueCapacities.getAbsoluteMaximumCapacity(nodeLabel), minimumAllocation);
+ if (nodeLabel.equals(RMNodeLabelsManager.NO_LABEL)) {
+ return Resources.min(resourceCalculator, clusterResource,
+ queueMaxResource, currentResourceLimits.getLimit());
+ }
+ return queueMaxResource;
+ }
+
+ synchronized boolean canAssignToThisQueue(Resource clusterResource,
+ Set<String> nodeLabels, ResourceLimits currentResourceLimits,
+ Resource nowRequired, Resource resourceCouldBeUnreserved) {
+ // Get label of this queue can access, it's (nodeLabel AND queueLabel)
+ Set<String> labelCanAccess;
+ if (null == nodeLabels || nodeLabels.isEmpty()) {
+ labelCanAccess = new HashSet<String>();
+ // Any queue can always access any node without label
+ labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
+ } else {
+ labelCanAccess = new HashSet<String>(
+ accessibleLabels.contains(CommonNodeLabelsManager.ANY) ? nodeLabels
+ : Sets.intersection(accessibleLabels, nodeLabels));
+ }
+
+ for (String label : labelCanAccess) {
+ // New total resource = used + required
+ Resource newTotalResource =
+ Resources.add(queueUsage.getUsed(label), nowRequired);
+
+ Resource currentLimitResource =
+ getCurrentLimitResource(label, clusterResource, currentResourceLimits);
+
+ // if reservation continuous looking enabled, check to see if we could
+ // potentially use this node instead of a reserved node if the application
+ // has reserved containers.
+ // TODO, now only consider reservation cases when the node has no label
+ if (this.reservationsContinueLooking
+ && label.equals(RMNodeLabelsManager.NO_LABEL)
+ && Resources.greaterThan(resourceCalculator, clusterResource,
+ resourceCouldBeUnreserved, Resources.none())) {
+ // resource-without-reserved = used - reserved
+ Resource newTotalWithoutReservedResource =
+ Resources.subtract(newTotalResource, resourceCouldBeUnreserved);
+
+ // when total-used-without-reserved-resource < currentLimit, we still
+ // have chance to allocate on this node by unreserving some containers
+ if (Resources.lessThan(resourceCalculator, clusterResource,
+ newTotalWithoutReservedResource, currentLimitResource)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("try to use reserved: " + getQueueName()
+ + " usedResources: " + queueUsage.getUsed()
+ + ", clusterResources: " + clusterResource
+ + ", reservedResources: " + resourceCouldBeUnreserved
+ + ", capacity-without-reserved: "
+ + newTotalWithoutReservedResource + ", maxLimitCapacity: "
+ + currentLimitResource);
+ }
+ return true;
+ }
+ }
+
+ // Otherwise, if any of the label of this node beyond queue limit, we
+ // cannot allocate on this node. Consider a small epsilon here.
+ if (Resources.greaterThan(resourceCalculator, clusterResource,
+ newTotalResource, currentLimitResource)) {
+ return false;
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getQueueName()
+ + "Check assign to queue, label=" + label
+ + " usedResources: " + queueUsage.getUsed(label)
+ + " clusterResources: " + clusterResource
+ + " currentUsedCapacity "
+ + Resources.divide(resourceCalculator, clusterResource,
+ queueUsage.getUsed(label),
+ labelManager.getResourceByLabel(label, clusterResource))
+ + " max-capacity: "
+ + queueCapacities.getAbsoluteMaximumCapacity(label)
+ + ")");
+ }
+ return true;
+ }
+
+ // Actually, this will not happen, since labelCanAccess will always be
+ // non-empty
+ return false;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index 0a60acc..1a9448a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -189,13 +189,11 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
* Assign containers to applications in the queue or it's children (if any).
* @param clusterResource the resource of the cluster.
* @param node node on which resources are available
- * @param needToUnreserve assign container only if it can unreserve one first
* @param resourceLimits how much overall resource of this queue can use.
* @return the assignment
*/
public CSAssignment assignContainers(Resource clusterResource,
- FiCaSchedulerNode node, boolean needToUnreserve,
- ResourceLimits resourceLimits);
+ FiCaSchedulerNode node, ResourceLimits resourceLimits);
/**
* A container assigned to the queue has completed.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 756e537..c86c0ff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -1061,9 +1061,14 @@ public class CapacityScheduler extends
node.getNodeID());
LeafQueue queue = ((LeafQueue)reservedApplication.getQueue());
- CSAssignment assignment = queue.assignContainers(clusterResource, node,
- false, new ResourceLimits(
- clusterResource));
+ CSAssignment assignment =
+ queue.assignContainers(
+ clusterResource,
+ node,
+ // TODO, now we only consider limits for parent for non-labeled
+ // resources, should consider labeled resources as well.
+ new ResourceLimits(labelManager.getResourceByLabel(
+ RMNodeLabelsManager.NO_LABEL, clusterResource)));
RMContainer excessReservation = assignment.getExcessReservation();
if (excessReservation != null) {
@@ -1087,8 +1092,13 @@ public class CapacityScheduler extends
LOG.debug("Trying to schedule on node: " + node.getNodeName() +
", available: " + node.getAvailableResource());
}
- root.assignContainers(clusterResource, node, false, new ResourceLimits(
- clusterResource));
+ root.assignContainers(
+ clusterResource,
+ node,
+ // TODO, now we only consider limits for parent for non-labeled
+ // resources, should consider labeled resources as well.
+ new ResourceLimits(labelManager.getResourceByLabel(
+ RMNodeLabelsManager.NO_LABEL, clusterResource)));
}
} else {
LOG.info("Skipping scheduling since node " + node.getNodeID() +
@@ -1209,6 +1219,13 @@ public class CapacityScheduler extends
usePortForNodeName, nodeManager.getNodeLabels());
this.nodes.put(nodeManager.getNodeID(), schedulerNode);
Resources.addTo(clusterResource, nodeManager.getTotalCapability());
+
+ // update this node to node label manager
+ if (labelManager != null) {
+ labelManager.activateNode(nodeManager.getNodeID(),
+ nodeManager.getTotalCapability());
+ }
+
root.updateClusterResource(clusterResource, new ResourceLimits(
clusterResource));
int numNodes = numNodeManagers.incrementAndGet();
@@ -1220,12 +1237,6 @@ public class CapacityScheduler extends
if (scheduleAsynchronously && numNodes == 1) {
asyncSchedulerThread.beginSchedule();
}
-
- // update this node to node label manager
- if (labelManager != null) {
- labelManager.activateNode(nodeManager.getNodeID(),
- nodeManager.getTotalCapability());
- }
}
private synchronized void removeNode(RMNode nodeInfo) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index a607a62..dd6a894 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -76,7 +76,6 @@ import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.collect.Sets;
@Private
@Unstable
@@ -157,7 +156,7 @@ public class LeafQueue extends AbstractCSQueue {
// and all queues may not be realized yet, we'll use (optimistic)
// absoluteMaxCapacity (it will be replaced with the more accurate
// absoluteMaxAvailCapacity during headroom/userlimit/allocation events)
- computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource);
+ setQueueResourceLimitsInfo(clusterResource);
CapacitySchedulerConfiguration conf = csContext.getConfiguration();
userLimit = conf.getUserLimit(getQueuePath());
@@ -739,9 +738,8 @@ public class LeafQueue extends AbstractCSQueue {
@Override
public synchronized CSAssignment assignContainers(Resource clusterResource,
- FiCaSchedulerNode node, boolean needToUnreserve,
- ResourceLimits currentResourceLimits) {
- this.currentResourceLimits = currentResourceLimits;
+ FiCaSchedulerNode node, ResourceLimits currentResourceLimits) {
+ updateCurrentResourceLimits(currentResourceLimits, clusterResource);
if(LOG.isDebugEnabled()) {
LOG.debug("assignContainers: node=" + node.getNodeName()
@@ -796,7 +794,7 @@ public class LeafQueue extends AbstractCSQueue {
continue;
}
if (!this.reservationsContinueLooking) {
- if (!needContainers(application, priority, required)) {
+ if (!shouldAllocOrReserveNewContainer(application, priority, required)) {
if (LOG.isDebugEnabled()) {
LOG.debug("doesn't need containers based on reservation algo!");
}
@@ -818,8 +816,8 @@ public class LeafQueue extends AbstractCSQueue {
required, requestedNodeLabels);
// Check queue max-capacity limit
- if (!canAssignToThisQueue(clusterResource, required,
- node.getLabels(), application, true)) {
+ if (!super.canAssignToThisQueue(clusterResource, node.getLabels(),
+ this.currentResourceLimits, required, application.getCurrentReservation())) {
return NULL_ASSIGNMENT;
}
@@ -835,7 +833,7 @@ public class LeafQueue extends AbstractCSQueue {
// Try to schedule
CSAssignment assignment =
assignContainersOnNode(clusterResource, node, application, priority,
- null, needToUnreserve);
+ null);
// Did the application skip this node?
if (assignment.getSkipped()) {
@@ -896,7 +894,7 @@ public class LeafQueue extends AbstractCSQueue {
// Try to assign if we have sufficient resources
assignContainersOnNode(clusterResource, node, application, priority,
- rmContainer, false);
+ rmContainer);
// Doesn't matter... since it's already charged for at time of reservation
// "re-reservation" is *free*
@@ -938,102 +936,14 @@ public class LeafQueue extends AbstractCSQueue {
Resources.roundDown(resourceCalculator, headroom, minimumAllocation);
return headroom;
}
-
- synchronized boolean canAssignToThisQueue(Resource clusterResource,
- Resource required, Set<String> nodeLabels, FiCaSchedulerApp application,
- boolean checkReservations) {
- // Get label of this queue can access, it's (nodeLabel AND queueLabel)
- Set<String> labelCanAccess;
- if (null == nodeLabels || nodeLabels.isEmpty()) {
- labelCanAccess = new HashSet<String>();
- // Any queue can always access any node without label
- labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
- } else {
- labelCanAccess = new HashSet<String>(Sets.intersection(accessibleLabels, nodeLabels));
- }
-
- boolean canAssign = true;
- for (String label : labelCanAccess) {
- Resource potentialTotalCapacity =
- Resources.add(queueUsage.getUsed(label), required);
-
- float potentialNewCapacity =
- Resources.divide(resourceCalculator, clusterResource,
- potentialTotalCapacity,
- labelManager.getResourceByLabel(label, clusterResource));
- // if enabled, check to see if could we potentially use this node instead
- // of a reserved node if the application has reserved containers
- // TODO, now only consider reservation cases when the node has no label
- if (this.reservationsContinueLooking && checkReservations
- && label.equals(RMNodeLabelsManager.NO_LABEL)) {
- float potentialNewWithoutReservedCapacity = Resources.divide(
- resourceCalculator,
- clusterResource,
- Resources.subtract(potentialTotalCapacity,
- application.getCurrentReservation()),
- labelManager.getResourceByLabel(label, clusterResource));
-
- if (potentialNewWithoutReservedCapacity <= queueCapacities
- .getAbsoluteMaximumCapacity()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("try to use reserved: "
- + getQueueName()
- + " usedResources: "
- + queueUsage.getUsed()
- + " clusterResources: "
- + clusterResource
- + " reservedResources: "
- + application.getCurrentReservation()
- + " currentCapacity "
- + Resources.divide(resourceCalculator, clusterResource,
- queueUsage.getUsed(), clusterResource) + " required " + required
- + " potentialNewWithoutReservedCapacity: "
- + potentialNewWithoutReservedCapacity + " ( "
- + " max-capacity: "
- + queueCapacities.getAbsoluteMaximumCapacity() + ")");
- }
- // we could potentially use this node instead of reserved node
- return true;
- }
- }
-
- // Otherwise, if any of the label of this node beyond queue limit, we
- // cannot allocate on this node. Consider a small epsilon here.
- if (potentialNewCapacity > queueCapacities
- .getAbsoluteMaximumCapacity(label) + 1e-4) {
- canAssign = false;
- break;
- }
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(getQueueName()
- + "Check assign to queue, label=" + label
- + " usedResources: " + queueUsage.getUsed(label)
- + " clusterResources: " + clusterResource
- + " currentCapacity "
- + Resources.divide(resourceCalculator, clusterResource,
- queueUsage.getUsed(label),
- labelManager.getResourceByLabel(label, clusterResource))
- + " potentialNewCapacity: " + potentialNewCapacity + " ( "
- + " max-capacity: " + queueCapacities.getAbsoluteMaximumCapacity()
- + ")");
- }
- }
-
- return canAssign;
- }
- private Resource computeQueueCurrentLimitAndSetHeadroomInfo(
+ private void setQueueResourceLimitsInfo(
Resource clusterResource) {
- Resource queueCurrentResourceLimit =
- getCurrentResourceLimit(clusterResource, currentResourceLimits);
-
synchronized (queueResourceLimitsInfo) {
- queueResourceLimitsInfo.setQueueCurrentLimit(queueCurrentResourceLimit);
+ queueResourceLimitsInfo.setQueueCurrentLimit(currentResourceLimits
+ .getLimit());
queueResourceLimitsInfo.setClusterResource(clusterResource);
}
-
- return queueCurrentResourceLimit;
}
@Lock({LeafQueue.class, FiCaSchedulerApp.class})
@@ -1048,16 +958,16 @@ public class LeafQueue extends AbstractCSQueue {
computeUserLimit(application, clusterResource, required,
queueUser, requestedLabels);
- Resource currentResourceLimit =
- computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource);
+ setQueueResourceLimitsInfo(clusterResource);
Resource headroom =
- getHeadroom(queueUser, currentResourceLimit, clusterResource, userLimit);
+ getHeadroom(queueUser, currentResourceLimits.getLimit(),
+ clusterResource, userLimit);
if (LOG.isDebugEnabled()) {
LOG.debug("Headroom calculation for user " + user + ": " +
" userLimit=" + userLimit +
- " queueMaxAvailRes=" + currentResourceLimit +
+ " queueMaxAvailRes=" + currentResourceLimits.getLimit() +
" consumed=" + queueUser.getUsed() +
" headroom=" + headroom);
}
@@ -1207,8 +1117,8 @@ public class LeafQueue extends AbstractCSQueue {
return true;
}
- boolean needContainers(FiCaSchedulerApp application, Priority priority,
- Resource required) {
+ boolean shouldAllocOrReserveNewContainer(FiCaSchedulerApp application,
+ Priority priority, Resource required) {
int requiredContainers = application.getTotalRequiredResources(priority);
int reservedContainers = application.getNumReservedContainers(priority);
int starvation = 0;
@@ -1240,7 +1150,7 @@ public class LeafQueue extends AbstractCSQueue {
private CSAssignment assignContainersOnNode(Resource clusterResource,
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
- RMContainer reservedContainer, boolean needToUnreserve) {
+ RMContainer reservedContainer) {
Resource assigned = Resources.none();
NodeType requestType = null;
@@ -1252,7 +1162,7 @@ public class LeafQueue extends AbstractCSQueue {
requestType = NodeType.NODE_LOCAL;
assigned =
assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
- node, application, priority, reservedContainer, needToUnreserve,
+ node, application, priority, reservedContainer,
allocatedContainer);
if (Resources.greaterThan(resourceCalculator, clusterResource,
assigned, Resources.none())) {
@@ -1280,7 +1190,7 @@ public class LeafQueue extends AbstractCSQueue {
assigned =
assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
- node, application, priority, reservedContainer, needToUnreserve,
+ node, application, priority, reservedContainer,
allocatedContainer);
if (Resources.greaterThan(resourceCalculator, clusterResource,
assigned, Resources.none())) {
@@ -1308,7 +1218,7 @@ public class LeafQueue extends AbstractCSQueue {
assigned =
assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
- node, application, priority, reservedContainer, needToUnreserve,
+ node, application, priority, reservedContainer,
allocatedContainer);
// update locality statistics
@@ -1320,13 +1230,24 @@ public class LeafQueue extends AbstractCSQueue {
return SKIP_ASSIGNMENT;
}
+
+ private Resource getMinimumResourceNeedUnreserved(Resource askedResource) {
+ // First we need to get minimum resource we need unreserve
+ // minimum-resource-need-unreserve = used + asked - limit
+ Resource minimumUnreservedResource =
+ Resources.subtract(Resources.add(queueUsage.getUsed(), askedResource),
+ currentResourceLimits.getLimit());
+ return minimumUnreservedResource;
+ }
@Private
protected boolean findNodeToUnreserve(Resource clusterResource,
FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
- Resource capability) {
+ Resource askedResource, Resource minimumUnreservedResource) {
// need to unreserve some other container first
- NodeId idToUnreserve = application.getNodeIdToUnreserve(priority, capability);
+ NodeId idToUnreserve =
+ application.getNodeIdToUnreserve(priority, minimumUnreservedResource,
+ resourceCalculator, clusterResource);
if (idToUnreserve == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("checked to see if could unreserve for app but nothing "
@@ -1343,7 +1264,7 @@ public class LeafQueue extends AbstractCSQueue {
LOG.debug("unreserving for app: " + application.getApplicationId()
+ " on nodeId: " + idToUnreserve
+ " in order to replace reserved application and place it on node: "
- + node.getNodeID() + " needing: " + capability);
+ + node.getNodeID() + " needing: " + askedResource);
}
// headroom
@@ -1364,15 +1285,7 @@ public class LeafQueue extends AbstractCSQueue {
@Private
protected boolean checkLimitsToReserve(Resource clusterResource,
- FiCaSchedulerApp application, Resource capability,
- boolean needToUnreserve) {
- if (needToUnreserve) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("we needed to unreserve to be able to allocate");
- }
- return false;
- }
-
+ FiCaSchedulerApp application, Resource capability) {
// we can't reserve if we got here based on the limit
// checks assuming we could unreserve!!!
Resource userLimit = computeUserLimitAndSetHeadroom(application,
@@ -1380,7 +1293,8 @@ public class LeafQueue extends AbstractCSQueue {
// Check queue max-capacity limit,
// TODO: Consider reservation on labels
- if (!canAssignToThisQueue(clusterResource, capability, null, application, false)) {
+ if (!canAssignToThisQueue(clusterResource, null,
+ this.currentResourceLimits, capability, Resources.none())) {
if (LOG.isDebugEnabled()) {
LOG.debug("was going to reserve but hit queue limit");
}
@@ -1402,43 +1316,40 @@ public class LeafQueue extends AbstractCSQueue {
private Resource assignNodeLocalContainers(Resource clusterResource,
ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
- RMContainer reservedContainer, boolean needToUnreserve,
- MutableObject allocatedContainer) {
+ RMContainer reservedContainer, MutableObject allocatedContainer) {
if (canAssign(application, priority, node, NodeType.NODE_LOCAL,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority,
nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
- needToUnreserve, allocatedContainer);
+ allocatedContainer);
}
return Resources.none();
}
- private Resource assignRackLocalContainers(
- Resource clusterResource, ResourceRequest rackLocalResourceRequest,
- FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
- RMContainer reservedContainer, boolean needToUnreserve,
- MutableObject allocatedContainer) {
+ private Resource assignRackLocalContainers(Resource clusterResource,
+ ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node,
+ FiCaSchedulerApp application, Priority priority,
+ RMContainer reservedContainer, MutableObject allocatedContainer) {
if (canAssign(application, priority, node, NodeType.RACK_LOCAL,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority,
rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
- needToUnreserve, allocatedContainer);
+ allocatedContainer);
}
return Resources.none();
}
- private Resource assignOffSwitchContainers(
- Resource clusterResource, ResourceRequest offSwitchResourceRequest,
- FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
- RMContainer reservedContainer, boolean needToUnreserve,
- MutableObject allocatedContainer) {
+ private Resource assignOffSwitchContainers(Resource clusterResource,
+ ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node,
+ FiCaSchedulerApp application, Priority priority,
+ RMContainer reservedContainer, MutableObject allocatedContainer) {
if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
reservedContainer)) {
return assignContainer(clusterResource, node, application, priority,
offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
- needToUnreserve, allocatedContainer);
+ allocatedContainer);
}
return Resources.none();
@@ -1522,13 +1433,12 @@ public class LeafQueue extends AbstractCSQueue {
private Resource assignContainer(Resource clusterResource, FiCaSchedulerNode node,
FiCaSchedulerApp application, Priority priority,
ResourceRequest request, NodeType type, RMContainer rmContainer,
- boolean needToUnreserve, MutableObject createdContainer) {
+ MutableObject createdContainer) {
if (LOG.isDebugEnabled()) {
LOG.debug("assignContainers: node=" + node.getNodeName()
+ " application=" + application.getApplicationId()
+ " priority=" + priority.getPriority()
- + " request=" + request + " type=" + type
- + " needToUnreserve= " + needToUnreserve);
+ + " request=" + request + " type=" + type);
}
// check if the resource request can access the label
@@ -1548,12 +1458,14 @@ public class LeafQueue extends AbstractCSQueue {
Resource available = node.getAvailableResource();
Resource totalResource = node.getTotalResource();
- if (!Resources.fitsIn(capability, totalResource)) {
+ if (!Resources.lessThanOrEqual(resourceCalculator, clusterResource,
+ capability, totalResource)) {
LOG.warn("Node : " + node.getNodeID()
+ " does not have sufficient resource for request : " + request
+ " node total capability : " + node.getTotalResource());
return Resources.none();
}
+
assert Resources.greaterThan(
resourceCalculator, clusterResource, available, Resources.none());
@@ -1566,18 +1478,9 @@ public class LeafQueue extends AbstractCSQueue {
LOG.warn("Couldn't get container for allocation!");
return Resources.none();
}
-
- // default to true since if reservation continue look feature isn't on
- // needContainers is checked earlier and we wouldn't have gotten this far
- boolean canAllocContainer = true;
- if (this.reservationsContinueLooking) {
- // based on reservations can we allocate/reserve more or do we need
- // to unreserve one first
- canAllocContainer = needContainers(application, priority, capability);
- if (LOG.isDebugEnabled()) {
- LOG.debug("can alloc container is: " + canAllocContainer);
- }
- }
+
+ boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer(
+ application, priority, capability);
// Can we allocate a container on this node?
int availableContainers =
@@ -1588,25 +1491,25 @@ public class LeafQueue extends AbstractCSQueue {
// Did we previously reserve containers at this 'priority'?
if (rmContainer != null) {
unreserve(application, priority, node, rmContainer);
- } else if (this.reservationsContinueLooking
- && (!canAllocContainer || needToUnreserve)) {
- // need to unreserve some other container first
- boolean res = findNodeToUnreserve(clusterResource, node, application,
- priority, capability);
- if (!res) {
- return Resources.none();
- }
- } else {
- // we got here by possibly ignoring queue capacity limits. If the
- // parameter needToUnreserve is true it means we ignored one of those
- // limits in the chance we could unreserve. If we are here we aren't
- // trying to unreserve so we can't allocate anymore due to that parent
- // limit.
- if (needToUnreserve) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("we needed to unreserve to be able to allocate, skipping");
+ } else if (this.reservationsContinueLooking && node.getLabels().isEmpty()) {
+ // when reservationsContinueLooking is set, we may need to unreserve
+ // some containers to meet this queue and its parents' resource limits
+ // TODO: need to change this when we want to support continuous reservation
+ // looking for labeled partitions.
+ Resource minimumUnreservedResource =
+ getMinimumResourceNeedUnreserved(capability);
+ if (!shouldAllocOrReserveNewContainer
+ || Resources.greaterThan(resourceCalculator, clusterResource,
+ minimumUnreservedResource, Resources.none())) {
+ boolean containerUnreserved =
+ findNodeToUnreserve(clusterResource, node, application, priority,
+ capability, minimumUnreservedResource);
+ // When minimum-unreserved-resource > 0, OR we cannot allocate a new/reserved
+ // container, we *have to* unreserve some resource to continue.
+ // If we failed to unreserve some resource,
+ if (!containerUnreserved) {
+ return Resources.none();
}
- return Resources.none();
}
}
@@ -1632,17 +1535,16 @@ public class LeafQueue extends AbstractCSQueue {
} else {
// if we are allowed to allocate but this node doesn't have space, reserve it or
// if this was an already a reserved container, reserve it again
- if ((canAllocContainer) || (rmContainer != null)) {
-
- if (reservationsContinueLooking) {
- // we got here by possibly ignoring parent queue capacity limits. If
- // the parameter needToUnreserve is true it means we ignored one of
- // those limits in the chance we could unreserve. If we are here
- // we aren't trying to unreserve so we can't allocate
- // anymore due to that parent limit
- boolean res = checkLimitsToReserve(clusterResource, application, capability,
- needToUnreserve);
- if (!res) {
+ if (shouldAllocOrReserveNewContainer || rmContainer != null) {
+
+ if (reservationsContinueLooking && rmContainer == null) {
+ // we could possibly be ignoring parent queue capacity limits when
+ // reservationsContinueLooking is set.
+ // If we're trying to reserve a container here, no container will be
+ // unreserved for reserving the new one. Check limits again before
+ // reserving the new container.
+ if (!checkLimitsToReserve(clusterResource,
+ application, capability)) {
return Resources.none();
}
}
@@ -1784,18 +1686,36 @@ public class LeafQueue extends AbstractCSQueue {
Resources.multiplyAndNormalizeUp(resourceCalculator, clusterResource,
queueCapacities.getAbsoluteCapacity(), minimumAllocation);
}
+
+ private void updateCurrentResourceLimits(
+ ResourceLimits currentResourceLimits, Resource clusterResource) {
+ // TODO: need consider non-empty node labels when resource limits supports
+ // node labels
+ // Even if ParentQueue will set limits respect child's max queue capacity,
+ // but when allocating reserved container, CapacityScheduler doesn't do
+ // this. So need cap limits by queue's max capacity here.
+ this.currentResourceLimits = currentResourceLimits;
+ Resource queueMaxResource =
+ Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager
+ .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource),
+ queueCapacities
+ .getAbsoluteMaximumCapacity(RMNodeLabelsManager.NO_LABEL),
+ minimumAllocation);
+ this.currentResourceLimits.setLimit(Resources.min(resourceCalculator,
+ clusterResource, queueMaxResource, currentResourceLimits.getLimit()));
+ }
@Override
public synchronized void updateClusterResource(Resource clusterResource,
ResourceLimits currentResourceLimits) {
- this.currentResourceLimits = currentResourceLimits;
+ updateCurrentResourceLimits(currentResourceLimits, clusterResource);
lastClusterResource = clusterResource;
updateAbsoluteCapacityResource(clusterResource);
// Update headroom info based on new cluster resource value
// absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity
// during allocation
- computeQueueCurrentLimitAndSetHeadroomInfo(clusterResource);
+ setQueueResourceLimitsInfo(clusterResource);
// Update metrics
CSQueueUtils.updateQueueStatistics(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 7feaa15..5ed6bb8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -48,7 +47,6 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
@@ -63,8 +61,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.util.resource.Resources;
-import com.google.common.collect.Sets;
-
@Private
@Evolving
public class ParentQueue extends AbstractCSQueue {
@@ -380,8 +376,7 @@ public class ParentQueue extends AbstractCSQueue {
@Override
public synchronized CSAssignment assignContainers(Resource clusterResource,
- FiCaSchedulerNode node, boolean needToUnreserve,
- ResourceLimits resourceLimits) {
+ FiCaSchedulerNode node, ResourceLimits resourceLimits) {
CSAssignment assignment =
new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
Set<String> nodeLabels = node.getLabels();
@@ -397,21 +392,18 @@ public class ParentQueue extends AbstractCSQueue {
+ getQueueName());
}
- boolean localNeedToUnreserve = false;
-
// Are we over maximum-capacity for this queue?
- if (!canAssignToThisQueue(clusterResource, nodeLabels)) {
- // check to see if we could if we unreserve first
- localNeedToUnreserve = assignToQueueIfUnreserve(clusterResource);
- if (!localNeedToUnreserve) {
- break;
- }
+ // This will also consider parent's limits and also continuous reservation
+ // looking
+ if (!super.canAssignToThisQueue(clusterResource, nodeLabels, resourceLimits,
+ minimumAllocation, Resources.createResource(getMetrics()
+ .getReservedMB(), getMetrics().getReservedVirtualCores()))) {
+ break;
}
// Schedule
CSAssignment assignedToChild =
- assignContainersToChildQueues(clusterResource, node,
- localNeedToUnreserve | needToUnreserve, resourceLimits);
+ assignContainersToChildQueues(clusterResource, node, resourceLimits);
assignment.setType(assignedToChild.getType());
// Done if no child-queue assigned anything
@@ -459,74 +451,6 @@ public class ParentQueue extends AbstractCSQueue {
return assignment;
}
- private synchronized boolean canAssignToThisQueue(Resource clusterResource,
- Set<String> nodeLabels) {
- Set<String> labelCanAccess =
- new HashSet<String>(
- accessibleLabels.contains(CommonNodeLabelsManager.ANY) ? nodeLabels
- : Sets.intersection(accessibleLabels, nodeLabels));
- if (nodeLabels.isEmpty()) {
- // Any queue can always access any node without label
- labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
- }
-
- boolean canAssign = true;
- for (String label : labelCanAccess) {
- float currentAbsoluteLabelUsedCapacity =
- Resources.divide(resourceCalculator, clusterResource,
- queueUsage.getUsed(label),
- labelManager.getResourceByLabel(label, clusterResource));
- // if any of the label doesn't beyond limit, we can allocate on this node
- if (currentAbsoluteLabelUsedCapacity >=
- queueCapacities.getAbsoluteMaximumCapacity(label)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(getQueueName() + " used=" + queueUsage.getUsed()
- + " current-capacity (" + queueUsage.getUsed(label) + ") "
- + " >= max-capacity ("
- + labelManager.getResourceByLabel(label, clusterResource) + ")");
- }
- canAssign = false;
- break;
- }
- }
-
- return canAssign;
- }
-
-
- private synchronized boolean assignToQueueIfUnreserve(Resource clusterResource) {
- if (this.reservationsContinueLooking) {
- // check to see if we could potentially use this node instead of a reserved
- // node
-
- Resource reservedResources = Resources.createResource(getMetrics()
- .getReservedMB(), getMetrics().getReservedVirtualCores());
- float capacityWithoutReservedCapacity = Resources.divide(
- resourceCalculator, clusterResource,
- Resources.subtract(queueUsage.getUsed(), reservedResources),
- clusterResource);
-
- if (capacityWithoutReservedCapacity <= queueCapacities
- .getAbsoluteMaximumCapacity()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("parent: try to use reserved: " + getQueueName()
- + " usedResources: " + queueUsage.getUsed().getMemory()
- + " clusterResources: " + clusterResource.getMemory()
- + " reservedResources: " + reservedResources.getMemory()
- + " currentCapacity " + ((float) queueUsage.getUsed().getMemory())
- / clusterResource.getMemory()
- + " potentialNewWithoutReservedCapacity: "
- + capacityWithoutReservedCapacity + " ( " + " max-capacity: "
- + queueCapacities.getAbsoluteMaximumCapacity() + ")");
- }
- // we could potentially use this node instead of reserved node
- return true;
- }
- }
- return false;
- }
-
-
private boolean canAssign(Resource clusterResource, FiCaSchedulerNode node) {
return (node.getReservedContainer() == null) &&
Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
@@ -534,28 +458,38 @@ public class ParentQueue extends AbstractCSQueue {
}
private ResourceLimits getResourceLimitsOfChild(CSQueue child,
- Resource clusterResource, ResourceLimits myLimits) {
- /*
- * Set head-room of a given child, limit =
- * min(minimum-of-limit-of-this-queue-and-ancestors, this.max) - this.used
- * + child.used. To avoid any of this queue's and its ancestors' limit
- * being violated
- */
- Resource myCurrentLimit =
- getCurrentResourceLimit(clusterResource, myLimits);
- // My available resource = my-current-limit - my-used-resource
- Resource myMaxAvailableResource = Resources.subtract(myCurrentLimit,
- getUsedResources());
- // Child's limit = my-available-resource + resource-already-used-by-child
+ Resource clusterResource, ResourceLimits parentLimits) {
+ // Set resource-limit of a given child, child.limit =
+ // min(my.limit - my.used + child.used, child.max)
+
+ // Parent available resource = parent-limit - parent-used-resource
+ Resource parentMaxAvailableResource =
+ Resources.subtract(parentLimits.getLimit(), getUsedResources());
+
+ // Child's limit = parent-available-resource + child-used
Resource childLimit =
- Resources.add(myMaxAvailableResource, child.getUsedResources());
-
+ Resources.add(parentMaxAvailableResource, child.getUsedResources());
+
+ // Get child's max resource
+ Resource childConfiguredMaxResource =
+ Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager
+ .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource),
+ child.getAbsoluteMaximumCapacity(), minimumAllocation);
+
+ // Child's limit should be capped by child configured max resource
+ childLimit =
+ Resources.min(resourceCalculator, clusterResource, childLimit,
+ childConfiguredMaxResource);
+
+ // Normalize before return
+ childLimit =
+ Resources.roundDown(resourceCalculator, childLimit, minimumAllocation);
+
return new ResourceLimits(childLimit);
}
private synchronized CSAssignment assignContainersToChildQueues(
- Resource cluster, FiCaSchedulerNode node, boolean needToUnreserve,
- ResourceLimits limits) {
+ Resource cluster, FiCaSchedulerNode node, ResourceLimits limits) {
CSAssignment assignment =
new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
@@ -573,9 +507,7 @@ public class ParentQueue extends AbstractCSQueue {
ResourceLimits childLimits =
getResourceLimitsOfChild(childQueue, cluster, limits);
- assignment =
- childQueue.assignContainers(cluster, node, needToUnreserve,
- childLimits);
+ assignment = childQueue.assignContainers(cluster, node, childLimits);
if(LOG.isDebugEnabled()) {
LOG.debug("Assigned to queue: " + childQueue.getQueuePath() +
" stats: " + childQueue + " --> " +
http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 9f97b13..6cc2777 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -274,7 +274,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
}
synchronized public NodeId getNodeIdToUnreserve(Priority priority,
- Resource capability) {
+ Resource resourceNeedUnreserve, ResourceCalculator rc,
+ Resource clusterResource) {
// first go around make this algorithm simple and just grab first
// reservation that has enough resources
@@ -283,16 +284,19 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
if ((reservedContainers != null) && (!reservedContainers.isEmpty())) {
for (Map.Entry<NodeId, RMContainer> entry : reservedContainers.entrySet()) {
+ NodeId nodeId = entry.getKey();
+ Resource containerResource = entry.getValue().getContainer().getResource();
+
// make sure we unreserve one with at least the same amount of
// resources, otherwise could affect capacity limits
- if (Resources.fitsIn(capability, entry.getValue().getContainer()
- .getResource())) {
+ if (Resources.lessThanOrEqual(rc, clusterResource,
+ resourceNeedUnreserve, containerResource)) {
if (LOG.isDebugEnabled()) {
LOG.debug("unreserving node with reservation size: "
- + entry.getValue().getContainer().getResource()
- + " in order to allocate container with size: " + capability);
+ + containerResource
+ + " in order to allocate container with size: " + resourceNeedUnreserve);
}
- return entry.getKey();
+ return nodeId;
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index 8cad057..1ca5c97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -611,7 +611,7 @@ public class TestApplicationLimits {
app_0_0.updateResourceRequests(app_0_0_requests);
// Schedule to compute
- queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+ queue.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
Resource expectedHeadroom = Resources.createResource(10*16*GB, 1);
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
@@ -631,7 +631,7 @@ public class TestApplicationLimits {
app_0_1.updateResourceRequests(app_0_1_requests);
// Schedule to compute
- queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+ queue.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource)); // Schedule to compute
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
assertEquals(expectedHeadroom, app_0_1.getHeadroom());// no change
@@ -651,7 +651,7 @@ public class TestApplicationLimits {
app_1_0.updateResourceRequests(app_1_0_requests);
// Schedule to compute
- queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+ queue.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource)); // Schedule to compute
expectedHeadroom = Resources.createResource(10*16*GB / 2, 1); // changes
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
@@ -660,7 +660,7 @@ public class TestApplicationLimits {
// Now reduce cluster size and check for the smaller headroom
clusterResource = Resources.createResource(90*16*GB);
- queue.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+ queue.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource)); // Schedule to compute
expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index 83ab104..7a265dc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -125,6 +125,7 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
+import org.junit.ComparisonFailure;
import org.junit.Test;
import org.mockito.Mockito;
@@ -2483,6 +2484,64 @@ public class TestCapacityScheduler {
Assert.assertEquals(30 * GB,
am1.doHeartbeat().getAvailableResources().getMemory());
}
+
+ @Test
+ public void testParentQueueMaxCapsAreRespected() throws Exception {
+ /*
+ * Queue tree:
+ * Root
+ * / \
+ * A B
+ * / \
+ * A1 A2
+ */
+ CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
+ csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+ csConf.setCapacity(A, 50);
+ csConf.setMaximumCapacity(A, 50);
+ csConf.setCapacity(B, 50);
+
+ // Define 2nd-level queues
+ csConf.setQueues(A, new String[] {"a1", "a2"});
+ csConf.setCapacity(A1, 50);
+ csConf.setUserLimitFactor(A1, 100.0f);
+ csConf.setCapacity(A2, 50);
+ csConf.setUserLimitFactor(A2, 100.0f);
+ csConf.setCapacity(B1, B1_CAPACITY);
+ csConf.setUserLimitFactor(B1, 100.0f);
+
+ YarnConfiguration conf = new YarnConfiguration(csConf);
+ conf.setBoolean(CapacitySchedulerConfiguration.ENABLE_USER_METRICS, true);
+
+ MemoryRMStateStore memStore = new MemoryRMStateStore();
+ memStore.init(conf);
+ MockRM rm1 = new MockRM(conf, memStore);
+ rm1.start();
+ MockNM nm1 =
+ new MockNM("127.0.0.1:1234", 24 * GB, rm1.getResourceTrackerService());
+ nm1.registerNode();
+
+ // Launch app1 in a1, resource usage is 1GB (am) + 4GB * 2 = 9GB
+ RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a1");
+ MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+ waitContainerAllocated(am1, 4 * GB, 2, 2, rm1, nm1);
+
+ // Try to launch app2 in a2, asked 2GB, should success
+ RMApp app2 = rm1.submitApp(2 * GB, "app", "user", null, "a2");
+ MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+ try {
+ // Try to allocate a container, a's usage=11G/max=12
+ // a1's usage=9G/max=12
+ // a2's usage=2G/max=12
+ // In this case, if a2 asked 2G, should fail.
+ waitContainerAllocated(am2, 2 * GB, 1, 2, rm1, nm1);
+ } catch (AssertionError failure) {
+ // Expected, return;
+ return;
+ }
+ Assert.fail("Shouldn't successfully allocate containers for am2, "
+ + "queue-a's max capacity will be violated if container allocated");
+ }
private void setMaxAllocMb(Configuration conf, int maxAllocMb) {
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/487374b7/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index 7edb17d..71dc523 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.junit.Assert.assertEquals;
import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doNothing;
@@ -145,7 +144,7 @@ public class TestChildQueueOrder {
if (allocation > 0) {
doReturn(new CSAssignment(Resources.none(), type)).
when(queue)
- .assignContainers(eq(clusterResource), eq(node), anyBoolean(),
+ .assignContainers(eq(clusterResource), eq(node),
any(ResourceLimits.class));
// Mock the node's resource availability
@@ -157,7 +156,7 @@ public class TestChildQueueOrder {
return new CSAssignment(allocatedResource, type);
}
}).
- when(queue).assignContainers(eq(clusterResource), eq(node), anyBoolean(),
+ when(queue).assignContainers(eq(clusterResource), eq(node),
any(ResourceLimits.class));
doNothing().when(node).releaseContainer(any(Container.class));
}
@@ -274,7 +273,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
- root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+ root.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
for(int i=0; i < 2; i++)
{
@@ -282,7 +281,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
- root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+ root.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
}
for(int i=0; i < 3; i++)
@@ -291,7 +290,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 1*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
- root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+ root.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
}
for(int i=0; i < 4; i++)
@@ -300,7 +299,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 1*GB);
- root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+ root.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
}
verifyQueueMetrics(a, 1*GB, clusterResource);
@@ -334,7 +333,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
- root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+ root.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
}
verifyQueueMetrics(a, 3*GB, clusterResource);
@@ -362,7 +361,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
- root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+ root.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
verifyQueueMetrics(a, 2*GB, clusterResource);
verifyQueueMetrics(b, 3*GB, clusterResource);
@@ -389,7 +388,7 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 0*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 0*GB);
- root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+ root.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
@@ -404,13 +403,13 @@ public class TestChildQueueOrder {
stubQueueAllocation(b, clusterResource, node_0, 1*GB);
stubQueueAllocation(c, clusterResource, node_0, 0*GB);
stubQueueAllocation(d, clusterResource, node_0, 1*GB);
- root.assignContainers(clusterResource, node_0, false, new ResourceLimits(
+ root.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource));
InOrder allocationOrder = inOrder(d,b);
allocationOrder.verify(d).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyBoolean(), any(ResourceLimits.class));
+ any(FiCaSchedulerNode.class), any(ResourceLimits.class));
allocationOrder.verify(b).assignContainers(eq(clusterResource),
- any(FiCaSchedulerNode.class), anyBoolean(), any(ResourceLimits.class));
+ any(FiCaSchedulerNode.class), any(ResourceLimits.class));
verifyQueueMetrics(a, 3*GB, clusterResource);
verifyQueueMetrics(b, 2*GB, clusterResource);
verifyQueueMetrics(c, 3*GB, clusterResource);