You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2014/10/16 03:33:56 UTC
[3/4] YARN-2496. Enhanced Capacity Scheduler to have basic support
for allocating resources based on node-labels. Contributed by Wangda Tan.
YARN-2500. Enhanced ResourceManager to support schedulers allocating resources
based on node-labels. Contributed by Wangda Tan.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index b1f239c..5beed37 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -18,7 +18,15 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -31,10 +39,14 @@ import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
+import com.google.common.collect.ImmutableSet;
+
public class CapacitySchedulerConfiguration extends Configuration {
private static final Log LOG =
@@ -83,6 +95,12 @@ public class CapacitySchedulerConfiguration extends Configuration {
public static final String STATE = "state";
@Private
+ public static final String ACCESSIBLE_NODE_LABELS = "accessible-node-labels";
+
+ @Private
+ public static final String DEFAULT_NODE_LABEL_EXPRESSION =
+ "default-node-label-expression";
+
public static final String RESERVE_CONT_LOOK_ALL_NODES = PREFIX
+ "reservations-continue-look-all-nodes";
@@ -268,6 +286,10 @@ public class CapacitySchedulerConfiguration extends Configuration {
return queueName;
}
+ private String getNodeLabelPrefix(String queue, String label) {
+ return getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS + DOT + label + DOT;
+ }
+
public int getMaximumSystemApplications() {
int maxApplications =
getInt(MAXIMUM_SYSTEM_APPLICATIONS, DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS);
@@ -343,6 +365,15 @@ public class CapacitySchedulerConfiguration extends Configuration {
", maxCapacity=" + maxCapacity);
}
+ public void setCapacityByLabel(String queue, String label, float capacity) {
+ setFloat(getNodeLabelPrefix(queue, label) + CAPACITY, capacity);
+ }
+
+ public void setMaximumCapacityByLabel(String queue, String label,
+ float capacity) {
+ setFloat(getNodeLabelPrefix(queue, label) + MAXIMUM_CAPACITY, capacity);
+ }
+
public int getUserLimit(String queue) {
int userLimit = getInt(getQueuePrefix(queue) + USER_LIMIT,
DEFAULT_USER_LIMIT);
@@ -372,6 +403,121 @@ public class CapacitySchedulerConfiguration extends Configuration {
QueueState.valueOf(state.toUpperCase()) : QueueState.RUNNING;
}
+ public void setAccessibleNodeLabels(String queue, Set<String> labels) {
+ if (labels == null) {
+ return;
+ }
+ String str = StringUtils.join(",", labels);
+ set(getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS, str);
+ }
+
+ public Set<String> getAccessibleNodeLabels(String queue) {
+ String accessibleLabelStr =
+ get(getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS);
+
+ // When accessible-label is null,
+ if (accessibleLabelStr == null) {
+ // Only return null when queue is not ROOT
+ if (!queue.equals(ROOT)) {
+ return null;
+ }
+ } else {
+ // print a warning when accessibleNodeLabel specified in config and queue
+ // is ROOT
+ if (queue.equals(ROOT)) {
+ LOG.warn("Accessible node labels for root queue will be ignored,"
+ + " it will be automatically set to \"*\".");
+ }
+ }
+
+ // always return ANY for queue root
+ if (queue.equals(ROOT)) {
+ return ImmutableSet.of(RMNodeLabelsManager.ANY);
+ }
+
+ // In other cases, split the accessibleLabelStr by ","
+ Set<String> set = new HashSet<String>();
+ for (String str : accessibleLabelStr.split(",")) {
+ if (!str.trim().isEmpty()) {
+ set.add(str.trim());
+ }
+ }
+
+ // if labels contains "*", only keep ANY behind
+ if (set.contains(RMNodeLabelsManager.ANY)) {
+ set.clear();
+ set.add(RMNodeLabelsManager.ANY);
+ }
+ return Collections.unmodifiableSet(set);
+ }
+
+ public Map<String, Float> getNodeLabelCapacities(String queue,
+ Set<String> labels, RMNodeLabelsManager mgr) {
+ Map<String, Float> nodeLabelCapacities = new HashMap<String, Float>();
+
+ if (labels == null) {
+ return nodeLabelCapacities;
+ }
+
+ for (String label : labels.contains(CommonNodeLabelsManager.ANY) ? mgr
+ .getClusterNodeLabels() : labels) {
+ // capacity of all labels in each queue should be 1
+ if (org.apache.commons.lang.StringUtils.equals(ROOT, queue)) {
+ nodeLabelCapacities.put(label, 1.0f);
+ continue;
+ }
+ float capacity =
+ getFloat(getNodeLabelPrefix(queue, label) + CAPACITY, UNDEFINED);
+ if (capacity < MINIMUM_CAPACITY_VALUE
+ || capacity > MAXIMUM_CAPACITY_VALUE) {
+ throw new IllegalArgumentException("Illegal " + "capacity of "
+ + capacity + " for label=" + label + " in queue=" + queue);
+ }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("CSConf - getCapacityOfLabel: prefix="
+ + getNodeLabelPrefix(queue, label) + ", capacity=" + capacity);
+ }
+
+ nodeLabelCapacities.put(label, capacity / 100f);
+ }
+ return nodeLabelCapacities;
+ }
+
+ public Map<String, Float> getMaximumNodeLabelCapacities(String queue,
+ Set<String> labels, RMNodeLabelsManager mgr) {
+ Map<String, Float> maximumNodeLabelCapacities = new HashMap<String, Float>();
+ if (labels == null) {
+ return maximumNodeLabelCapacities;
+ }
+
+ for (String label : labels.contains(CommonNodeLabelsManager.ANY) ? mgr
+ .getClusterNodeLabels() : labels) {
+ float maxCapacity =
+ getFloat(getNodeLabelPrefix(queue, label) + MAXIMUM_CAPACITY,
+ UNDEFINED);
+ maxCapacity = (maxCapacity == DEFAULT_MAXIMUM_CAPACITY_VALUE) ?
+ MAXIMUM_CAPACITY_VALUE : maxCapacity;
+ if (maxCapacity < MINIMUM_CAPACITY_VALUE
+ || maxCapacity > MAXIMUM_CAPACITY_VALUE) {
+ throw new IllegalArgumentException("Illegal " + "capacity of "
+ + maxCapacity + " for label=" + label + " in queue=" + queue);
+ }
+ LOG.debug("CSConf - getCapacityOfLabel: prefix="
+ + getNodeLabelPrefix(queue, label) + ", capacity=" + maxCapacity);
+
+ maximumNodeLabelCapacities.put(label, maxCapacity / 100f);
+ }
+ return maximumNodeLabelCapacities;
+ }
+
+ public String getDefaultNodeLabelExpression(String queue) {
+ return get(getQueuePrefix(queue) + DEFAULT_NODE_LABEL_EXPRESSION);
+ }
+
+ public void setDefaultNodeLabelExpression(String queue, String exp) {
+ set(getQueuePrefix(queue) + DEFAULT_NODE_LABEL_EXPRESSION, exp);
+ }
+
/*
* Returns whether we should continue to look at all heart beating nodes even
* after the reservation limit was hit. The node heart beating in could
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index cab0318..ffeec63 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -24,12 +24,14 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -52,36 +54,31 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
@Private
@Unstable
-public class LeafQueue implements CSQueue {
+public class LeafQueue extends AbstractCSQueue {
private static final Log LOG = LogFactory.getLog(LeafQueue.class);
- private final String queueName;
- private CSQueue parent;
- private float capacity;
- private float absoluteCapacity;
- private float maximumCapacity;
- private float absoluteMaxCapacity;
private float absoluteUsedCapacity = 0.0f;
private int userLimit;
private float userLimitFactor;
@@ -95,10 +92,6 @@ public class LeafQueue implements CSQueue {
private int maxActiveApplicationsPerUser;
private int nodeLocalityDelay;
-
- private Resource usedResources = Resources.createResource(0, 0);
- private float usedCapacity = 0.0f;
- private volatile int numContainers;
Set<FiCaSchedulerApp> activeApplications;
Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap =
@@ -106,20 +99,9 @@ public class LeafQueue implements CSQueue {
Set<FiCaSchedulerApp> pendingApplications;
- private final Resource minimumAllocation;
- private final Resource maximumAllocation;
private final float minimumAllocationFactor;
private Map<String, User> users = new HashMap<String, User>();
-
- private final QueueMetrics metrics;
-
- private QueueInfo queueInfo;
-
- private QueueState state;
-
- private Map<QueueACL, AccessControlList> acls =
- new HashMap<QueueACL, AccessControlList>();
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
@@ -127,29 +109,18 @@ public class LeafQueue implements CSQueue {
private CapacitySchedulerContext scheduler;
private final ActiveUsersManager activeUsersManager;
-
- private final ResourceCalculator resourceCalculator;
-
- private boolean reservationsContinueLooking;
+
+ // cache last cluster resource to compute actual capacity
+ private Resource lastClusterResource = Resources.none();
private final QueueHeadroomInfo queueHeadroomInfo = new QueueHeadroomInfo();
public LeafQueue(CapacitySchedulerContext cs,
- String queueName, CSQueue parent, CSQueue old) {
+ String queueName, CSQueue parent, CSQueue old) throws IOException {
+ super(cs, queueName, parent, old);
this.scheduler = cs;
- this.queueName = queueName;
- this.parent = parent;
-
- this.resourceCalculator = cs.getResourceCalculator();
- // must be after parent and queueName are initialized
- this.metrics = old != null ? old.getMetrics() :
- QueueMetrics.forQueue(getQueuePath(), parent,
- cs.getConfiguration().getEnableUserMetrics(),
- cs.getConf());
this.activeUsersManager = new ActiveUsersManager(metrics);
- this.minimumAllocation = cs.getMinimumResourceCapability();
- this.maximumAllocation = cs.getMaximumResourceCapability();
this.minimumAllocationFactor =
Resources.ratio(resourceCalculator,
Resources.subtract(maximumAllocation, minimumAllocation),
@@ -167,7 +138,8 @@ public class LeafQueue implements CSQueue {
float userLimitFactor =
cs.getConfiguration().getUserLimitFactor(getQueuePath());
- int maxApplications = cs.getConfiguration().getMaximumApplicationsPerQueue(getQueuePath());
+ int maxApplications =
+ cs.getConfiguration().getMaximumApplicationsPerQueue(getQueuePath());
if (maxApplications < 0) {
int maxSystemApps = cs.getConfiguration().getMaximumSystemApplications();
maxApplications = (int)(maxSystemApps * absoluteCapacity);
@@ -187,12 +159,10 @@ public class LeafQueue implements CSQueue {
resourceCalculator,
cs.getClusterResource(), this.minimumAllocation,
maxAMResourcePerQueuePercent, absoluteCapacity);
- int maxActiveApplicationsPerUser =
- CSQueueUtils.computeMaxActiveApplicationsPerUser(maxActiveAppsUsingAbsCap, userLimit,
- userLimitFactor);
+ int maxActiveApplicationsPerUser =
+ CSQueueUtils.computeMaxActiveApplicationsPerUser(
+ maxActiveAppsUsingAbsCap, userLimit, userLimitFactor);
- this.queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
- this.queueInfo.setQueueName(queueName);
this.queueInfo.setChildQueues(new ArrayList<QueueInfo>());
QueueState state = cs.getConfiguration().getState(getQueuePath());
@@ -200,14 +170,13 @@ public class LeafQueue implements CSQueue {
Map<QueueACL, AccessControlList> acls =
cs.getConfiguration().getAcls(getQueuePath());
- setupQueueConfigs(
- cs.getClusterResource(),
- capacity, absoluteCapacity,
- maximumCapacity, absoluteMaxCapacity,
- userLimit, userLimitFactor,
+ setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity,
+ maximumCapacity, absoluteMaxCapacity, userLimit, userLimitFactor,
maxApplications, maxAMResourcePerQueuePercent, maxApplicationsPerUser,
- maxActiveApplications, maxActiveApplicationsPerUser, state, acls,
- cs.getConfiguration().getNodeLocalityDelay(),
+ maxActiveApplications, maxActiveApplicationsPerUser, state, acls, cs
+ .getConfiguration().getNodeLocalityDelay(), accessibleLabels,
+ defaultLabelExpression, this.capacitiyByNodeLabels,
+ this.maxCapacityByNodeLabels,
cs.getConfiguration().getReservationContinueLook());
if(LOG.isDebugEnabled()) {
@@ -221,7 +190,7 @@ public class LeafQueue implements CSQueue {
new TreeSet<FiCaSchedulerApp>(applicationComparator);
this.activeApplications = new TreeSet<FiCaSchedulerApp>(applicationComparator);
}
-
+
// externalizing in method, to allow overriding
protected float getCapacityFromConf() {
return (float)scheduler.getConfiguration().getCapacity(getQueuePath()) / 100;
@@ -236,19 +205,22 @@ public class LeafQueue implements CSQueue {
int maxApplicationsPerUser, int maxActiveApplications,
int maxActiveApplicationsPerUser, QueueState state,
Map<QueueACL, AccessControlList> acls, int nodeLocalityDelay,
- boolean continueLooking)
- {
+ Set<String> labels, String defaultLabelExpression,
+ Map<String, Float> capacitieByLabel,
+ Map<String, Float> maximumCapacitiesByLabel,
+ boolean revervationContinueLooking) throws IOException {
+ super.setupQueueConfigs(clusterResource, capacity, absoluteCapacity,
+ maximumCapacity, absoluteMaxCapacity, state, acls, labels,
+ defaultLabelExpression, capacitieByLabel, maximumCapacitiesByLabel,
+ revervationContinueLooking);
// Sanity check
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
float absCapacity = getParent().getAbsoluteCapacity() * capacity;
- CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absCapacity, absoluteMaxCapacity);
+ CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absCapacity,
+ absoluteMaxCapacity);
- this.capacity = capacity;
this.absoluteCapacity = absCapacity;
- this.maximumCapacity = maximumCapacity;
- this.absoluteMaxCapacity = absoluteMaxCapacity;
-
this.userLimit = userLimit;
this.userLimitFactor = userLimitFactor;
@@ -258,27 +230,35 @@ public class LeafQueue implements CSQueue {
this.maxActiveApplications = maxActiveApplications;
this.maxActiveApplicationsPerUser = maxActiveApplicationsPerUser;
-
- this.state = state;
-
- this.acls = acls;
- this.queueInfo.setCapacity(this.capacity);
- this.queueInfo.setMaximumCapacity(this.maximumCapacity);
- this.queueInfo.setQueueState(this.state);
+ if (!SchedulerUtils.checkQueueLabelExpression(this.accessibleLabels,
+ this.defaultLabelExpression)) {
+ throw new IOException("Invalid default label expression of "
+ + " queue="
+ + queueInfo.getQueueName()
+ + " doesn't have permission to access all labels "
+ + "in default label expression. labelExpression of resource request="
+ + (this.defaultLabelExpression == null ? ""
+ : this.defaultLabelExpression)
+ + ". Queue labels="
+ + (queueInfo.getAccessibleNodeLabels() == null ? "" : StringUtils.join(queueInfo
+ .getAccessibleNodeLabels().iterator(), ',')));
+ }
this.nodeLocalityDelay = nodeLocalityDelay;
- this.reservationsContinueLooking = continueLooking;
StringBuilder aclsString = new StringBuilder();
for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
}
-
- // Update metrics
- CSQueueUtils.updateQueueStatistics(
- resourceCalculator, this, getParent(), clusterResource,
- minimumAllocation);
+
+ StringBuilder labelStrBuilder = new StringBuilder();
+ if (labels != null) {
+ for (String s : labels) {
+ labelStrBuilder.append(s);
+ labelStrBuilder.append(",");
+ }
+ }
LOG.info("Initializing " + queueName + "\n" +
"capacity = " + capacity +
@@ -333,50 +313,12 @@ public class LeafQueue implements CSQueue {
" [= configuredState ]" + "\n" +
"acls = " + aclsString +
" [= configuredAcls ]" + "\n" +
+ "nodeLocalityDelay = " + nodeLocalityDelay + "\n" +
+ "labels=" + labelStrBuilder.toString() + "\n" +
"nodeLocalityDelay = " + nodeLocalityDelay + "\n" +
"reservationsContinueLooking = " +
reservationsContinueLooking + "\n");
}
-
- @Override
- public synchronized float getCapacity() {
- return capacity;
- }
-
- @Override
- public synchronized float getAbsoluteCapacity() {
- return absoluteCapacity;
- }
-
- @Override
- public synchronized float getMaximumCapacity() {
- return maximumCapacity;
- }
-
- @Override
- public synchronized float getAbsoluteMaximumCapacity() {
- return absoluteMaxCapacity;
- }
-
- @Override
- public synchronized float getAbsoluteUsedCapacity() {
- return absoluteUsedCapacity;
- }
-
- @Override
- public synchronized CSQueue getParent() {
- return parent;
- }
-
- @Override
- public synchronized void setParent(CSQueue newParentQueue) {
- this.parent = (ParentQueue)newParentQueue;
- }
-
- @Override
- public String getQueueName() {
- return queueName;
- }
@Override
public String getQueuePath() {
@@ -387,22 +329,6 @@ public class LeafQueue implements CSQueue {
* Used only by tests.
*/
@Private
- public Resource getMinimumAllocation() {
- return minimumAllocation;
- }
-
- /**
- * Used only by tests.
- */
- @Private
- public Resource getMaximumAllocation() {
- return maximumAllocation;
- }
-
- /**
- * Used only by tests.
- */
- @Private
public float getMinimumAllocationFactor() {
return minimumAllocationFactor;
}
@@ -437,45 +363,9 @@ public class LeafQueue implements CSQueue {
}
@Override
- public synchronized float getUsedCapacity() {
- return usedCapacity;
- }
-
- @Override
- public synchronized Resource getUsedResources() {
- return usedResources;
- }
-
- @Override
public List<CSQueue> getChildQueues() {
return null;
}
-
- @Override
- public synchronized void setUsedCapacity(float usedCapacity) {
- this.usedCapacity = usedCapacity;
- }
-
- @Override
- public synchronized void setAbsoluteUsedCapacity(float absUsedCapacity) {
- this.absoluteUsedCapacity = absUsedCapacity;
- }
-
- /**
- * Set maximum capacity - used only for testing.
- * @param maximumCapacity new max capacity
- */
- synchronized void setMaxCapacity(float maximumCapacity) {
- // Sanity check
- CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
- float absMaxCapacity =
- CSQueueUtils.computeAbsoluteMaximumCapacity(
- maximumCapacity, getParent());
- CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absoluteCapacity, absMaxCapacity);
-
- this.maximumCapacity = maximumCapacity;
- this.absoluteMaxCapacity = absMaxCapacity;
- }
/**
* Set user limit - used only for testing.
@@ -569,11 +459,6 @@ public class LeafQueue implements CSQueue {
return nodeLocalityDelay;
}
- @Private
- boolean getReservationContinueLooking() {
- return reservationsContinueLooking;
- }
-
public String toString() {
return queueName + ": " +
"capacity=" + capacity + ", " +
@@ -584,6 +469,11 @@ public class LeafQueue implements CSQueue {
"numApps=" + getNumApplications() + ", " +
"numContainers=" + getNumContainers();
}
+
+ @VisibleForTesting
+ public synchronized void setNodeLabelManager(RMNodeLabelsManager mgr) {
+ this.labelManager = mgr;
+ }
@VisibleForTesting
public synchronized User getUser(String userName) {
@@ -633,6 +523,10 @@ public class LeafQueue implements CSQueue {
newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(),
newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls,
newlyParsedLeafQueue.getNodeLocalityDelay(),
+ newlyParsedLeafQueue.accessibleLabels,
+ newlyParsedLeafQueue.defaultLabelExpression,
+ newlyParsedLeafQueue.capacitiyByNodeLabels,
+ newlyParsedLeafQueue.maxCapacityByNodeLabels,
newlyParsedLeafQueue.reservationsContinueLooking);
// queue metrics are updated, more resource may be available
@@ -641,19 +535,6 @@ public class LeafQueue implements CSQueue {
}
@Override
- public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
- // Check if the leaf-queue allows access
- synchronized (this) {
- if (acls.get(acl).isUserAllowed(user)) {
- return true;
- }
- }
-
- // Check if parent-queue allows access
- return getParent().hasAccess(acl, user);
- }
-
- @Override
public void submitApplicationAttempt(FiCaSchedulerApp application,
String userName) {
// Careful! Locking order is important!
@@ -749,7 +630,8 @@ public class LeafQueue implements CSQueue {
}
}
- private synchronized void addApplicationAttempt(FiCaSchedulerApp application, User user) {
+ private synchronized void addApplicationAttempt(FiCaSchedulerApp application,
+ User user) {
// Accept
user.submitApplication();
pendingApplications.add(application);
@@ -785,7 +667,8 @@ public class LeafQueue implements CSQueue {
getParent().finishApplicationAttempt(application, queue);
}
- public synchronized void removeApplicationAttempt(FiCaSchedulerApp application, User user) {
+ public synchronized void removeApplicationAttempt(
+ FiCaSchedulerApp application, User user) {
boolean wasActive = activeApplications.remove(application);
if (!wasActive) {
pendingApplications.remove(application);
@@ -821,6 +704,21 @@ public class LeafQueue implements CSQueue {
private static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true);
+ private static Set<String> getRequestLabelSetByExpression(
+ String labelExpression) {
+ Set<String> labels = new HashSet<String>();
+ if (null == labelExpression) {
+ return labels;
+ }
+ for (String l : labelExpression.split("&&")) {
+ if (l.trim().isEmpty()) {
+ continue;
+ }
+ labels.add(l.trim());
+ }
+ return labels;
+ }
+
@Override
public synchronized CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerNode node, boolean needToUnreserve) {
@@ -830,6 +728,12 @@ public class LeafQueue implements CSQueue {
+ " #applications=" + activeApplications.size());
}
+ // if our queue cannot access this node, just return
+ if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels,
+ labelManager.getLabelsOnNode(node.getNodeID()))) {
+ return NULL_ASSIGNMENT;
+ }
+
// Check for reserved resources
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {
@@ -879,6 +783,10 @@ public class LeafQueue implements CSQueue {
continue;
}
}
+
+ Set<String> requestedNodeLabels =
+ getRequestLabelSetByExpression(anyRequest
+ .getNodeLabelExpression());
// Compute user-limit & set headroom
// Note: We compute both user-limit & headroom with the highest
@@ -887,16 +795,17 @@ public class LeafQueue implements CSQueue {
// before all higher priority ones are serviced.
Resource userLimit =
computeUserLimitAndSetHeadroom(application, clusterResource,
- required);
+ required, requestedNodeLabels);
// Check queue max-capacity limit
- if (!assignToQueue(clusterResource, required, application, true)) {
+ if (!canAssignToThisQueue(clusterResource, required,
+ labelManager.getLabelsOnNode(node.getNodeID()), application, true)) {
return NULL_ASSIGNMENT;
}
// Check user limit
if (!assignToUser(clusterResource, application.getUser(), userLimit,
- application, true)) {
+ application, true, requestedNodeLabels)) {
break;
}
@@ -922,7 +831,8 @@ public class LeafQueue implements CSQueue {
// Book-keeping
// Note: Update headroom to account for current allocation too...
- allocateResource(clusterResource, application, assigned);
+ allocateResource(clusterResource, application, assigned,
+ labelManager.getLabelsOnNode(node.getNodeID()));
// Don't reset scheduling opportunities for non-local assignments
// otherwise the app will be delayed for each non-local assignment.
@@ -976,7 +886,7 @@ public class LeafQueue implements CSQueue {
protected Resource getHeadroom(User user, Resource queueMaxCap,
Resource clusterResource, FiCaSchedulerApp application, Resource required) {
return getHeadroom(user, queueMaxCap, clusterResource,
- computeUserLimit(application, clusterResource, required, user));
+ computeUserLimit(application, clusterResource, required, user, null));
}
private Resource getHeadroom(User user, Resource queueMaxCap,
@@ -1000,33 +910,49 @@ public class LeafQueue implements CSQueue {
*/
Resource headroom =
Resources.min(resourceCalculator, clusterResource,
- Resources.subtract(userLimit, user.getConsumedResources()),
+ Resources.subtract(userLimit, user.getTotalConsumedResources()),
Resources.subtract(queueMaxCap, usedResources)
);
return headroom;
}
-
- @Private
- protected synchronized boolean assignToQueue(Resource clusterResource,
- Resource required, FiCaSchedulerApp application,
+ synchronized boolean canAssignToThisQueue(Resource clusterResource,
+ Resource required, Set<String> nodeLabels, FiCaSchedulerApp application,
boolean checkReservations) {
-
- Resource potentialTotalResource = Resources.add(usedResources, required);
- // Check how of the cluster's absolute capacity we are currently using...
- float potentialNewCapacity = Resources.divide(resourceCalculator,
- clusterResource, potentialTotalResource, clusterResource);
- if (potentialNewCapacity > absoluteMaxCapacity) {
+ // Get label of this queue can access, it's (nodeLabel AND queueLabel)
+ Set<String> labelCanAccess;
+ if (null == nodeLabels || nodeLabels.isEmpty()) {
+ labelCanAccess = new HashSet<String>();
+ // Any queue can always access any node without label
+ labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
+ } else {
+ labelCanAccess = new HashSet<String>(Sets.intersection(accessibleLabels, nodeLabels));
+ }
+
+ boolean canAssign = true;
+ for (String label : labelCanAccess) {
+ if (!usedResourcesByNodeLabels.containsKey(label)) {
+ usedResourcesByNodeLabels.put(label, Resources.createResource(0));
+ }
+
+ Resource potentialTotalCapacity =
+ Resources.add(usedResourcesByNodeLabels.get(label), required);
+
+ float potentialNewCapacity =
+ Resources.divide(resourceCalculator, clusterResource,
+ potentialTotalCapacity,
+ labelManager.getResourceByLabel(label, clusterResource));
// if enabled, check to see if could we potentially use this node instead
// of a reserved node if the application has reserved containers
- if (this.reservationsContinueLooking && checkReservations) {
-
+ // TODO, now only consider reservation cases when the node has no label
+ if (this.reservationsContinueLooking && checkReservations
+ && label.equals(RMNodeLabelsManager.NO_LABEL)) {
float potentialNewWithoutReservedCapacity = Resources.divide(
resourceCalculator,
clusterResource,
- Resources.subtract(potentialTotalResource,
- application.getCurrentReservation()),
- clusterResource);
+ Resources.subtract(potentialTotalCapacity,
+ application.getCurrentReservation()),
+ labelManager.getResourceByLabel(label, clusterResource));
if (potentialNewWithoutReservedCapacity <= absoluteMaxCapacity) {
if (LOG.isDebugEnabled()) {
@@ -1048,35 +974,43 @@ public class LeafQueue implements CSQueue {
// we could potentially use this node instead of reserved node
return true;
}
-
}
+
+ // Otherwise, if any of the label of this node beyond queue limit, we
+ // cannot allocate on this node. Consider a small epsilon here.
+ if (potentialNewCapacity > getAbsoluteMaximumCapacityByNodeLabel(label) + 1e-4) {
+ canAssign = false;
+ break;
+ }
+
if (LOG.isDebugEnabled()) {
LOG.debug(getQueueName()
- + " usedResources: " + usedResources
+ + "Check assign to queue, label=" + label
+ + " usedResources: " + usedResourcesByNodeLabels.get(label)
+ " clusterResources: " + clusterResource
+ " currentCapacity "
+ Resources.divide(resourceCalculator, clusterResource,
- usedResources, clusterResource) + " required " + required
+ usedResourcesByNodeLabels.get(label),
+ labelManager.getResourceByLabel(label, clusterResource))
+ " potentialNewCapacity: " + potentialNewCapacity + " ( "
+ " max-capacity: " + absoluteMaxCapacity + ")");
}
- return false;
}
- return true;
+
+ return canAssign;
}
-
-
@Lock({LeafQueue.class, FiCaSchedulerApp.class})
- Resource computeUserLimitAndSetHeadroom(
- FiCaSchedulerApp application, Resource clusterResource, Resource required) {
-
+ Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
+ Resource clusterResource, Resource required, Set<String> requestedLabels) {
String user = application.getUser();
-
User queueUser = getUser(user);
- Resource userLimit = // User limit
- computeUserLimit(application, clusterResource, required, queueUser);
+ // Compute user limit respect requested labels,
+ // TODO, need consider headroom respect labels also
+ Resource userLimit =
+ computeUserLimit(application, clusterResource, required,
+ queueUser, requestedLabels);
//Max avail capacity needs to take into account usage by ancestor-siblings
//which are greater than their base capacity, so we are interested in "max avail"
@@ -1096,13 +1030,14 @@ public class LeafQueue implements CSQueue {
queueHeadroomInfo.setClusterResource(clusterResource);
}
- Resource headroom = getHeadroom(queueUser, queueMaxCap, clusterResource, userLimit);
+ Resource headroom =
+ getHeadroom(queueUser, queueMaxCap, clusterResource, userLimit);
if (LOG.isDebugEnabled()) {
LOG.debug("Headroom calculation for user " + user + ": " +
" userLimit=" + userLimit +
" queueMaxCap=" + queueMaxCap +
- " consumed=" + queueUser.getConsumedResources() +
+ " consumed=" + queueUser.getTotalConsumedResources() +
" headroom=" + headroom);
}
@@ -1117,24 +1052,42 @@ public class LeafQueue implements CSQueue {
}
@Lock(NoLock.class)
- private Resource computeUserLimit(FiCaSchedulerApp application,
- Resource clusterResource, Resource required, User user) {
+ private Resource computeUserLimit(FiCaSchedulerApp application,
+ Resource clusterResource, Resource required, User user,
+ Set<String> requestedLabels) {
// What is our current capacity?
// * It is equal to the max(required, queue-capacity) if
// we're running below capacity. The 'max' ensures that jobs in queues
// with miniscule capacity (< 1 slot) make progress
// * If we're running over capacity, then its
// (usedResources + required) (which extra resources we are allocating)
+ Resource queueCapacity = Resource.newInstance(0, 0);
+ if (requestedLabels != null && !requestedLabels.isEmpty()) {
+ // if we have multiple labels to request, we will choose to use the first
+ // label
+ String firstLabel = requestedLabels.iterator().next();
+ queueCapacity =
+ Resources
+ .max(resourceCalculator, clusterResource, queueCapacity,
+ Resources.multiplyAndNormalizeUp(resourceCalculator,
+ labelManager.getResourceByLabel(firstLabel,
+ clusterResource),
+ getAbsoluteCapacityByNodeLabel(firstLabel),
+ minimumAllocation));
+ } else {
+ // else there's no label on request, just to use absolute capacity as
+ // capacity for nodes without label
+ queueCapacity =
+ Resources.multiplyAndNormalizeUp(resourceCalculator, labelManager
+ .getResourceByLabel(CommonNodeLabelsManager.NO_LABEL, clusterResource),
+ absoluteCapacity, minimumAllocation);
+ }
// Allow progress for queues with miniscule capacity
- final Resource queueCapacity =
+ queueCapacity =
Resources.max(
resourceCalculator, clusterResource,
- Resources.multiplyAndNormalizeUp(
- resourceCalculator,
- clusterResource,
- absoluteCapacity,
- minimumAllocation),
+ queueCapacity,
required);
Resource currentCapacity =
@@ -1175,7 +1128,7 @@ public class LeafQueue implements CSQueue {
" userLimit=" + userLimit +
" userLimitFactor=" + userLimitFactor +
" required: " + required +
- " consumed: " + user.getConsumedResources() +
+ " consumed: " + user.getTotalConsumedResources() +
" limit: " + limit +
" queueCapacity: " + queueCapacity +
" qconsumed: " + usedResources +
@@ -1191,28 +1144,33 @@ public class LeafQueue implements CSQueue {
@Private
protected synchronized boolean assignToUser(Resource clusterResource,
String userName, Resource limit, FiCaSchedulerApp application,
- boolean checkReservations) {
-
+ boolean checkReservations, Set<String> requestLabels) {
User user = getUser(userName);
+
+ String label = CommonNodeLabelsManager.NO_LABEL;
+ if (requestLabels != null && !requestLabels.isEmpty()) {
+ label = requestLabels.iterator().next();
+ }
// Note: We aren't considering the current request since there is a fixed
// overhead of the AM, but it's a > check, not a >= check, so...
- if (Resources.greaterThan(resourceCalculator, clusterResource,
- user.getConsumedResources(), limit)) {
-
+ if (Resources
+ .greaterThan(resourceCalculator, clusterResource,
+ user.getConsumedResourceByLabel(label),
+ limit)) {
// if enabled, check to see if could we potentially use this node instead
// of a reserved node if the application has reserved containers
if (this.reservationsContinueLooking && checkReservations) {
if (Resources.lessThanOrEqual(
resourceCalculator,
clusterResource,
- Resources.subtract(user.getConsumedResources(),
+ Resources.subtract(user.getTotalConsumedResources(),
application.getCurrentReservation()), limit)) {
if (LOG.isDebugEnabled()) {
LOG.debug("User " + userName + " in queue " + getQueueName()
+ " will exceed limit based on reservations - " + " consumed: "
- + user.getConsumedResources() + " reserved: "
+ + user.getTotalConsumedResources() + " reserved: "
+ application.getCurrentReservation() + " limit: " + limit);
}
return true;
@@ -1221,14 +1179,15 @@ public class LeafQueue implements CSQueue {
if (LOG.isDebugEnabled()) {
LOG.debug("User " + userName + " in queue " + getQueueName()
+ " will exceed limit - " + " consumed: "
- + user.getConsumedResources() + " limit: " + limit);
+ + user.getTotalConsumedResources() + " limit: " + limit);
}
return false;
}
return true;
}
- boolean needContainers(FiCaSchedulerApp application, Priority priority, Resource required) {
+ boolean needContainers(FiCaSchedulerApp application, Priority priority,
+ Resource required) {
int requiredContainers = application.getTotalRequiredResources(priority);
int reservedContainers = application.getNumReservedContainers(priority);
int starvation = 0;
@@ -1258,10 +1217,9 @@ public class LeafQueue implements CSQueue {
return (((starvation + requiredContainers) - reservedContainers) > 0);
}
- private CSAssignment assignContainersOnNode(Resource clusterResource,
- FiCaSchedulerNode node, FiCaSchedulerApp application,
- Priority priority, RMContainer reservedContainer, boolean needToUnreserve) {
-
+ private CSAssignment assignContainersOnNode(Resource clusterResource,
+ FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
+ RMContainer reservedContainer, boolean needToUnreserve) {
Resource assigned = Resources.none();
// Data-local
@@ -1366,10 +1324,11 @@ public class LeafQueue implements CSQueue {
// we can't reserve if we got here based on the limit
// checks assuming we could unreserve!!!
Resource userLimit = computeUserLimitAndSetHeadroom(application,
- clusterResource, capability);
+ clusterResource, capability, null);
- // Check queue max-capacity limit
- if (!assignToQueue(clusterResource, capability, application, false)) {
+ // Check queue max-capacity limit,
+ // TODO: Consider reservation on labels
+ if (!canAssignToThisQueue(clusterResource, capability, null, application, false)) {
if (LOG.isDebugEnabled()) {
LOG.debug("was going to reserve but hit queue limit");
}
@@ -1378,7 +1337,7 @@ public class LeafQueue implements CSQueue {
// Check user limit
if (!assignToUser(clusterResource, application.getUser(), userLimit,
- application, false)) {
+ application, false, null)) {
if (LOG.isDebugEnabled()) {
LOG.debug("was going to reserve but hit user limit");
}
@@ -1516,6 +1475,20 @@ public class LeafQueue implements CSQueue {
+ " request=" + request + " type=" + type
+ " needToUnreserve= " + needToUnreserve);
}
+
+ // check if the resource request can access the label
+ if (!SchedulerUtils.checkNodeLabelExpression(
+ labelManager.getLabelsOnNode(node.getNodeID()),
+ request.getNodeLabelExpression())) {
+ // this is a reserved container, but we cannot allocate it now according
+ // to label not match. This can be caused by node label changed
+ // We should un-reserve this container.
+ if (rmContainer != null) {
+ unreserve(application, priority, node, rmContainer);
+ }
+ return Resources.none();
+ }
+
Resource capability = request.getCapability();
Resource available = node.getAvailableResource();
Resource totalResource = node.getTotalResource();
@@ -1695,8 +1668,9 @@ public class LeafQueue implements CSQueue {
// Book-keeping
if (removed) {
- releaseResource(clusterResource,
- application, container.getResource());
+ releaseResource(clusterResource, application,
+ container.getResource(),
+ labelManager.getLabelsOnNode(node.getNodeID()));
LOG.info("completedContainer" +
" container=" + container +
" queue=" + this +
@@ -1712,18 +1686,18 @@ public class LeafQueue implements CSQueue {
}
}
- synchronized void allocateResource(Resource clusterResource,
- SchedulerApplicationAttempt application, Resource resource) {
- // Update queue metrics
- Resources.addTo(usedResources, resource);
- CSQueueUtils.updateQueueStatistics(
- resourceCalculator, this, getParent(), clusterResource, minimumAllocation);
- ++numContainers;
-
+ synchronized void allocateResource(Resource clusterResource,
+ SchedulerApplicationAttempt application, Resource resource,
+ Set<String> nodeLabels) {
+ super.allocateResource(clusterResource, resource, nodeLabels);
+
// Update user metrics
String userName = application.getUser();
User user = getUser(userName);
- user.assignContainer(resource);
+ user.assignContainer(resource, nodeLabels);
+ // Note this is a bit unconventional since it gets the object and modifies
+ // it here, rather then using set routine
+ Resources.subtractFrom(application.getHeadroom(), resource); // headroom
metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
if (LOG.isDebugEnabled()) {
@@ -1731,33 +1705,30 @@ public class LeafQueue implements CSQueue {
" user=" + userName +
" used=" + usedResources + " numContainers=" + numContainers +
" headroom = " + application.getHeadroom() +
- " user-resources=" + user.getConsumedResources()
+ " user-resources=" + user.getTotalConsumedResources()
);
}
}
synchronized void releaseResource(Resource clusterResource,
- FiCaSchedulerApp application, Resource resource) {
- // Update queue metrics
- Resources.subtractFrom(usedResources, resource);
- CSQueueUtils.updateQueueStatistics(
- resourceCalculator, this, getParent(), clusterResource,
- minimumAllocation);
- --numContainers;
-
+ FiCaSchedulerApp application, Resource resource, Set<String> nodeLabels) {
+ super.releaseResource(clusterResource, resource, nodeLabels);
+
// Update user metrics
String userName = application.getUser();
User user = getUser(userName);
- user.releaseContainer(resource);
+ user.releaseContainer(resource, nodeLabels);
metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
LOG.info(getQueueName() +
" used=" + usedResources + " numContainers=" + numContainers +
- " user=" + userName + " user-resources=" + user.getConsumedResources());
+ " user=" + userName + " user-resources=" + user.getTotalConsumedResources());
}
@Override
public synchronized void updateClusterResource(Resource clusterResource) {
+ lastClusterResource = clusterResource;
+
// Update queue properties
maxActiveApplications =
CSQueueUtils.computeMaxActiveApplications(
@@ -1786,25 +1757,29 @@ public class LeafQueue implements CSQueue {
for (FiCaSchedulerApp application : activeApplications) {
synchronized (application) {
computeUserLimitAndSetHeadroom(application, clusterResource,
- Resources.none());
+ Resources.none(), null);
}
}
}
-
- @Override
- public QueueMetrics getMetrics() {
- return metrics;
- }
@VisibleForTesting
public static class User {
Resource consumed = Resources.createResource(0, 0);
+ Map<String, Resource> consumedByLabel = new HashMap<String, Resource>();
int pendingApplications = 0;
int activeApplications = 0;
- public Resource getConsumedResources() {
+ public Resource getTotalConsumedResources() {
return consumed;
}
+
+ public Resource getConsumedResourceByLabel(String label) {
+ Resource r = consumedByLabel.get(label);
+ if (null != r) {
+ return r;
+ }
+ return Resources.none();
+ }
public int getPendingApplications() {
return pendingApplications;
@@ -1836,12 +1811,46 @@ public class LeafQueue implements CSQueue {
}
}
- public synchronized void assignContainer(Resource resource) {
+ public synchronized void assignContainer(Resource resource,
+ Set<String> nodeLabels) {
Resources.addTo(consumed, resource);
+
+ if (nodeLabels == null || nodeLabels.isEmpty()) {
+ if (!consumedByLabel.containsKey(RMNodeLabelsManager.NO_LABEL)) {
+ consumedByLabel.put(RMNodeLabelsManager.NO_LABEL,
+ Resources.createResource(0));
+ }
+ Resources.addTo(consumedByLabel.get(RMNodeLabelsManager.NO_LABEL),
+ resource);
+ } else {
+ for (String label : nodeLabels) {
+ if (!consumedByLabel.containsKey(label)) {
+ consumedByLabel.put(label, Resources.createResource(0));
+ }
+ Resources.addTo(consumedByLabel.get(label), resource);
+ }
+ }
}
- public synchronized void releaseContainer(Resource resource) {
+ public synchronized void releaseContainer(Resource resource, Set<String> nodeLabels) {
Resources.subtractFrom(consumed, resource);
+
+ // Update usedResources by labels
+ if (nodeLabels == null || nodeLabels.isEmpty()) {
+ if (!consumedByLabel.containsKey(RMNodeLabelsManager.NO_LABEL)) {
+ consumedByLabel.put(RMNodeLabelsManager.NO_LABEL,
+ Resources.createResource(0));
+ }
+ Resources.subtractFrom(
+ consumedByLabel.get(RMNodeLabelsManager.NO_LABEL), resource);
+ } else {
+ for (String label : nodeLabels) {
+ if (!consumedByLabel.containsKey(label)) {
+ consumedByLabel.put(label, Resources.createResource(0));
+ }
+ Resources.subtractFrom(consumedByLabel.get(label), resource);
+ }
+ }
}
}
@@ -1854,7 +1863,8 @@ public class LeafQueue implements CSQueue {
// Careful! Locking order is important!
synchronized (this) {
allocateResource(clusterResource, attempt, rmContainer.getContainer()
- .getResource());
+ .getResource(), labelManager.getLabelsOnNode(rmContainer
+ .getContainer().getNodeId()));
}
getParent().recoverContainer(clusterResource, attempt, rmContainer);
}
@@ -1892,7 +1902,8 @@ public class LeafQueue implements CSQueue {
FiCaSchedulerApp application, RMContainer rmContainer) {
if (application != null) {
allocateResource(clusterResource, application, rmContainer.getContainer()
- .getResource());
+ .getResource(), labelManager.getLabelsOnNode(rmContainer
+ .getContainer().getNodeId()));
LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
+ " resource=" + rmContainer.getContainer().getResource()
+ " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity()
@@ -1908,7 +1919,8 @@ public class LeafQueue implements CSQueue {
FiCaSchedulerApp application, RMContainer rmContainer) {
if (application != null) {
releaseResource(clusterResource, application, rmContainer.getContainer()
- .getResource());
+ .getResource(), labelManager.getLabelsOnNode(rmContainer.getContainer()
+ .getNodeId()));
LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
+ " resource=" + rmContainer.getContainer().getResource()
+ " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()
@@ -1919,6 +1931,24 @@ public class LeafQueue implements CSQueue {
}
}
+ @Override
+ public float getAbsActualCapacity() {
+ if (Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
+ lastClusterResource, Resources.none())) {
+ return absoluteCapacity;
+ }
+
+ Resource resourceRespectLabels =
+ labelManager == null ? lastClusterResource : labelManager
+ .getQueueResource(queueName, accessibleLabels, lastClusterResource);
+ float absActualCapacity =
+ Resources.divide(resourceCalculator, lastClusterResource,
+ resourceRespectLabels, lastClusterResource);
+
+ return absActualCapacity > absoluteCapacity ? absoluteCapacity
+ : absActualCapacity;
+ }
+
public void setCapacity(float capacity) {
this.capacity = capacity;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 011c99c..6ffaf4c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -23,12 +23,14 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Comparator;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
+import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -46,77 +48,42 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
+import com.google.common.collect.Sets;
+
@Private
@Evolving
-public class ParentQueue implements CSQueue {
+public class ParentQueue extends AbstractCSQueue {
private static final Log LOG = LogFactory.getLog(ParentQueue.class);
- private CSQueue parent;
- private final String queueName;
-
- private float capacity;
- private float maximumCapacity;
- private float absoluteCapacity;
- private float absoluteMaxCapacity;
- private float absoluteUsedCapacity = 0.0f;
-
- private float usedCapacity = 0.0f;
-
- protected final Set<CSQueue> childQueues;
- private final Comparator<CSQueue> queueComparator;
-
- private Resource usedResources = Resources.createResource(0, 0);
-
+ protected final Set<CSQueue> childQueues;
private final boolean rootQueue;
-
- private final Resource minimumAllocation;
-
- private volatile int numApplications;
- private volatile int numContainers;
-
- private QueueState state;
-
- private final QueueMetrics metrics;
-
- private QueueInfo queueInfo;
-
- private Map<QueueACL, AccessControlList> acls =
- new HashMap<QueueACL, AccessControlList>();
+ final Comparator<CSQueue> queueComparator;
+ volatile int numApplications;
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
- private final ResourceCalculator resourceCalculator;
-
- private boolean reservationsContinueLooking;
-
public ParentQueue(CapacitySchedulerContext cs,
- String queueName, CSQueue parent, CSQueue old) {
- minimumAllocation = cs.getMinimumResourceCapability();
+ String queueName, CSQueue parent, CSQueue old) throws IOException {
+ super(cs, queueName, parent, old);
- this.parent = parent;
- this.queueName = queueName;
- this.rootQueue = (parent == null);
- this.resourceCalculator = cs.getResourceCalculator();
+ this.queueComparator = cs.getQueueComparator();
- // must be called after parent and queueName is set
- this.metrics = old != null ? old.getMetrics() :
- QueueMetrics.forQueue(getQueuePath(), parent,
- cs.getConfiguration().getEnableUserMetrics(),
- cs.getConf());
+ this.rootQueue = (parent == null);
float rawCapacity = cs.getConfiguration().getCapacity(getQueuePath());
@@ -141,17 +108,14 @@ public class ParentQueue implements CSQueue {
Map<QueueACL, AccessControlList> acls =
cs.getConfiguration().getAcls(getQueuePath());
-
- this.queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
- this.queueInfo.setQueueName(queueName);
+
this.queueInfo.setChildQueues(new ArrayList<QueueInfo>());
- setupQueueConfigs(cs.getClusterResource(),
- capacity, absoluteCapacity,
- maximumCapacity, absoluteMaxCapacity, state, acls,
+ setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity,
+ maximumCapacity, absoluteMaxCapacity, state, acls, accessibleLabels,
+ defaultLabelExpression, capacitiyByNodeLabels, maxCapacityByNodeLabels,
cs.getConfiguration().getReservationContinueLook());
- this.queueComparator = cs.getQueueComparator();
this.childQueues = new TreeSet<CSQueue>(queueComparator);
LOG.info("Initialized parent-queue " + queueName +
@@ -159,41 +123,29 @@ public class ParentQueue implements CSQueue {
", fullname=" + getQueuePath());
}
- protected synchronized void setupQueueConfigs(
- Resource clusterResource,
- float capacity, float absoluteCapacity,
- float maximumCapacity, float absoluteMaxCapacity,
+ synchronized void setupQueueConfigs(Resource clusterResource, float capacity,
+ float absoluteCapacity, float maximumCapacity, float absoluteMaxCapacity,
QueueState state, Map<QueueACL, AccessControlList> acls,
- boolean continueLooking
- ) {
- // Sanity check
- CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
- CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absoluteCapacity, absoluteMaxCapacity);
-
- this.capacity = capacity;
- this.absoluteCapacity = absoluteCapacity;
-
- this.maximumCapacity = maximumCapacity;
- this.absoluteMaxCapacity = absoluteMaxCapacity;
-
- this.state = state;
-
- this.acls = acls;
-
- this.queueInfo.setCapacity(this.capacity);
- this.queueInfo.setMaximumCapacity(this.maximumCapacity);
- this.queueInfo.setQueueState(this.state);
-
- this.reservationsContinueLooking = continueLooking;
-
- StringBuilder aclsString = new StringBuilder();
+ Set<String> accessibleLabels, String defaultLabelExpression,
+ Map<String, Float> nodeLabelCapacities,
+ Map<String, Float> maximumCapacitiesByLabel,
+ boolean reservationContinueLooking) throws IOException {
+ super.setupQueueConfigs(clusterResource, capacity, absoluteCapacity,
+ maximumCapacity, absoluteMaxCapacity, state, acls, accessibleLabels,
+ defaultLabelExpression, nodeLabelCapacities, maximumCapacitiesByLabel,
+ reservationContinueLooking);
+ StringBuilder aclsString = new StringBuilder();
for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
}
- // Update metrics
- CSQueueUtils.updateQueueStatistics(
- resourceCalculator, this, parent, clusterResource, minimumAllocation);
+ StringBuilder labelStrBuilder = new StringBuilder();
+ if (accessibleLabels != null) {
+ for (String s : accessibleLabels) {
+ labelStrBuilder.append(s);
+ labelStrBuilder.append(",");
+ }
+ }
LOG.info(queueName +
", capacity=" + capacity +
@@ -201,13 +153,13 @@ public class ParentQueue implements CSQueue {
", maxCapacity=" + maximumCapacity +
", asboluteMaxCapacity=" + absoluteMaxCapacity +
", state=" + state +
- ", acls=" + aclsString +
+ ", acls=" + aclsString +
+ ", labels=" + labelStrBuilder.toString() + "\n" +
", reservationsContinueLooking=" + reservationsContinueLooking);
}
private static float PRECISION = 0.0005f; // 0.05% precision
void setChildQueues(Collection<CSQueue> childQueues) {
-
// Validate
float childCapacities = 0;
for (CSQueue queue : childQueues) {
@@ -221,6 +173,21 @@ public class ParentQueue implements CSQueue {
" capacity of " + childCapacities +
" for children of queue " + queueName);
}
+ // check label capacities
+ for (String nodeLabel : labelManager.getClusterNodeLabels()) {
+ float capacityByLabel = getCapacityByNodeLabel(nodeLabel);
+ // check children's labels
+ float sum = 0;
+ for (CSQueue queue : childQueues) {
+ sum += queue.getCapacityByNodeLabel(nodeLabel);
+ }
+ if ((capacityByLabel > 0 && Math.abs(1.0f - sum) > PRECISION)
+ || (capacityByLabel == 0) && (sum > 0)) {
+ throw new IllegalArgumentException("Illegal" + " capacity of "
+ + sum + " for children of queue " + queueName
+ + " for label=" + nodeLabel);
+ }
+ }
this.childQueues.clear();
this.childQueues.addAll(childQueues);
@@ -228,21 +195,6 @@ public class ParentQueue implements CSQueue {
LOG.debug("setChildQueues: " + getChildQueuesToPrint());
}
}
-
- @Override
- public synchronized CSQueue getParent() {
- return parent;
- }
-
- @Override
- public synchronized void setParent(CSQueue newParentQueue) {
- this.parent = (ParentQueue)newParentQueue;
- }
-
- @Override
- public String getQueueName() {
- return queueName;
- }
@Override
public String getQueuePath() {
@@ -251,65 +203,6 @@ public class ParentQueue implements CSQueue {
}
@Override
- public synchronized float getCapacity() {
- return capacity;
- }
-
- @Override
- public synchronized float getAbsoluteCapacity() {
- return absoluteCapacity;
- }
-
- @Override
- public float getAbsoluteMaximumCapacity() {
- return absoluteMaxCapacity;
- }
-
- @Override
- public synchronized float getAbsoluteUsedCapacity() {
- return absoluteUsedCapacity;
- }
-
- @Override
- public float getMaximumCapacity() {
- return maximumCapacity;
- }
-
- @Override
- public ActiveUsersManager getActiveUsersManager() {
- // Should never be called since all applications are submitted to LeafQueues
- return null;
- }
-
- @Override
- public synchronized float getUsedCapacity() {
- return usedCapacity;
- }
-
- @Override
- public synchronized Resource getUsedResources() {
- return usedResources;
- }
-
- @Override
- public synchronized List<CSQueue> getChildQueues() {
- return new ArrayList<CSQueue>(childQueues);
- }
-
- public synchronized int getNumContainers() {
- return numContainers;
- }
-
- public synchronized int getNumApplications() {
- return numApplications;
- }
-
- @Override
- public synchronized QueueState getState() {
- return state;
- }
-
- @Override
public synchronized QueueInfo getQueueInfo(
boolean includeChildQueues, boolean recursive) {
queueInfo.setCurrentCapacity(usedCapacity);
@@ -391,6 +284,10 @@ public class ParentQueue implements CSQueue {
newlyParsedParentQueue.absoluteMaxCapacity,
newlyParsedParentQueue.state,
newlyParsedParentQueue.acls,
+ newlyParsedParentQueue.accessibleLabels,
+ newlyParsedParentQueue.defaultLabelExpression,
+ newlyParsedParentQueue.capacitiyByNodeLabels,
+ newlyParsedParentQueue.maxCapacityByNodeLabels,
newlyParsedParentQueue.reservationsContinueLooking);
// Re-configure existing child queues and add new ones
@@ -434,21 +331,6 @@ public class ParentQueue implements CSQueue {
}
return queuesMap;
}
-
- @Override
- public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
- synchronized (this) {
- if (acls.get(acl).isUserAllowed(user)) {
- return true;
- }
- }
-
- if (parent != null) {
- return parent.hasAccess(acl, user);
- }
-
- return false;
- }
@Override
public void submitApplication(ApplicationId applicationId, String user,
@@ -521,7 +403,7 @@ public class ParentQueue implements CSQueue {
}
}
- public synchronized void removeApplication(ApplicationId applicationId,
+ private synchronized void removeApplication(ApplicationId applicationId,
String user) {
--numApplications;
@@ -532,30 +414,6 @@ public class ParentQueue implements CSQueue {
" leaf-queue of parent: " + getQueueName() +
" #applications: " + getNumApplications());
}
-
- @Override
- public synchronized void setUsedCapacity(float usedCapacity) {
- this.usedCapacity = usedCapacity;
- }
-
- @Override
- public synchronized void setAbsoluteUsedCapacity(float absUsedCapacity) {
- this.absoluteUsedCapacity = absUsedCapacity;
- }
-
- /**
- * Set maximum capacity - used only for testing.
- * @param maximumCapacity new max capacity
- */
- synchronized void setMaxCapacity(float maximumCapacity) {
- // Sanity check
- CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
- float absMaxCapacity = CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent);
- CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absoluteCapacity, absMaxCapacity);
-
- this.maximumCapacity = maximumCapacity;
- this.absoluteMaxCapacity = absMaxCapacity;
- }
@Override
public synchronized CSAssignment assignContainers(
@@ -563,6 +421,12 @@ public class ParentQueue implements CSQueue {
CSAssignment assignment =
new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
+ // if our queue cannot access this node, just return
+ if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels,
+ labelManager.getLabelsOnNode(node.getNodeID()))) {
+ return assignment;
+ }
+
while (canAssign(clusterResource, node)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to assign containers to child-queue of "
@@ -570,8 +434,10 @@ public class ParentQueue implements CSQueue {
}
boolean localNeedToUnreserve = false;
+ Set<String> nodeLabels = labelManager.getLabelsOnNode(node.getNodeID());
+
// Are we over maximum-capacity for this queue?
- if (!assignToQueue(clusterResource)) {
+ if (!canAssignToThisQueue(clusterResource, nodeLabels)) {
// check to see if we could if we unreserve first
localNeedToUnreserve = assignToQueueIfUnreserve(clusterResource);
if (!localNeedToUnreserve) {
@@ -589,7 +455,8 @@ public class ParentQueue implements CSQueue {
resourceCalculator, clusterResource,
assignedToChild.getResource(), Resources.none())) {
// Track resource utilization for the parent-queue
- allocateResource(clusterResource, assignedToChild.getResource());
+ super.allocateResource(clusterResource, assignedToChild.getResource(),
+ nodeLabels);
// Track resource utilization in this pass of the scheduler
Resources.addTo(assignment.getResource(), assignedToChild.getResource());
@@ -628,22 +495,41 @@ public class ParentQueue implements CSQueue {
return assignment;
}
- private synchronized boolean assignToQueue(Resource clusterResource) {
- // Check how of the cluster's absolute capacity we are currently using...
- float currentCapacity =
- Resources.divide(
- resourceCalculator, clusterResource,
- usedResources, clusterResource);
+ private synchronized boolean canAssignToThisQueue(Resource clusterResource,
+ Set<String> nodeLabels) {
+ Set<String> labelCanAccess =
+ new HashSet<String>(
+ accessibleLabels.contains(CommonNodeLabelsManager.ANY) ? nodeLabels
+ : Sets.intersection(accessibleLabels, nodeLabels));
+ if (nodeLabels.isEmpty()) {
+ // Any queue can always access any node without label
+ labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
+ }
- if (currentCapacity >= absoluteMaxCapacity) {
- LOG.info(getQueueName() +
- " used=" + usedResources +
- " current-capacity (" + currentCapacity + ") " +
- " >= max-capacity (" + absoluteMaxCapacity + ")");
- return false;
+ boolean canAssign = true;
+ for (String label : labelCanAccess) {
+ if (!usedResourcesByNodeLabels.containsKey(label)) {
+ usedResourcesByNodeLabels.put(label, Resources.createResource(0));
+ }
+ float currentAbsoluteLabelUsedCapacity =
+ Resources.divide(resourceCalculator, clusterResource,
+ usedResourcesByNodeLabels.get(label),
+ labelManager.getResourceByLabel(label, clusterResource));
+ // if any of the label doesn't beyond limit, we can allocate on this node
+ if (currentAbsoluteLabelUsedCapacity >=
+ getAbsoluteMaximumCapacityByNodeLabel(label)) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(getQueueName() + " used=" + usedResources
+ + " current-capacity (" + usedResourcesByNodeLabels.get(label) + ") "
+ + " >= max-capacity ("
+ + labelManager.getResourceByLabel(label, clusterResource) + ")");
+ }
+ canAssign = false;
+ break;
+ }
}
- return true;
-
+
+ return canAssign;
}
@@ -685,7 +571,7 @@ public class ParentQueue implements CSQueue {
node.getAvailableResource(), minimumAllocation);
}
- synchronized CSAssignment assignContainersToChildQueues(Resource cluster,
+ private synchronized CSAssignment assignContainersToChildQueues(Resource cluster,
FiCaSchedulerNode node, boolean needToUnreserve) {
CSAssignment assignment =
new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
@@ -728,11 +614,16 @@ public class ParentQueue implements CSQueue {
+ /**
+ * Builds a single-line description of every child queue: its path, used
+ * capacity and accessible node labels. Entries are separated by ", ".
+ *
+ * @return the description; empty when there are no child queues
+ */
String getChildQueuesToPrint() {
StringBuilder sb = new StringBuilder();
for (CSQueue q : childQueues) {
- sb.append(q.getQueuePath() + "(" + q.getUsedCapacity() + "), ");
+ sb.append(q.getQueuePath())
+ .append(" usedCapacity=(").append(q.getUsedCapacity()).append(")")
+ .append(" label=(")
+ .append(StringUtils.join(q.getAccessibleNodeLabels().iterator(), ","))
+ .append("), ");
}
return sb.toString();
}
- void printChildQueues() {
+
+ private void printChildQueues() {
if (LOG.isDebugEnabled()) {
LOG.debug("printChildQueues - queue: " + getQueuePath()
+ " child-queues: " + getChildQueuesToPrint());
@@ -749,8 +640,8 @@ public class ParentQueue implements CSQueue {
// Careful! Locking order is important!
// Book keeping
synchronized (this) {
- releaseResource(clusterResource,
- rmContainer.getContainer().getResource());
+ super.releaseResource(clusterResource, rmContainer.getContainer()
+ .getResource(), labelManager.getLabelsOnNode(node.getNodeID()));
LOG.info("completedContainer" +
" queue=" + getQueueName() +
@@ -787,27 +678,6 @@ public class ParentQueue implements CSQueue {
}
}
- @Private
- boolean getReservationContinueLooking() {
- return reservationsContinueLooking;
- }
-
- synchronized void allocateResource(Resource clusterResource,
- Resource resource) {
- Resources.addTo(usedResources, resource);
- CSQueueUtils.updateQueueStatistics(
- resourceCalculator, this, parent, clusterResource, minimumAllocation);
- ++numContainers;
- }
-
- synchronized void releaseResource(Resource clusterResource,
- Resource resource) {
- Resources.subtractFrom(usedResources, resource);
- CSQueueUtils.updateQueueStatistics(
- resourceCalculator, this, parent, clusterResource, minimumAllocation);
- --numContainers;
- }
-
@Override
public synchronized void updateClusterResource(Resource clusterResource) {
// Update all children
@@ -821,10 +691,9 @@ public class ParentQueue implements CSQueue {
}
+ /**
+ * Returns a copy of this queue's child queues, taken while holding this
+ * queue's lock. Modifying the returned list does not affect the queue.
+ */
@Override
- public QueueMetrics getMetrics() {
- return metrics;
+ public synchronized List<CSQueue> getChildQueues() {
+ List<CSQueue> snapshot = new ArrayList<CSQueue>(childQueues);
+ return snapshot;
}
-
@Override
public void recoverContainer(Resource clusterResource,
@@ -834,12 +703,20 @@ public class ParentQueue implements CSQueue {
}
// Careful! Locking order is important!
synchronized (this) {
- allocateResource(clusterResource,rmContainer.getContainer().getResource());
+ super.allocateResource(clusterResource, rmContainer.getContainer()
+ .getResource(), labelManager.getLabelsOnNode(rmContainer
+ .getContainer().getNodeId()));
}
if (parent != null) {
parent.recoverContainer(clusterResource, attempt, rmContainer);
}
}
+
+ /**
+ * Not meaningful for a parent queue: this should never be called, since
+ * all applications are submitted to leaf queues.
+ *
+ * @return always {@code null}
+ */
+ @Override
+ public ActiveUsersManager getActiveUsersManager() {
+ return null;
+ }
@Override
public void collectSchedulerApplications(
@@ -853,8 +730,9 @@ public class ParentQueue implements CSQueue {
public void attachContainer(Resource clusterResource,
FiCaSchedulerApp application, RMContainer rmContainer) {
if (application != null) {
- allocateResource(clusterResource, rmContainer.getContainer()
- .getResource());
+ super.allocateResource(clusterResource, rmContainer.getContainer()
+ .getResource(), labelManager.getLabelsOnNode(rmContainer
+ .getContainer().getNodeId()));
LOG.info("movedContainer" + " queueMoveIn=" + getQueueName()
+ " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
+ getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster="
@@ -870,7 +748,9 @@ public class ParentQueue implements CSQueue {
public void detachContainer(Resource clusterResource,
FiCaSchedulerApp application, RMContainer rmContainer) {
if (application != null) {
- releaseResource(clusterResource, rmContainer.getContainer().getResource());
+ super.releaseResource(clusterResource,
+ rmContainer.getContainer().getResource(),
+ labelManager.getLabelsOnNode(rmContainer.getContainer().getNodeId()));
LOG.info("movedContainer" + " queueMoveOut=" + getQueueName()
+ " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
+ getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster="
@@ -882,7 +762,14 @@ public class ParentQueue implements CSQueue {
}
}
- public Map<QueueACL, AccessControlList> getACLs() {
- return acls;
+ /**
+ * Returns the absolute actual capacity of this parent queue. For now this
+ * is simply its guaranteed absolute capacity.
+ */
+ @Override
+ public float getAbsActualCapacity() {
+ return absoluteCapacity;
+ }
+
+ /**
+ * @return the current value of this queue's application counter
+ */
+ public synchronized int getNumApplications() {
+ return numApplications;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
index b87744d..0725959 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
@@ -19,6 +19,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import org.apache.hadoop.yarn.api.records.Resource;
@@ -47,7 +49,7 @@ public class PlanQueue extends ParentQueue {
private boolean showReservationsAsQueues;
public PlanQueue(CapacitySchedulerContext cs, String queueName,
- CSQueue parent, CSQueue old) {
+ CSQueue parent, CSQueue old) throws IOException {
super(cs, queueName, parent, old);
this.schedulerContext = cs;
@@ -104,6 +106,10 @@ public class PlanQueue extends ParentQueue {
newlyParsedParentQueue.getMaximumCapacity(),
newlyParsedParentQueue.getAbsoluteMaximumCapacity(),
newlyParsedParentQueue.getState(), newlyParsedParentQueue.getACLs(),
+ newlyParsedParentQueue.accessibleLabels,
+ newlyParsedParentQueue.defaultLabelExpression,
+ newlyParsedParentQueue.capacitiyByNodeLabels,
+ newlyParsedParentQueue.maxCapacityByNodeLabels,
newlyParsedParentQueue.getReservationContinueLooking());
updateQuotas(newlyParsedParentQueue.userLimit,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
index 8e61821..c4424b5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
@@ -42,7 +42,7 @@ public class ReservationQueue extends LeafQueue {
private int maxSystemApps;
public ReservationQueue(CapacitySchedulerContext cs, String queueName,
- PlanQueue parent) {
+ PlanQueue parent) throws IOException {
super(cs, queueName, parent, null);
maxSystemApps = cs.getConfiguration().getMaximumSystemApplications();
// the following parameters are common to all reservation in the plan
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
index d4e043d..e1050da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
+import java.util.Set;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -270,4 +271,16 @@ public abstract class FSQueue implements Queue, Schedulable {
return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]",
getName(), getDemand(), getResourceUsage(), fairShare, getWeights());
}
+
+ /**
+ * Node labels are not implemented for the fair scheduler yet.
+ * TODO: add implementation for FS.
+ * NOTE(review): callers must cope with a null result; confirm how they
+ * interpret it.
+ *
+ * @return always {@code null} for now
+ */
+ @Override
+ public Set<String> getAccessibleNodeLabels() {
+ return null;
+ }
+
+ /**
+ * Default node-label expressions are not implemented for the fair
+ * scheduler yet. TODO: add implementation for FS.
+ *
+ * @return always {@code null} for now
+ */
+ @Override
+ public String getDefaultNodeLabelExpression() {
+ return null;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index ea21c2b..532edc7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -25,6 +25,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.concurrent.ConcurrentSkipListMap;
import org.apache.commons.logging.Log;
@@ -187,6 +188,18 @@ public class FifoScheduler extends
updateAppHeadRoom(schedulerAttempt);
updateAvailableResourcesMetrics();
}
+
+ /**
+ * Node labels are not implemented for the FIFO scheduler yet.
+ * TODO: add implementation for FIFO scheduler.
+ * NOTE(review): callers must cope with a null result; confirm how they
+ * interpret it.
+ *
+ * @return always {@code null} for now
+ */
+ @Override
+ public Set<String> getAccessibleNodeLabels() {
+ return null;
+ }
+
+ /**
+ * Default node-label expressions are not implemented for the FIFO
+ * scheduler yet. TODO: add implementation for FIFO scheduler.
+ *
+ * @return always {@code null} for now
+ */
+ @Override
+ public String getDefaultNodeLabelExpression() {
+ return null;
+ }
};
public FifoScheduler() {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
index ce5dd96..76ede39 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
@@ -147,6 +147,7 @@ public class Application {
return used;
}
+ @SuppressWarnings("deprecation")
public synchronized void submit() throws IOException, YarnException {
ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
context.setApplicationId(this.applicationId);