You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2014/10/16 03:33:56 UTC

[3/4] YARN-2496. Enhanced Capacity Scheduler to have basic support for allocating resources based on node-labels. Contributed by Wangda Tan. YARN-2500. Ehnaced ResourceManager to support schedulers allocating resources based on node-labels. Contributed b

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index b1f239c..5beed37 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -18,7 +18,15 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.StringTokenizer;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -31,10 +39,14 @@ import org.apache.hadoop.yarn.api.records.QueueACL;
 import org.apache.hadoop.yarn.api.records.QueueState;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import com.google.common.collect.ImmutableSet;
+
 public class CapacitySchedulerConfiguration extends Configuration {
 
   private static final Log LOG = 
@@ -83,6 +95,12 @@ public class CapacitySchedulerConfiguration extends Configuration {
   public static final String STATE = "state";
   
   @Private
+  public static final String ACCESSIBLE_NODE_LABELS = "accessible-node-labels";
+  
+  @Private
+  public static final String DEFAULT_NODE_LABEL_EXPRESSION =
+      "default-node-label-expression";
+
   public static final String RESERVE_CONT_LOOK_ALL_NODES = PREFIX
       + "reservations-continue-look-all-nodes";
   
@@ -268,6 +286,10 @@ public class CapacitySchedulerConfiguration extends Configuration {
     return queueName;
   }
   
+  private String getNodeLabelPrefix(String queue, String label) {
+    return getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS + DOT + label + DOT;
+  }
+  
   public int getMaximumSystemApplications() {
     int maxApplications = 
       getInt(MAXIMUM_SYSTEM_APPLICATIONS, DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS);
@@ -343,6 +365,15 @@ public class CapacitySchedulerConfiguration extends Configuration {
         ", maxCapacity=" + maxCapacity);
   }
   
+  public void setCapacityByLabel(String queue, String label, float capacity) {
+    setFloat(getNodeLabelPrefix(queue, label) + CAPACITY, capacity);
+  }
+  
+  public void setMaximumCapacityByLabel(String queue, String label,
+      float capacity) {
+    setFloat(getNodeLabelPrefix(queue, label) + MAXIMUM_CAPACITY, capacity);
+  }
+  
   public int getUserLimit(String queue) {
     int userLimit = getInt(getQueuePrefix(queue) + USER_LIMIT,
         DEFAULT_USER_LIMIT);
@@ -372,6 +403,121 @@ public class CapacitySchedulerConfiguration extends Configuration {
         QueueState.valueOf(state.toUpperCase()) : QueueState.RUNNING;
   }
   
+  public void setAccessibleNodeLabels(String queue, Set<String> labels) {
+    if (labels == null) {
+      return;
+    }
+    String str = StringUtils.join(",", labels);
+    set(getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS, str);
+  }
+  
+  public Set<String> getAccessibleNodeLabels(String queue) {
+    String accessibleLabelStr =
+        get(getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS);
+
+    // When accessible-label is null, 
+    if (accessibleLabelStr == null) {
+      // Only return null when queue is not ROOT
+      if (!queue.equals(ROOT)) {
+        return null;
+      }
+    } else {
+      // print a warning when accessibleNodeLabel specified in config and queue
+      // is ROOT
+      if (queue.equals(ROOT)) {
+        LOG.warn("Accessible node labels for root queue will be ignored,"
+            + " it will be automatically set to \"*\".");
+      }
+    }
+
+    // always return ANY for queue root
+    if (queue.equals(ROOT)) {
+      return ImmutableSet.of(RMNodeLabelsManager.ANY);
+    }
+
+    // In other cases, split the accessibleLabelStr by ","
+    Set<String> set = new HashSet<String>();
+    for (String str : accessibleLabelStr.split(",")) {
+      if (!str.trim().isEmpty()) {
+        set.add(str.trim());
+      }
+    }
+    
+    // if labels contains "*", only keep ANY behind
+    if (set.contains(RMNodeLabelsManager.ANY)) {
+      set.clear();
+      set.add(RMNodeLabelsManager.ANY);
+    }
+    return Collections.unmodifiableSet(set);
+  }
+  
+  public Map<String, Float> getNodeLabelCapacities(String queue,
+      Set<String> labels, RMNodeLabelsManager mgr) {
+    Map<String, Float> nodeLabelCapacities = new HashMap<String, Float>();
+    
+    if (labels == null) {
+      return nodeLabelCapacities;
+    }
+
+    for (String label : labels.contains(CommonNodeLabelsManager.ANY) ? mgr
+        .getClusterNodeLabels() : labels) {
+      // capacity of all labels in each queue should be 1
+      if (org.apache.commons.lang.StringUtils.equals(ROOT, queue)) {
+        nodeLabelCapacities.put(label, 1.0f);
+        continue;
+      }
+      float capacity =
+          getFloat(getNodeLabelPrefix(queue, label) + CAPACITY, UNDEFINED);
+      if (capacity < MINIMUM_CAPACITY_VALUE
+          || capacity > MAXIMUM_CAPACITY_VALUE) {
+        throw new IllegalArgumentException("Illegal " + "capacity of "
+            + capacity + " for label=" + label + " in queue=" + queue);
+      }
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("CSConf - getCapacityOfLabel: prefix="
+            + getNodeLabelPrefix(queue, label) + ", capacity=" + capacity);
+      }
+      
+      nodeLabelCapacities.put(label, capacity / 100f);
+    }
+    return nodeLabelCapacities;
+  }
+  
+  public Map<String, Float> getMaximumNodeLabelCapacities(String queue,
+      Set<String> labels, RMNodeLabelsManager mgr) {
+    Map<String, Float> maximumNodeLabelCapacities = new HashMap<String, Float>();
+    if (labels == null) {
+      return maximumNodeLabelCapacities;
+    }
+
+    for (String label : labels.contains(CommonNodeLabelsManager.ANY) ? mgr
+        .getClusterNodeLabels() : labels) {
+      float maxCapacity =
+          getFloat(getNodeLabelPrefix(queue, label) + MAXIMUM_CAPACITY,
+              UNDEFINED);
+      maxCapacity = (maxCapacity == DEFAULT_MAXIMUM_CAPACITY_VALUE) ? 
+          MAXIMUM_CAPACITY_VALUE : maxCapacity;
+      if (maxCapacity < MINIMUM_CAPACITY_VALUE
+          || maxCapacity > MAXIMUM_CAPACITY_VALUE) {
+        throw new IllegalArgumentException("Illegal " + "capacity of "
+            + maxCapacity + " for label=" + label + " in queue=" + queue);
+      }
+      LOG.debug("CSConf - getCapacityOfLabel: prefix="
+          + getNodeLabelPrefix(queue, label) + ", capacity=" + maxCapacity);
+      
+      maximumNodeLabelCapacities.put(label, maxCapacity / 100f);
+    }
+    return maximumNodeLabelCapacities;
+  }
+  
+  public String getDefaultNodeLabelExpression(String queue) {
+    return get(getQueuePrefix(queue) + DEFAULT_NODE_LABEL_EXPRESSION);
+  }
+  
+  public void setDefaultNodeLabelExpression(String queue, String exp) {
+    set(getQueuePrefix(queue) + DEFAULT_NODE_LABEL_EXPRESSION, exp);
+  }
+
   /*
    * Returns whether we should continue to look at all heart beating nodes even
    * after the reservation limit was hit. The node heart beating in could

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index cab0318..ffeec63 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -24,12 +24,14 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -52,36 +54,31 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
 
 @Private
 @Unstable
-public class LeafQueue implements CSQueue {
+public class LeafQueue extends AbstractCSQueue {
   private static final Log LOG = LogFactory.getLog(LeafQueue.class);
 
-  private final String queueName;
-  private CSQueue parent;
-  private float capacity;
-  private float absoluteCapacity;
-  private float maximumCapacity;
-  private float absoluteMaxCapacity;
   private float absoluteUsedCapacity = 0.0f;
   private int userLimit;
   private float userLimitFactor;
@@ -95,10 +92,6 @@ public class LeafQueue implements CSQueue {
   private int maxActiveApplicationsPerUser;
   
   private int nodeLocalityDelay;
-  
-  private Resource usedResources = Resources.createResource(0, 0);
-  private float usedCapacity = 0.0f;
-  private volatile int numContainers;
 
   Set<FiCaSchedulerApp> activeApplications;
   Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap = 
@@ -106,20 +99,9 @@ public class LeafQueue implements CSQueue {
   
   Set<FiCaSchedulerApp> pendingApplications;
   
-  private final Resource minimumAllocation;
-  private final Resource maximumAllocation;
   private final float minimumAllocationFactor;
 
   private Map<String, User> users = new HashMap<String, User>();
-  
-  private final QueueMetrics metrics;
-
-  private QueueInfo queueInfo; 
-
-  private QueueState state;
-
-  private Map<QueueACL, AccessControlList> acls = 
-    new HashMap<QueueACL, AccessControlList>();
 
   private final RecordFactory recordFactory = 
     RecordFactoryProvider.getRecordFactory(null);
@@ -127,29 +109,18 @@ public class LeafQueue implements CSQueue {
   private CapacitySchedulerContext scheduler;
   
   private final ActiveUsersManager activeUsersManager;
-  
-  private final ResourceCalculator resourceCalculator;
-  
-  private boolean reservationsContinueLooking;
+
+  // cache last cluster resource to compute actual capacity
+  private Resource lastClusterResource = Resources.none();
   
   private final QueueHeadroomInfo queueHeadroomInfo = new QueueHeadroomInfo();
   
   public LeafQueue(CapacitySchedulerContext cs, 
-      String queueName, CSQueue parent, CSQueue old) {
+      String queueName, CSQueue parent, CSQueue old) throws IOException {
+    super(cs, queueName, parent, old);
     this.scheduler = cs;
-    this.queueName = queueName;
-    this.parent = parent;
-    
-    this.resourceCalculator = cs.getResourceCalculator();
 
-    // must be after parent and queueName are initialized
-    this.metrics = old != null ? old.getMetrics() :
-        QueueMetrics.forQueue(getQueuePath(), parent,
-			      cs.getConfiguration().getEnableUserMetrics(),
-			      cs.getConf());
     this.activeUsersManager = new ActiveUsersManager(metrics);
-    this.minimumAllocation = cs.getMinimumResourceCapability();
-    this.maximumAllocation = cs.getMaximumResourceCapability();
     this.minimumAllocationFactor = 
         Resources.ratio(resourceCalculator, 
             Resources.subtract(maximumAllocation, minimumAllocation), 
@@ -167,7 +138,8 @@ public class LeafQueue implements CSQueue {
     float userLimitFactor = 
       cs.getConfiguration().getUserLimitFactor(getQueuePath());
 
-    int maxApplications = cs.getConfiguration().getMaximumApplicationsPerQueue(getQueuePath());
+    int maxApplications =
+        cs.getConfiguration().getMaximumApplicationsPerQueue(getQueuePath());
     if (maxApplications < 0) {
       int maxSystemApps = cs.getConfiguration().getMaximumSystemApplications();
       maxApplications = (int)(maxSystemApps * absoluteCapacity);
@@ -187,12 +159,10 @@ public class LeafQueue implements CSQueue {
                 resourceCalculator,
                 cs.getClusterResource(), this.minimumAllocation,
                 maxAMResourcePerQueuePercent, absoluteCapacity);
-    int maxActiveApplicationsPerUser = 
-        CSQueueUtils.computeMaxActiveApplicationsPerUser(maxActiveAppsUsingAbsCap, userLimit, 
-            userLimitFactor);
+    int maxActiveApplicationsPerUser =
+        CSQueueUtils.computeMaxActiveApplicationsPerUser(
+            maxActiveAppsUsingAbsCap, userLimit, userLimitFactor);
 
-    this.queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
-    this.queueInfo.setQueueName(queueName);
     this.queueInfo.setChildQueues(new ArrayList<QueueInfo>());
 
     QueueState state = cs.getConfiguration().getState(getQueuePath());
@@ -200,14 +170,13 @@ public class LeafQueue implements CSQueue {
     Map<QueueACL, AccessControlList> acls = 
       cs.getConfiguration().getAcls(getQueuePath());
 
-    setupQueueConfigs(
-        cs.getClusterResource(),
-        capacity, absoluteCapacity, 
-        maximumCapacity, absoluteMaxCapacity, 
-        userLimit, userLimitFactor, 
+    setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity,
+        maximumCapacity, absoluteMaxCapacity, userLimit, userLimitFactor,
         maxApplications, maxAMResourcePerQueuePercent, maxApplicationsPerUser,
-        maxActiveApplications, maxActiveApplicationsPerUser, state, acls, 
-        cs.getConfiguration().getNodeLocalityDelay(), 
+        maxActiveApplications, maxActiveApplicationsPerUser, state, acls, cs
+            .getConfiguration().getNodeLocalityDelay(), accessibleLabels,
+        defaultLabelExpression, this.capacitiyByNodeLabels,
+        this.maxCapacityByNodeLabels,
         cs.getConfiguration().getReservationContinueLook());
 
     if(LOG.isDebugEnabled()) {
@@ -221,7 +190,7 @@ public class LeafQueue implements CSQueue {
         new TreeSet<FiCaSchedulerApp>(applicationComparator);
     this.activeApplications = new TreeSet<FiCaSchedulerApp>(applicationComparator);
   }
-
+  
   // externalizing in method, to allow overriding
   protected float getCapacityFromConf() {
     return (float)scheduler.getConfiguration().getCapacity(getQueuePath()) / 100;
@@ -236,19 +205,22 @@ public class LeafQueue implements CSQueue {
       int maxApplicationsPerUser, int maxActiveApplications,
       int maxActiveApplicationsPerUser, QueueState state,
       Map<QueueACL, AccessControlList> acls, int nodeLocalityDelay,
-      boolean continueLooking)
-  {
+      Set<String> labels, String defaultLabelExpression,
+      Map<String, Float> capacitieByLabel,
+      Map<String, Float> maximumCapacitiesByLabel, 
+      boolean revervationContinueLooking) throws IOException {
+    super.setupQueueConfigs(clusterResource, capacity, absoluteCapacity,
+        maximumCapacity, absoluteMaxCapacity, state, acls, labels,
+        defaultLabelExpression, capacitieByLabel, maximumCapacitiesByLabel,
+        revervationContinueLooking);
     // Sanity check
     CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
     float absCapacity = getParent().getAbsoluteCapacity() * capacity;
-    CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absCapacity, absoluteMaxCapacity);
+    CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absCapacity,
+        absoluteMaxCapacity);
 
-    this.capacity = capacity; 
     this.absoluteCapacity = absCapacity;
 
-    this.maximumCapacity = maximumCapacity;
-    this.absoluteMaxCapacity = absoluteMaxCapacity;
-
     this.userLimit = userLimit;
     this.userLimitFactor = userLimitFactor;
 
@@ -258,27 +230,35 @@ public class LeafQueue implements CSQueue {
 
     this.maxActiveApplications = maxActiveApplications;
     this.maxActiveApplicationsPerUser = maxActiveApplicationsPerUser;
-    
-    this.state = state;
-
-    this.acls = acls;
 
-    this.queueInfo.setCapacity(this.capacity);
-    this.queueInfo.setMaximumCapacity(this.maximumCapacity);
-    this.queueInfo.setQueueState(this.state);
+    if (!SchedulerUtils.checkQueueLabelExpression(this.accessibleLabels,
+        this.defaultLabelExpression)) {
+      throw new IOException("Invalid default label expression of "
+          + " queue="
+          + queueInfo.getQueueName()
+          + " doesn't have permission to access all labels "
+          + "in default label expression. labelExpression of resource request="
+          + (this.defaultLabelExpression == null ? ""
+              : this.defaultLabelExpression)
+          + ". Queue labels="
+          + (queueInfo.getAccessibleNodeLabels() == null ? "" : StringUtils.join(queueInfo
+              .getAccessibleNodeLabels().iterator(), ',')));
+    }
     
     this.nodeLocalityDelay = nodeLocalityDelay;
-    this.reservationsContinueLooking = continueLooking;
 
     StringBuilder aclsString = new StringBuilder();
     for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
       aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
     }
-    
-    // Update metrics
-    CSQueueUtils.updateQueueStatistics(
-        resourceCalculator, this, getParent(), clusterResource, 
-        minimumAllocation);
+
+    StringBuilder labelStrBuilder = new StringBuilder(); 
+    if (labels != null) {
+      for (String s : labels) {
+        labelStrBuilder.append(s);
+        labelStrBuilder.append(",");
+      }
+    }
 
     LOG.info("Initializing " + queueName + "\n" +
         "capacity = " + capacity +
@@ -333,50 +313,12 @@ public class LeafQueue implements CSQueue {
         " [= configuredState ]" + "\n" +
         "acls = " + aclsString +
         " [= configuredAcls ]" + "\n" + 
+        "nodeLocalityDelay = " + nodeLocalityDelay + "\n" +
+        "labels=" + labelStrBuilder.toString() + "\n" +
         "nodeLocalityDelay = " +  nodeLocalityDelay + "\n" +
         "reservationsContinueLooking = " +
         reservationsContinueLooking + "\n");
   }
-  
-  @Override
-  public synchronized float getCapacity() {
-    return capacity;
-  }
-
-  @Override
-  public synchronized float getAbsoluteCapacity() {
-    return absoluteCapacity;
-  }
-
-  @Override
-  public synchronized float getMaximumCapacity() {
-    return maximumCapacity;
-  }
-
-  @Override
-  public synchronized float getAbsoluteMaximumCapacity() {
-    return absoluteMaxCapacity;
-  }
-
-  @Override
-  public synchronized float getAbsoluteUsedCapacity() {
-    return absoluteUsedCapacity;
-  }
-
-  @Override
-  public synchronized CSQueue getParent() {
-    return parent;
-  }
-  
-  @Override
-  public synchronized void setParent(CSQueue newParentQueue) {
-    this.parent = (ParentQueue)newParentQueue;
-  }
-  
-  @Override
-  public String getQueueName() {
-    return queueName;
-  }
 
   @Override
   public String getQueuePath() {
@@ -387,22 +329,6 @@ public class LeafQueue implements CSQueue {
    * Used only by tests.
    */
   @Private
-  public Resource getMinimumAllocation() {
-    return minimumAllocation;
-  }
-
-  /**
-   * Used only by tests.
-   */
-  @Private
-  public Resource getMaximumAllocation() {
-    return maximumAllocation;
-  }
-
-  /**
-   * Used only by tests.
-   */
-  @Private
   public float getMinimumAllocationFactor() {
     return minimumAllocationFactor;
   }
@@ -437,45 +363,9 @@ public class LeafQueue implements CSQueue {
   }
 
   @Override
-  public synchronized float getUsedCapacity() {
-    return usedCapacity;
-  }
-
-  @Override
-  public synchronized Resource getUsedResources() {
-    return usedResources;
-  }
-
-  @Override
   public List<CSQueue> getChildQueues() {
     return null;
   }
-
-  @Override
-  public synchronized void setUsedCapacity(float usedCapacity) {
-    this.usedCapacity = usedCapacity;
-  }
-
-  @Override
-  public synchronized void setAbsoluteUsedCapacity(float absUsedCapacity) {
-    this.absoluteUsedCapacity = absUsedCapacity;
-  }
-
-  /**
-   * Set maximum capacity - used only for testing.
-   * @param maximumCapacity new max capacity
-   */
-  synchronized void setMaxCapacity(float maximumCapacity) {
-    // Sanity check
-    CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
-    float absMaxCapacity = 
-        CSQueueUtils.computeAbsoluteMaximumCapacity(
-            maximumCapacity, getParent());
-    CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absoluteCapacity, absMaxCapacity);
-    
-    this.maximumCapacity = maximumCapacity;
-    this.absoluteMaxCapacity = absMaxCapacity;
-  }
   
   /**
    * Set user limit - used only for testing.
@@ -569,11 +459,6 @@ public class LeafQueue implements CSQueue {
     return nodeLocalityDelay;
   }
   
-  @Private
-  boolean getReservationContinueLooking() {
-    return reservationsContinueLooking;
-  }
-  
   public String toString() {
     return queueName + ": " + 
         "capacity=" + capacity + ", " + 
@@ -584,6 +469,11 @@ public class LeafQueue implements CSQueue {
         "numApps=" + getNumApplications() + ", " + 
         "numContainers=" + getNumContainers();  
   }
+  
+  @VisibleForTesting
+  public synchronized void setNodeLabelManager(RMNodeLabelsManager mgr) {
+    this.labelManager = mgr;
+  }
 
   @VisibleForTesting
   public synchronized User getUser(String userName) {
@@ -633,6 +523,10 @@ public class LeafQueue implements CSQueue {
         newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(),
         newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls,
         newlyParsedLeafQueue.getNodeLocalityDelay(),
+        newlyParsedLeafQueue.accessibleLabels,
+        newlyParsedLeafQueue.defaultLabelExpression,
+        newlyParsedLeafQueue.capacitiyByNodeLabels,
+        newlyParsedLeafQueue.maxCapacityByNodeLabels,
         newlyParsedLeafQueue.reservationsContinueLooking);
 
     // queue metrics are updated, more resource may be available
@@ -641,19 +535,6 @@ public class LeafQueue implements CSQueue {
   }
 
   @Override
-  public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
-    // Check if the leaf-queue allows access
-    synchronized (this) {
-      if (acls.get(acl).isUserAllowed(user)) {
-        return true;
-      }
-    }
-
-    // Check if parent-queue allows access
-    return getParent().hasAccess(acl, user);
-  }
-
-  @Override
   public void submitApplicationAttempt(FiCaSchedulerApp application,
       String userName) {
     // Careful! Locking order is important!
@@ -749,7 +630,8 @@ public class LeafQueue implements CSQueue {
     }
   }
   
-  private synchronized void addApplicationAttempt(FiCaSchedulerApp application, User user) {
+  private synchronized void addApplicationAttempt(FiCaSchedulerApp application,
+      User user) {
     // Accept 
     user.submitApplication();
     pendingApplications.add(application);
@@ -785,7 +667,8 @@ public class LeafQueue implements CSQueue {
     getParent().finishApplicationAttempt(application, queue);
   }
 
-  public synchronized void removeApplicationAttempt(FiCaSchedulerApp application, User user) {
+  public synchronized void removeApplicationAttempt(
+      FiCaSchedulerApp application, User user) {
     boolean wasActive = activeApplications.remove(application);
     if (!wasActive) {
       pendingApplications.remove(application);
@@ -821,6 +704,21 @@ public class LeafQueue implements CSQueue {
   
   private static final CSAssignment SKIP_ASSIGNMENT = new CSAssignment(true);
   
+  private static Set<String> getRequestLabelSetByExpression(
+      String labelExpression) {
+    Set<String> labels = new HashSet<String>();
+    if (null == labelExpression) {
+      return labels;
+    }
+    for (String l : labelExpression.split("&&")) {
+      if (l.trim().isEmpty()) {
+        continue;
+      }
+      labels.add(l.trim());
+    }
+    return labels;
+  }
+  
   @Override
   public synchronized CSAssignment assignContainers(Resource clusterResource,
       FiCaSchedulerNode node, boolean needToUnreserve) {
@@ -830,6 +728,12 @@ public class LeafQueue implements CSQueue {
         + " #applications=" + activeApplications.size());
     }
     
+    // if our queue cannot access this node, just return
+    if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels,
+        labelManager.getLabelsOnNode(node.getNodeID()))) {
+      return NULL_ASSIGNMENT;
+    }
+    
     // Check for reserved resources
     RMContainer reservedContainer = node.getReservedContainer();
     if (reservedContainer != null) {
@@ -879,6 +783,10 @@ public class LeafQueue implements CSQueue {
               continue;
             }
           }
+          
+          Set<String> requestedNodeLabels =
+              getRequestLabelSetByExpression(anyRequest
+                  .getNodeLabelExpression());
 
           // Compute user-limit & set headroom
           // Note: We compute both user-limit & headroom with the highest 
@@ -887,16 +795,17 @@ public class LeafQueue implements CSQueue {
           //       before all higher priority ones are serviced.
           Resource userLimit = 
               computeUserLimitAndSetHeadroom(application, clusterResource, 
-                  required);          
+                  required, requestedNodeLabels);          
           
           // Check queue max-capacity limit
-          if (!assignToQueue(clusterResource, required, application, true)) {
+          if (!canAssignToThisQueue(clusterResource, required,
+              labelManager.getLabelsOnNode(node.getNodeID()), application, true)) {
             return NULL_ASSIGNMENT;
           }
 
           // Check user limit
           if (!assignToUser(clusterResource, application.getUser(), userLimit,
-              application, true)) {
+              application, true, requestedNodeLabels)) {
             break;
           }
 
@@ -922,7 +831,8 @@ public class LeafQueue implements CSQueue {
 
             // Book-keeping 
             // Note: Update headroom to account for current allocation too...
-            allocateResource(clusterResource, application, assigned);
+            allocateResource(clusterResource, application, assigned,
+                labelManager.getLabelsOnNode(node.getNodeID()));
             
             // Don't reset scheduling opportunities for non-local assignments
             // otherwise the app will be delayed for each non-local assignment.
@@ -976,7 +886,7 @@ public class LeafQueue implements CSQueue {
   protected Resource getHeadroom(User user, Resource queueMaxCap,
       Resource clusterResource, FiCaSchedulerApp application, Resource required) {
     return getHeadroom(user, queueMaxCap, clusterResource,
-	  computeUserLimit(application, clusterResource, required, user));
+	  computeUserLimit(application, clusterResource, required, user, null));
   }
   
   private Resource getHeadroom(User user, Resource queueMaxCap,
@@ -1000,33 +910,49 @@ public class LeafQueue implements CSQueue {
      */
     Resource headroom = 
       Resources.min(resourceCalculator, clusterResource,
-        Resources.subtract(userLimit, user.getConsumedResources()),
+        Resources.subtract(userLimit, user.getTotalConsumedResources()),
         Resources.subtract(queueMaxCap, usedResources)
         );
     return headroom;
   }
 
-
-  @Private
-  protected synchronized boolean assignToQueue(Resource clusterResource, 
-      Resource required, FiCaSchedulerApp application, 
+  synchronized boolean canAssignToThisQueue(Resource clusterResource,
+      Resource required, Set<String> nodeLabels, FiCaSchedulerApp application, 
       boolean checkReservations) {
-
-    Resource potentialTotalResource = Resources.add(usedResources, required);
-    // Check how of the cluster's absolute capacity we are currently using...
-    float potentialNewCapacity = Resources.divide(resourceCalculator,
-        clusterResource, potentialTotalResource, clusterResource);
-    if (potentialNewCapacity > absoluteMaxCapacity) {
+    // Get label of this queue can access, it's (nodeLabel AND queueLabel)
+    Set<String> labelCanAccess;
+    if (null == nodeLabels || nodeLabels.isEmpty()) {
+      labelCanAccess = new HashSet<String>();
+      // Any queue can always access any node without label
+      labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
+    } else {
+      labelCanAccess = new HashSet<String>(Sets.intersection(accessibleLabels, nodeLabels));
+    }
+    
+    boolean canAssign = true;
+    for (String label : labelCanAccess) {
+      if (!usedResourcesByNodeLabels.containsKey(label)) {
+        usedResourcesByNodeLabels.put(label, Resources.createResource(0));
+      }
+      
+      Resource potentialTotalCapacity =
+          Resources.add(usedResourcesByNodeLabels.get(label), required);
+      
+      float potentialNewCapacity =
+          Resources.divide(resourceCalculator, clusterResource,
+              potentialTotalCapacity,
+              labelManager.getResourceByLabel(label, clusterResource));
       // if enabled, check to see if could we potentially use this node instead
       // of a reserved node if the application has reserved containers
-      if (this.reservationsContinueLooking && checkReservations) {
-
+      // TODO, now only consider reservation cases when the node has no label
+      if (this.reservationsContinueLooking && checkReservations
+          && label.equals(RMNodeLabelsManager.NO_LABEL)) {
         float potentialNewWithoutReservedCapacity = Resources.divide(
             resourceCalculator,
             clusterResource,
-            Resources.subtract(potentialTotalResource,
-                application.getCurrentReservation()),
-             clusterResource);
+            Resources.subtract(potentialTotalCapacity,
+               application.getCurrentReservation()),
+            labelManager.getResourceByLabel(label, clusterResource));
 
         if (potentialNewWithoutReservedCapacity <= absoluteMaxCapacity) {
           if (LOG.isDebugEnabled()) {
@@ -1048,35 +974,43 @@ public class LeafQueue implements CSQueue {
           // we could potentially use this node instead of reserved node
           return true;
         }
-
       }
+      
+      // Otherwise, if any of the label of this node beyond queue limit, we
+      // cannot allocate on this node. Consider a small epsilon here.
+      if (potentialNewCapacity > getAbsoluteMaximumCapacityByNodeLabel(label) + 1e-4) {
+        canAssign = false;
+        break;
+      }
+
       if (LOG.isDebugEnabled()) {
         LOG.debug(getQueueName()
-            + " usedResources: " + usedResources
+            + "Check assign to queue, label=" + label
+            + " usedResources: " + usedResourcesByNodeLabels.get(label)
             + " clusterResources: " + clusterResource
             + " currentCapacity "
             + Resources.divide(resourceCalculator, clusterResource,
-              usedResources, clusterResource) + " required " + required
+                usedResourcesByNodeLabels.get(label),
+                labelManager.getResourceByLabel(label, clusterResource))
             + " potentialNewCapacity: " + potentialNewCapacity + " ( "
             + " max-capacity: " + absoluteMaxCapacity + ")");
       }
-      return false;
     }
-    return true;
+    
+    return canAssign;
   }
 
-
-
   @Lock({LeafQueue.class, FiCaSchedulerApp.class})
-  Resource computeUserLimitAndSetHeadroom(
-      FiCaSchedulerApp application, Resource clusterResource, Resource required) {
-    
+  Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
+      Resource clusterResource, Resource required, Set<String> requestedLabels) {
     String user = application.getUser();
-    
     User queueUser = getUser(user);
 
-    Resource userLimit =                          // User limit
-        computeUserLimit(application, clusterResource, required, queueUser);
+    // Compute user limit respect requested labels,
+    // TODO, need consider headroom respect labels also
+    Resource userLimit =
+        computeUserLimit(application, clusterResource, required,
+            queueUser, requestedLabels);
 
     //Max avail capacity needs to take into account usage by ancestor-siblings
     //which are greater than their base capacity, so we are interested in "max avail"
@@ -1096,13 +1030,14 @@ public class LeafQueue implements CSQueue {
       queueHeadroomInfo.setClusterResource(clusterResource);
     }
     
-    Resource headroom = getHeadroom(queueUser, queueMaxCap, clusterResource, userLimit);
+    Resource headroom =
+        getHeadroom(queueUser, queueMaxCap, clusterResource, userLimit);
     
     if (LOG.isDebugEnabled()) {
       LOG.debug("Headroom calculation for user " + user + ": " + 
           " userLimit=" + userLimit + 
           " queueMaxCap=" + queueMaxCap + 
-          " consumed=" + queueUser.getConsumedResources() + 
+          " consumed=" + queueUser.getTotalConsumedResources() + 
           " headroom=" + headroom);
     }
     
@@ -1117,24 +1052,42 @@ public class LeafQueue implements CSQueue {
   }
   
   @Lock(NoLock.class)
-  private Resource computeUserLimit(FiCaSchedulerApp application, 
-      Resource clusterResource, Resource required, User user) {
+  private Resource computeUserLimit(FiCaSchedulerApp application,
+      Resource clusterResource, Resource required, User user,
+      Set<String> requestedLabels) {
     // What is our current capacity? 
     // * It is equal to the max(required, queue-capacity) if
     //   we're running below capacity. The 'max' ensures that jobs in queues
     //   with miniscule capacity (< 1 slot) make progress
     // * If we're running over capacity, then its
     //   (usedResources + required) (which extra resources we are allocating)
+    Resource queueCapacity = Resource.newInstance(0, 0);
+    if (requestedLabels != null && !requestedLabels.isEmpty()) {
+      // if we have multiple labels to request, we will choose to use the first
+      // label
+      String firstLabel = requestedLabels.iterator().next();
+      queueCapacity =
+          Resources
+              .max(resourceCalculator, clusterResource, queueCapacity,
+                  Resources.multiplyAndNormalizeUp(resourceCalculator,
+                      labelManager.getResourceByLabel(firstLabel,
+                          clusterResource),
+                      getAbsoluteCapacityByNodeLabel(firstLabel),
+                      minimumAllocation));
+    } else {
+      // else there's no label on request, just to use absolute capacity as
+      // capacity for nodes without label
+      queueCapacity =
+          Resources.multiplyAndNormalizeUp(resourceCalculator, labelManager
+                .getResourceByLabel(CommonNodeLabelsManager.NO_LABEL, clusterResource),
+              absoluteCapacity, minimumAllocation);
+    }
 
     // Allow progress for queues with miniscule capacity
-    final Resource queueCapacity =
+    queueCapacity =
         Resources.max(
             resourceCalculator, clusterResource, 
-            Resources.multiplyAndNormalizeUp(
-                resourceCalculator, 
-                clusterResource, 
-                absoluteCapacity, 
-                minimumAllocation), 
+            queueCapacity, 
             required);
 
     Resource currentCapacity =
@@ -1175,7 +1128,7 @@ public class LeafQueue implements CSQueue {
           " userLimit=" + userLimit +
           " userLimitFactor=" + userLimitFactor +
           " required: " + required + 
-          " consumed: " + user.getConsumedResources() + 
+          " consumed: " + user.getTotalConsumedResources() + 
           " limit: " + limit +
           " queueCapacity: " + queueCapacity + 
           " qconsumed: " + usedResources +
@@ -1191,28 +1144,33 @@ public class LeafQueue implements CSQueue {
   @Private
   protected synchronized boolean assignToUser(Resource clusterResource,
       String userName, Resource limit, FiCaSchedulerApp application,
-      boolean checkReservations) {
-
+      boolean checkReservations, Set<String> requestLabels) {
     User user = getUser(userName);
+    
+    String label = CommonNodeLabelsManager.NO_LABEL;
+    if (requestLabels != null && !requestLabels.isEmpty()) {
+      label = requestLabels.iterator().next();
+    }
 
     // Note: We aren't considering the current request since there is a fixed
     // overhead of the AM, but it's a > check, not a >= check, so...
-    if (Resources.greaterThan(resourceCalculator, clusterResource,
-        user.getConsumedResources(), limit)) {
-
+    if (Resources
+        .greaterThan(resourceCalculator, clusterResource,
+            user.getConsumedResourceByLabel(label),
+            limit)) {
       // if enabled, check to see if could we potentially use this node instead
       // of a reserved node if the application has reserved containers
       if (this.reservationsContinueLooking && checkReservations) {
         if (Resources.lessThanOrEqual(
             resourceCalculator,
             clusterResource,
-            Resources.subtract(user.getConsumedResources(),
+            Resources.subtract(user.getTotalConsumedResources(),
                 application.getCurrentReservation()), limit)) {
 
           if (LOG.isDebugEnabled()) {
             LOG.debug("User " + userName + " in queue " + getQueueName()
                 + " will exceed limit based on reservations - " + " consumed: "
-                + user.getConsumedResources() + " reserved: "
+                + user.getTotalConsumedResources() + " reserved: "
                 + application.getCurrentReservation() + " limit: " + limit);
           }
           return true;
@@ -1221,14 +1179,15 @@ public class LeafQueue implements CSQueue {
       if (LOG.isDebugEnabled()) {
         LOG.debug("User " + userName + " in queue " + getQueueName()
             + " will exceed limit - " + " consumed: "
-            + user.getConsumedResources() + " limit: " + limit);
+            + user.getTotalConsumedResources() + " limit: " + limit);
       }
       return false;
     }
     return true;
   }
 
-  boolean needContainers(FiCaSchedulerApp application, Priority priority, Resource required) {
+  boolean needContainers(FiCaSchedulerApp application, Priority priority,
+      Resource required) {
     int requiredContainers = application.getTotalRequiredResources(priority);
     int reservedContainers = application.getNumReservedContainers(priority);
     int starvation = 0;
@@ -1258,10 +1217,9 @@ public class LeafQueue implements CSQueue {
     return (((starvation + requiredContainers) - reservedContainers) > 0);
   }
 
-  private CSAssignment assignContainersOnNode(Resource clusterResource, 
-      FiCaSchedulerNode node, FiCaSchedulerApp application, 
-      Priority priority, RMContainer reservedContainer, boolean needToUnreserve) {
-
+  private CSAssignment assignContainersOnNode(Resource clusterResource,
+      FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
+      RMContainer reservedContainer, boolean needToUnreserve) {
     Resource assigned = Resources.none();
 
     // Data-local
@@ -1366,10 +1324,11 @@ public class LeafQueue implements CSQueue {
     // we can't reserve if we got here based on the limit
     // checks assuming we could unreserve!!!
     Resource userLimit = computeUserLimitAndSetHeadroom(application,
-        clusterResource, capability);
+        clusterResource, capability, null);
 
-    // Check queue max-capacity limit
-    if (!assignToQueue(clusterResource, capability, application, false)) {
+    // Check queue max-capacity limit,
+    // TODO: Consider reservation on labels
+    if (!canAssignToThisQueue(clusterResource, capability, null, application, false)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("was going to reserve but hit queue limit");
       }
@@ -1378,7 +1337,7 @@ public class LeafQueue implements CSQueue {
 
     // Check user limit
     if (!assignToUser(clusterResource, application.getUser(), userLimit,
-        application, false)) {
+        application, false, null)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("was going to reserve but hit user limit");
       }
@@ -1516,6 +1475,20 @@ public class LeafQueue implements CSQueue {
         + " request=" + request + " type=" + type
         + " needToUnreserve= " + needToUnreserve);
     }
+    
+    // check if the resource request can access the label
+    if (!SchedulerUtils.checkNodeLabelExpression(
+        labelManager.getLabelsOnNode(node.getNodeID()),
+        request.getNodeLabelExpression())) {
+      // this is a reserved container, but we cannot allocate it now according
+      // to label not match. This can be caused by node label changed
+      // We should un-reserve this container.
+      if (rmContainer != null) {
+        unreserve(application, priority, node, rmContainer);
+      }
+      return Resources.none();
+    }
+    
     Resource capability = request.getCapability();
     Resource available = node.getAvailableResource();
     Resource totalResource = node.getTotalResource();
@@ -1695,8 +1668,9 @@ public class LeafQueue implements CSQueue {
 
         // Book-keeping
         if (removed) {
-          releaseResource(clusterResource,
-              application, container.getResource());
+          releaseResource(clusterResource, application,
+              container.getResource(),
+              labelManager.getLabelsOnNode(node.getNodeID()));
           LOG.info("completedContainer" +
               " container=" + container +
               " queue=" + this +
@@ -1712,18 +1686,18 @@ public class LeafQueue implements CSQueue {
     }
   }
 
-  synchronized void allocateResource(Resource clusterResource, 
-      SchedulerApplicationAttempt application, Resource resource) {
-    // Update queue metrics
-    Resources.addTo(usedResources, resource);
-    CSQueueUtils.updateQueueStatistics(
-        resourceCalculator, this, getParent(), clusterResource, minimumAllocation);
-    ++numContainers;
-
+  synchronized void allocateResource(Resource clusterResource,
+      SchedulerApplicationAttempt application, Resource resource,
+      Set<String> nodeLabels) {
+    super.allocateResource(clusterResource, resource, nodeLabels);
+    
     // Update user metrics
     String userName = application.getUser();
     User user = getUser(userName);
-    user.assignContainer(resource);
+    user.assignContainer(resource, nodeLabels);
+    // Note this is a bit unconventional since it gets the object and modifies
+    // it here, rather then using set routine
+    Resources.subtractFrom(application.getHeadroom(), resource); // headroom
     metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
     
     if (LOG.isDebugEnabled()) {
@@ -1731,33 +1705,30 @@ public class LeafQueue implements CSQueue {
           " user=" + userName + 
           " used=" + usedResources + " numContainers=" + numContainers +
           " headroom = " + application.getHeadroom() +
-          " user-resources=" + user.getConsumedResources()
+          " user-resources=" + user.getTotalConsumedResources()
           );
     }
   }
 
   synchronized void releaseResource(Resource clusterResource, 
-      FiCaSchedulerApp application, Resource resource) {
-    // Update queue metrics
-    Resources.subtractFrom(usedResources, resource);
-    CSQueueUtils.updateQueueStatistics(
-        resourceCalculator, this, getParent(), clusterResource, 
-        minimumAllocation);
-    --numContainers;
-
+      FiCaSchedulerApp application, Resource resource, Set<String> nodeLabels) {
+    super.releaseResource(clusterResource, resource, nodeLabels);
+    
     // Update user metrics
     String userName = application.getUser();
     User user = getUser(userName);
-    user.releaseContainer(resource);
+    user.releaseContainer(resource, nodeLabels);
     metrics.setAvailableResourcesToUser(userName, application.getHeadroom());
       
     LOG.info(getQueueName() + 
         " used=" + usedResources + " numContainers=" + numContainers + 
-        " user=" + userName + " user-resources=" + user.getConsumedResources());
+        " user=" + userName + " user-resources=" + user.getTotalConsumedResources());
   }
 
   @Override
   public synchronized void updateClusterResource(Resource clusterResource) {
+    lastClusterResource = clusterResource;
+    
     // Update queue properties
     maxActiveApplications = 
         CSQueueUtils.computeMaxActiveApplications(
@@ -1786,25 +1757,29 @@ public class LeafQueue implements CSQueue {
     for (FiCaSchedulerApp application : activeApplications) {
       synchronized (application) {
         computeUserLimitAndSetHeadroom(application, clusterResource, 
-            Resources.none());
+            Resources.none(), null);
       }
     }
   }
-  
-  @Override
-  public QueueMetrics getMetrics() {
-    return metrics;
-  }
 
   @VisibleForTesting
   public static class User {
     Resource consumed = Resources.createResource(0, 0);
+    Map<String, Resource> consumedByLabel = new HashMap<String, Resource>();
     int pendingApplications = 0;
     int activeApplications = 0;
 
-    public Resource getConsumedResources() {
+    public Resource getTotalConsumedResources() {
       return consumed;
     }
+    
+    public Resource getConsumedResourceByLabel(String label) {
+      Resource r = consumedByLabel.get(label);
+      if (null != r) {
+        return r;
+      }
+      return Resources.none();
+    }
 
     public int getPendingApplications() {
       return pendingApplications;
@@ -1836,12 +1811,46 @@ public class LeafQueue implements CSQueue {
       }
     }
 
-    public synchronized void assignContainer(Resource resource) {
+    public synchronized void assignContainer(Resource resource,
+        Set<String> nodeLabels) {
       Resources.addTo(consumed, resource);
+      
+      if (nodeLabels == null || nodeLabels.isEmpty()) {
+        if (!consumedByLabel.containsKey(RMNodeLabelsManager.NO_LABEL)) {
+          consumedByLabel.put(RMNodeLabelsManager.NO_LABEL,
+              Resources.createResource(0));
+        }
+        Resources.addTo(consumedByLabel.get(RMNodeLabelsManager.NO_LABEL),
+            resource);
+      } else {
+        for (String label : nodeLabels) {
+          if (!consumedByLabel.containsKey(label)) {
+            consumedByLabel.put(label, Resources.createResource(0));
+          }
+          Resources.addTo(consumedByLabel.get(label), resource);
+        }
+      }
     }
 
-    public synchronized void releaseContainer(Resource resource) {
+    public synchronized void releaseContainer(Resource resource, Set<String> nodeLabels) {
       Resources.subtractFrom(consumed, resource);
+      
+      // Update usedResources by labels
+      if (nodeLabels == null || nodeLabels.isEmpty()) {
+        if (!consumedByLabel.containsKey(RMNodeLabelsManager.NO_LABEL)) {
+          consumedByLabel.put(RMNodeLabelsManager.NO_LABEL,
+              Resources.createResource(0));
+        }
+        Resources.subtractFrom(
+            consumedByLabel.get(RMNodeLabelsManager.NO_LABEL), resource);
+      } else {
+        for (String label : nodeLabels) {
+          if (!consumedByLabel.containsKey(label)) {
+            consumedByLabel.put(label, Resources.createResource(0));
+          }
+          Resources.subtractFrom(consumedByLabel.get(label), resource);
+        }
+      }
     }  
   }
 
@@ -1854,7 +1863,8 @@ public class LeafQueue implements CSQueue {
     // Careful! Locking order is important! 
     synchronized (this) {
       allocateResource(clusterResource, attempt, rmContainer.getContainer()
-        .getResource());
+          .getResource(), labelManager.getLabelsOnNode(rmContainer
+          .getContainer().getNodeId()));
     }
     getParent().recoverContainer(clusterResource, attempt, rmContainer);
   }
@@ -1892,7 +1902,8 @@ public class LeafQueue implements CSQueue {
       FiCaSchedulerApp application, RMContainer rmContainer) {
     if (application != null) {
       allocateResource(clusterResource, application, rmContainer.getContainer()
-          .getResource());
+          .getResource(), labelManager.getLabelsOnNode(rmContainer
+          .getContainer().getNodeId()));
       LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
           + " resource=" + rmContainer.getContainer().getResource()
           + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity()
@@ -1908,7 +1919,8 @@ public class LeafQueue implements CSQueue {
       FiCaSchedulerApp application, RMContainer rmContainer) {
     if (application != null) {
       releaseResource(clusterResource, application, rmContainer.getContainer()
-          .getResource());
+          .getResource(), labelManager.getLabelsOnNode(rmContainer.getContainer()
+          .getNodeId()));
       LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
           + " resource=" + rmContainer.getContainer().getResource()
           + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()
@@ -1919,6 +1931,24 @@ public class LeafQueue implements CSQueue {
     }
   }
 
+  @Override
+  public float getAbsActualCapacity() {
+    if (Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
+        lastClusterResource, Resources.none())) {
+      return absoluteCapacity;
+    }
+
+    Resource resourceRespectLabels =
+        labelManager == null ? lastClusterResource : labelManager
+            .getQueueResource(queueName, accessibleLabels, lastClusterResource);
+    float absActualCapacity =
+        Resources.divide(resourceCalculator, lastClusterResource,
+            resourceRespectLabels, lastClusterResource);
+    
+    return absActualCapacity > absoluteCapacity ? absoluteCapacity
+        : absActualCapacity;
+  }
+  
   public void setCapacity(float capacity) {
     this.capacity = capacity;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 011c99c..6ffaf4c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -23,12 +23,14 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeSet;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -46,77 +48,42 @@ import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import com.google.common.collect.Sets;
+
 @Private
 @Evolving
-public class ParentQueue implements CSQueue {
+public class ParentQueue extends AbstractCSQueue {
 
   private static final Log LOG = LogFactory.getLog(ParentQueue.class);
 
-  private CSQueue parent;
-  private final String queueName;
-  
-  private float capacity;
-  private float maximumCapacity;
-  private float absoluteCapacity;
-  private float absoluteMaxCapacity;
-  private float absoluteUsedCapacity = 0.0f;
-
-  private float usedCapacity = 0.0f;
-
-  protected final Set<CSQueue> childQueues;
-  private final Comparator<CSQueue> queueComparator;
-  
-  private Resource usedResources = Resources.createResource(0, 0);
-  
+  protected final Set<CSQueue> childQueues;  
   private final boolean rootQueue;
-  
-  private final Resource minimumAllocation;
-
-  private volatile int numApplications;
-  private volatile int numContainers;
-
-  private QueueState state;
-
-  private final QueueMetrics metrics;
-
-  private QueueInfo queueInfo; 
-
-  private Map<QueueACL, AccessControlList> acls = 
-    new HashMap<QueueACL, AccessControlList>();
+  final Comparator<CSQueue> queueComparator;
+  volatile int numApplications;
 
   private final RecordFactory recordFactory = 
     RecordFactoryProvider.getRecordFactory(null);
 
-  private final ResourceCalculator resourceCalculator;
-
-  private boolean reservationsContinueLooking;
-  
   public ParentQueue(CapacitySchedulerContext cs, 
-      String queueName, CSQueue parent, CSQueue old) {
-    minimumAllocation = cs.getMinimumResourceCapability();
+      String queueName, CSQueue parent, CSQueue old) throws IOException {
+    super(cs, queueName, parent, old);
     
-    this.parent = parent;
-    this.queueName = queueName;
-    this.rootQueue = (parent == null);
-    this.resourceCalculator = cs.getResourceCalculator();
+    this.queueComparator = cs.getQueueComparator();
 
-    // must be called after parent and queueName is set
-    this.metrics = old != null ? old.getMetrics() :
-        QueueMetrics.forQueue(getQueuePath(), parent,
-			      cs.getConfiguration().getEnableUserMetrics(),
-			      cs.getConf());
+    this.rootQueue = (parent == null);
 
     float rawCapacity = cs.getConfiguration().getCapacity(getQueuePath());
 
@@ -141,17 +108,14 @@ public class ParentQueue implements CSQueue {
 
     Map<QueueACL, AccessControlList> acls = 
       cs.getConfiguration().getAcls(getQueuePath());
-    
-    this.queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
-    this.queueInfo.setQueueName(queueName);
+
     this.queueInfo.setChildQueues(new ArrayList<QueueInfo>());
 
-    setupQueueConfigs(cs.getClusterResource(),
-        capacity, absoluteCapacity, 
-        maximumCapacity, absoluteMaxCapacity, state, acls,
+    setupQueueConfigs(cs.getClusterResource(), capacity, absoluteCapacity,
+        maximumCapacity, absoluteMaxCapacity, state, acls, accessibleLabels,
+        defaultLabelExpression, capacitiyByNodeLabels, maxCapacityByNodeLabels, 
         cs.getConfiguration().getReservationContinueLook());
     
-    this.queueComparator = cs.getQueueComparator();
     this.childQueues = new TreeSet<CSQueue>(queueComparator);
 
     LOG.info("Initialized parent-queue " + queueName + 
@@ -159,41 +123,29 @@ public class ParentQueue implements CSQueue {
         ", fullname=" + getQueuePath()); 
   }
 
-  protected synchronized void setupQueueConfigs(
-      Resource clusterResource,
-      float capacity, float absoluteCapacity, 
-      float maximumCapacity, float absoluteMaxCapacity,
+  synchronized void setupQueueConfigs(Resource clusterResource, float capacity,
+      float absoluteCapacity, float maximumCapacity, float absoluteMaxCapacity,
       QueueState state, Map<QueueACL, AccessControlList> acls,
-      boolean continueLooking
-  ) {
-    // Sanity check
-    CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
-    CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absoluteCapacity, absoluteMaxCapacity);
-
-    this.capacity = capacity;
-    this.absoluteCapacity = absoluteCapacity;
-
-    this.maximumCapacity = maximumCapacity;
-    this.absoluteMaxCapacity = absoluteMaxCapacity;
-
-    this.state = state;
-
-    this.acls = acls;
-    
-    this.queueInfo.setCapacity(this.capacity);
-    this.queueInfo.setMaximumCapacity(this.maximumCapacity);
-    this.queueInfo.setQueueState(this.state);
-
-    this.reservationsContinueLooking = continueLooking;
-
-    StringBuilder aclsString = new StringBuilder();
+      Set<String> accessibleLabels, String defaultLabelExpression,
+      Map<String, Float> nodeLabelCapacities,
+      Map<String, Float> maximumCapacitiesByLabel, 
+      boolean reservationContinueLooking) throws IOException {
+    super.setupQueueConfigs(clusterResource, capacity, absoluteCapacity,
+        maximumCapacity, absoluteMaxCapacity, state, acls, accessibleLabels,
+        defaultLabelExpression, nodeLabelCapacities, maximumCapacitiesByLabel,
+        reservationContinueLooking);
+   StringBuilder aclsString = new StringBuilder();
     for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
       aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
     }
 
-    // Update metrics
-    CSQueueUtils.updateQueueStatistics(
-        resourceCalculator, this, parent, clusterResource, minimumAllocation);
+    StringBuilder labelStrBuilder = new StringBuilder(); 
+    if (accessibleLabels != null) {
+      for (String s : accessibleLabels) {
+        labelStrBuilder.append(s);
+        labelStrBuilder.append(",");
+      }
+    }
 
     LOG.info(queueName +
         ", capacity=" + capacity +
@@ -201,13 +153,13 @@ public class ParentQueue implements CSQueue {
         ", maxCapacity=" + maximumCapacity +
         ", asboluteMaxCapacity=" + absoluteMaxCapacity + 
         ", state=" + state +
-        ", acls=" + aclsString +
+        ", acls=" + aclsString + 
+        ", labels=" + labelStrBuilder.toString() + "\n" +
         ", reservationsContinueLooking=" + reservationsContinueLooking);
   }
 
   private static float PRECISION = 0.0005f; // 0.05% precision
   void setChildQueues(Collection<CSQueue> childQueues) {
-    
     // Validate
     float childCapacities = 0;
     for (CSQueue queue : childQueues) {
@@ -221,6 +173,21 @@ public class ParentQueue implements CSQueue {
       		" capacity of " + childCapacities + 
       		" for children of queue " + queueName);
     }
+    // check label capacities
+    for (String nodeLabel : labelManager.getClusterNodeLabels()) {
+      float capacityByLabel = getCapacityByNodeLabel(nodeLabel);
+      // check children's labels
+      float sum = 0;
+      for (CSQueue queue : childQueues) {
+        sum += queue.getCapacityByNodeLabel(nodeLabel);
+      }
+      if ((capacityByLabel > 0 && Math.abs(1.0f - sum) > PRECISION)
+          || (capacityByLabel == 0) && (sum > 0)) {
+        throw new IllegalArgumentException("Illegal" + " capacity of "
+            + sum + " for children of queue " + queueName
+            + " for label=" + nodeLabel);
+      }
+    }
     
     this.childQueues.clear();
     this.childQueues.addAll(childQueues);
@@ -228,21 +195,6 @@ public class ParentQueue implements CSQueue {
       LOG.debug("setChildQueues: " + getChildQueuesToPrint());
     }
   }
-  
-  @Override
-  public synchronized CSQueue getParent() {
-    return parent;
-  }
-
-  @Override
-  public synchronized void setParent(CSQueue newParentQueue) {
-    this.parent = (ParentQueue)newParentQueue;
-  }
-  
-  @Override
-  public String getQueueName() {
-    return queueName;
-  }
 
   @Override
   public String getQueuePath() {
@@ -251,65 +203,6 @@ public class ParentQueue implements CSQueue {
   }
 
   @Override
-  public synchronized float getCapacity() {
-    return capacity;
-  }
-
-  @Override
-  public synchronized float getAbsoluteCapacity() {
-    return absoluteCapacity;
-  }
-
-  @Override
-  public float getAbsoluteMaximumCapacity() {
-    return absoluteMaxCapacity;
-  }
-
-  @Override
-  public synchronized float getAbsoluteUsedCapacity() {
-    return absoluteUsedCapacity;
-  }
-
-  @Override
-  public float getMaximumCapacity() {
-    return maximumCapacity;
-  }
-
-  @Override
-  public ActiveUsersManager getActiveUsersManager() {
-    // Should never be called since all applications are submitted to LeafQueues
-    return null;
-  }
-
-  @Override
-  public synchronized float getUsedCapacity() {
-    return usedCapacity;
-  }
-
-  @Override
-  public synchronized Resource getUsedResources() {
-    return usedResources;
-  }
-  
-  @Override
-  public synchronized List<CSQueue> getChildQueues() {
-    return new ArrayList<CSQueue>(childQueues);
-  }
-
-  public synchronized int getNumContainers() {
-    return numContainers;
-  }
-  
-  public synchronized int getNumApplications() {
-    return numApplications;
-  }
-
-  @Override
-  public synchronized QueueState getState() {
-    return state;
-  }
-
-  @Override
   public synchronized QueueInfo getQueueInfo( 
       boolean includeChildQueues, boolean recursive) {
     queueInfo.setCurrentCapacity(usedCapacity);
@@ -391,6 +284,10 @@ public class ParentQueue implements CSQueue {
         newlyParsedParentQueue.absoluteMaxCapacity,
         newlyParsedParentQueue.state, 
         newlyParsedParentQueue.acls,
+        newlyParsedParentQueue.accessibleLabels,
+        newlyParsedParentQueue.defaultLabelExpression,
+        newlyParsedParentQueue.capacitiyByNodeLabels,
+        newlyParsedParentQueue.maxCapacityByNodeLabels,
         newlyParsedParentQueue.reservationsContinueLooking);
 
     // Re-configure existing child queues and add new ones
@@ -434,21 +331,6 @@ public class ParentQueue implements CSQueue {
     }
     return queuesMap;
   }
-  
-  @Override
-  public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
-    synchronized (this) {
-      if (acls.get(acl).isUserAllowed(user)) {
-        return true;
-      }
-    }
-    
-    if (parent != null) {
-      return parent.hasAccess(acl, user);
-    }
-    
-    return false;
-  }
 
   @Override
   public void submitApplication(ApplicationId applicationId, String user,
@@ -521,7 +403,7 @@ public class ParentQueue implements CSQueue {
     }
   }
 
-  public synchronized void removeApplication(ApplicationId applicationId, 
+  private synchronized void removeApplication(ApplicationId applicationId, 
       String user) {
     
     --numApplications;
@@ -532,30 +414,6 @@ public class ParentQueue implements CSQueue {
         " leaf-queue of parent: " + getQueueName() + 
         " #applications: " + getNumApplications());
   }
-  
-  @Override
-  public synchronized void setUsedCapacity(float usedCapacity) {
-    this.usedCapacity = usedCapacity;
-  }
-  
-  @Override
-  public synchronized void setAbsoluteUsedCapacity(float absUsedCapacity) {
-    this.absoluteUsedCapacity = absUsedCapacity;
-  }
-
-  /**
-   * Set maximum capacity - used only for testing.
-   * @param maximumCapacity new max capacity
-   */
-  synchronized void setMaxCapacity(float maximumCapacity) {
-    // Sanity check
-    CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
-    float absMaxCapacity = CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent);
-    CSQueueUtils.checkAbsoluteCapacities(getQueueName(), absoluteCapacity, absMaxCapacity);
-    
-    this.maximumCapacity = maximumCapacity;
-    this.absoluteMaxCapacity = absMaxCapacity;
-  }
 
   @Override
   public synchronized CSAssignment assignContainers(
@@ -563,6 +421,12 @@ public class ParentQueue implements CSQueue {
     CSAssignment assignment = 
         new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
     
+    // if our queue cannot access this node, just return
+    if (!SchedulerUtils.checkQueueAccessToNode(accessibleLabels,
+        labelManager.getLabelsOnNode(node.getNodeID()))) {
+      return assignment;
+    }
+    
     while (canAssign(clusterResource, node)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("Trying to assign containers to child-queue of "
@@ -570,8 +434,10 @@ public class ParentQueue implements CSQueue {
       }
       
       boolean localNeedToUnreserve = false;
+      Set<String> nodeLabels = labelManager.getLabelsOnNode(node.getNodeID()); 
+      
       // Are we over maximum-capacity for this queue?
-      if (!assignToQueue(clusterResource)) {
+      if (!canAssignToThisQueue(clusterResource, nodeLabels)) {
         // check to see if we could if we unreserve first
         localNeedToUnreserve = assignToQueueIfUnreserve(clusterResource);
         if (!localNeedToUnreserve) {
@@ -589,7 +455,8 @@ public class ParentQueue implements CSQueue {
               resourceCalculator, clusterResource, 
               assignedToChild.getResource(), Resources.none())) {
         // Track resource utilization for the parent-queue
-        allocateResource(clusterResource, assignedToChild.getResource());
+        super.allocateResource(clusterResource, assignedToChild.getResource(),
+            nodeLabels);
         
         // Track resource utilization in this pass of the scheduler
         Resources.addTo(assignment.getResource(), assignedToChild.getResource());
@@ -628,22 +495,41 @@ public class ParentQueue implements CSQueue {
     return assignment;
   }
 
-  private synchronized boolean assignToQueue(Resource clusterResource) {
-    // Check how of the cluster's absolute capacity we are currently using...
-    float currentCapacity =
-        Resources.divide(
-            resourceCalculator, clusterResource, 
-            usedResources, clusterResource);
+  private synchronized boolean canAssignToThisQueue(Resource clusterResource,
+      Set<String> nodeLabels) {
+    Set<String> labelCanAccess =
+        new HashSet<String>(
+            accessibleLabels.contains(CommonNodeLabelsManager.ANY) ? nodeLabels
+                : Sets.intersection(accessibleLabels, nodeLabels));
+    if (nodeLabels.isEmpty()) {
+      // Any queue can always access any node without label
+      labelCanAccess.add(RMNodeLabelsManager.NO_LABEL);
+    }
     
-    if (currentCapacity >= absoluteMaxCapacity) {
-      LOG.info(getQueueName() + 
-          " used=" + usedResources + 
-          " current-capacity (" + currentCapacity + ") " +
-          " >= max-capacity (" + absoluteMaxCapacity + ")");
-      return false;
+    boolean canAssign = true;
+    for (String label : labelCanAccess) {
+      if (!usedResourcesByNodeLabels.containsKey(label)) {
+        usedResourcesByNodeLabels.put(label, Resources.createResource(0));
+      }
+      float currentAbsoluteLabelUsedCapacity =
+          Resources.divide(resourceCalculator, clusterResource,
+              usedResourcesByNodeLabels.get(label),
+              labelManager.getResourceByLabel(label, clusterResource));
+      // if any of the label doesn't beyond limit, we can allocate on this node
+      if (currentAbsoluteLabelUsedCapacity >= 
+            getAbsoluteMaximumCapacityByNodeLabel(label)) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(getQueueName() + " used=" + usedResources
+              + " current-capacity (" + usedResourcesByNodeLabels.get(label) + ") "
+              + " >= max-capacity ("
+              + labelManager.getResourceByLabel(label, clusterResource) + ")");
+        }
+        canAssign = false;
+        break;
+      }
     }
-    return true;
-
+    
+    return canAssign;
   }
 
   
@@ -685,7 +571,7 @@ public class ParentQueue implements CSQueue {
             node.getAvailableResource(), minimumAllocation);
   }
   
-  synchronized CSAssignment assignContainersToChildQueues(Resource cluster, 
+  private synchronized CSAssignment assignContainersToChildQueues(Resource cluster, 
       FiCaSchedulerNode node, boolean needToUnreserve) {
     CSAssignment assignment = 
         new CSAssignment(Resources.createResource(0, 0), NodeType.NODE_LOCAL);
@@ -728,11 +614,16 @@ public class ParentQueue implements CSQueue {
   String getChildQueuesToPrint() {
     StringBuilder sb = new StringBuilder();
     for (CSQueue q : childQueues) {
-      sb.append(q.getQueuePath() + "(" + q.getUsedCapacity() + "), ");
+      sb.append(q.getQueuePath() + 
+          "usedCapacity=(" + q.getUsedCapacity() + "), " + 
+          " label=("
+          + StringUtils.join(q.getAccessibleNodeLabels().iterator(), ",") 
+          + ")");
     }
     return sb.toString();
   }
-  void printChildQueues() {
+
+  private void printChildQueues() {
     if (LOG.isDebugEnabled()) {
       LOG.debug("printChildQueues - queue: " + getQueuePath()
         + " child-queues: " + getChildQueuesToPrint());
@@ -749,8 +640,8 @@ public class ParentQueue implements CSQueue {
       // Careful! Locking order is important!
       // Book keeping
       synchronized (this) {
-        releaseResource(clusterResource, 
-            rmContainer.getContainer().getResource());
+        super.releaseResource(clusterResource, rmContainer.getContainer()
+            .getResource(), labelManager.getLabelsOnNode(node.getNodeID()));
 
         LOG.info("completedContainer" +
             " queue=" + getQueueName() + 
@@ -787,27 +678,6 @@ public class ParentQueue implements CSQueue {
     }
   }
 
-  @Private
-  boolean getReservationContinueLooking() {
-    return reservationsContinueLooking;
-  }
-  
-  synchronized void allocateResource(Resource clusterResource, 
-      Resource resource) {
-    Resources.addTo(usedResources, resource);
-    CSQueueUtils.updateQueueStatistics(
-        resourceCalculator, this, parent, clusterResource, minimumAllocation);
-    ++numContainers;
-  }
-  
-  synchronized void releaseResource(Resource clusterResource, 
-      Resource resource) {
-    Resources.subtractFrom(usedResources, resource);
-    CSQueueUtils.updateQueueStatistics(
-        resourceCalculator, this, parent, clusterResource, minimumAllocation);
-    --numContainers;
-  }
-
   @Override
   public synchronized void updateClusterResource(Resource clusterResource) {
     // Update all children
@@ -821,10 +691,9 @@ public class ParentQueue implements CSQueue {
   }
   
   @Override
-  public QueueMetrics getMetrics() {
-    return metrics;
+  public synchronized List<CSQueue> getChildQueues() {
+    return new ArrayList<CSQueue>(childQueues);
   }
-
   
   @Override
   public void recoverContainer(Resource clusterResource,
@@ -834,12 +703,20 @@ public class ParentQueue implements CSQueue {
     }
     // Careful! Locking order is important! 
     synchronized (this) {
-      allocateResource(clusterResource,rmContainer.getContainer().getResource());
+      super.allocateResource(clusterResource, rmContainer.getContainer()
+          .getResource(), labelManager.getLabelsOnNode(rmContainer
+          .getContainer().getNodeId()));
     }
     if (parent != null) {
       parent.recoverContainer(clusterResource, attempt, rmContainer);
     }
   }
+  
+  @Override
+  public ActiveUsersManager getActiveUsersManager() {
+    // Should never be called since all applications are submitted to LeafQueues
+    return null;
+  }
 
   @Override
   public void collectSchedulerApplications(
@@ -853,8 +730,9 @@ public class ParentQueue implements CSQueue {
   public void attachContainer(Resource clusterResource,
       FiCaSchedulerApp application, RMContainer rmContainer) {
     if (application != null) {
-      allocateResource(clusterResource, rmContainer.getContainer()
-          .getResource());
+      super.allocateResource(clusterResource, rmContainer.getContainer()
+          .getResource(), labelManager.getLabelsOnNode(rmContainer
+          .getContainer().getNodeId()));
       LOG.info("movedContainer" + " queueMoveIn=" + getQueueName()
           + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
           + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster="
@@ -870,7 +748,9 @@ public class ParentQueue implements CSQueue {
   public void detachContainer(Resource clusterResource,
       FiCaSchedulerApp application, RMContainer rmContainer) {
     if (application != null) {
-      releaseResource(clusterResource, rmContainer.getContainer().getResource());
+      super.releaseResource(clusterResource,
+          rmContainer.getContainer().getResource(),
+          labelManager.getLabelsOnNode(rmContainer.getContainer().getNodeId()));
       LOG.info("movedContainer" + " queueMoveOut=" + getQueueName()
           + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity="
           + getAbsoluteUsedCapacity() + " used=" + usedResources + " cluster="
@@ -882,7 +762,14 @@ public class ParentQueue implements CSQueue {
     }
   }
 
-  public Map<QueueACL, AccessControlList> getACLs() {
-    return acls;
+  @Override
+  public float getAbsActualCapacity() {
+    // for now, simply return actual capacity = guaranteed capacity for parent
+    // queue
+    return absoluteCapacity;
+  }
+  
+  public synchronized int getNumApplications() {
+    return numApplications;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
index b87744d..0725959 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
@@ -19,6 +19,8 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import java.io.IOException;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 
 import org.apache.hadoop.yarn.api.records.Resource;
@@ -47,7 +49,7 @@ public class PlanQueue extends ParentQueue {
   private boolean showReservationsAsQueues;
 
   public PlanQueue(CapacitySchedulerContext cs, String queueName,
-      CSQueue parent, CSQueue old) {
+      CSQueue parent, CSQueue old) throws IOException {
     super(cs, queueName, parent, old);
 
     this.schedulerContext = cs;
@@ -104,6 +106,10 @@ public class PlanQueue extends ParentQueue {
         newlyParsedParentQueue.getMaximumCapacity(),
         newlyParsedParentQueue.getAbsoluteMaximumCapacity(),
         newlyParsedParentQueue.getState(), newlyParsedParentQueue.getACLs(),
+        newlyParsedParentQueue.accessibleLabels,
+        newlyParsedParentQueue.defaultLabelExpression,
+        newlyParsedParentQueue.capacitiyByNodeLabels,
+        newlyParsedParentQueue.maxCapacityByNodeLabels,
         newlyParsedParentQueue.getReservationContinueLooking());
 
     updateQuotas(newlyParsedParentQueue.userLimit,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
index 8e61821..c4424b5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
@@ -42,7 +42,7 @@ public class ReservationQueue extends LeafQueue {
   private int maxSystemApps;
 
   public ReservationQueue(CapacitySchedulerContext cs, String queueName,
-      PlanQueue parent) {
+      PlanQueue parent) throws IOException {
     super(cs, queueName, parent, null);
     maxSystemApps = cs.getConfiguration().getMaximumSystemApplications();
     // the following parameters are common to all reservation in the plan

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
index d4e043d..e1050da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -270,4 +271,16 @@ public abstract class FSQueue implements Queue, Schedulable {
     return String.format("[%s, demand=%s, running=%s, share=%s, w=%s]",
         getName(), getDemand(), getResourceUsage(), fairShare, getWeights());
   }
+  
+  @Override
+  public Set<String> getAccessibleNodeLabels() {
+    // TODO, add implementation for FS
+    return null;
+  }
+  
+  @Override
+  public String getDefaultNodeLabelExpression() {
+    // TODO, add implementation for FS
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index ea21c2b..532edc7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -25,6 +25,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ConcurrentSkipListMap;
 
 import org.apache.commons.logging.Log;
@@ -187,6 +188,18 @@ public class FifoScheduler extends
       updateAppHeadRoom(schedulerAttempt);
       updateAvailableResourcesMetrics();
     }
+
+    @Override
+    public Set<String> getAccessibleNodeLabels() {
+      // TODO add implementation for FIFO scheduler
+      return null;
+    }
+
+    @Override
+    public String getDefaultNodeLabelExpression() {
+      // TODO add implementation for FIFO scheduler
+      return null;
+    }
   };
 
   public FifoScheduler() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
index ce5dd96..76ede39 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/Application.java
@@ -147,6 +147,7 @@ public class Application {
     return used;
   }
   
+  @SuppressWarnings("deprecation")
   public synchronized void submit() throws IOException, YarnException {
     ApplicationSubmissionContext context = recordFactory.newRecordInstance(ApplicationSubmissionContext.class);
     context.setApplicationId(this.applicationId);