You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ka...@apache.org on 2015/03/25 15:54:35 UTC

[05/51] [abbrv] hadoop git commit: YARN-3356. Capacity Scheduler FiCaSchedulerApp should use ResourceUsage to track used-resources-by-label. Contributed by Wangda Tan

YARN-3356. Capacity Scheduler FiCaSchedulerApp should use ResourceUsage to track used-resources-by-label. Contributed by Wangda Tan


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/586348e4
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/586348e4
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/586348e4

Branch: refs/heads/YARN-2139
Commit: 586348e4cbf197188057d6b843a6701cfffdaff3
Parents: d81109e
Author: Jian He <ji...@apache.org>
Authored: Fri Mar 20 13:54:01 2015 -0700
Committer: Jian He <ji...@apache.org>
Committed: Fri Mar 20 13:54:01 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../scheduler/AbstractYarnScheduler.java        |   5 +-
 .../scheduler/AppSchedulingInfo.java            |  27 ++-
 .../server/resourcemanager/scheduler/Queue.java |  20 +++
 .../scheduler/ResourceUsage.java                |  19 ++-
 .../scheduler/SchedulerApplicationAttempt.java  |  50 +++---
 .../scheduler/SchedulerNode.java                |  14 ++
 .../scheduler/capacity/AbstractCSQueue.java     |  24 +++
 .../scheduler/capacity/LeafQueue.java           |  29 +++-
 .../scheduler/common/fica/FiCaSchedulerApp.java |  17 +-
 .../scheduler/fair/FSAppAttempt.java            |  11 +-
 .../resourcemanager/scheduler/fair/FSQueue.java |   8 +
 .../scheduler/fifo/FifoScheduler.java           |  12 +-
 .../yarn/server/resourcemanager/MockAM.java     |  24 +--
 .../capacity/TestCapacityScheduler.java         | 167 ++++++++++++++++++-
 .../scheduler/capacity/TestChildQueueOrder.java |   3 +-
 .../capacity/TestContainerAllocation.java       |  70 +-------
 .../scheduler/capacity/TestReservations.java    |   6 +-
 .../scheduler/capacity/TestUtils.java           | 129 ++++++++++++++
 19 files changed, 509 insertions(+), 129 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index bbd018a..046b7b1 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -65,6 +65,9 @@ Release 2.8.0 - UNRELEASED
     YARN-3357. Move TestFifoScheduler to FIFO package. (Rohith Sharmaks 
     via devaraj)
 
+    YARN-3356. Capacity Scheduler FiCaSchedulerApp should use ResourceUsage to
+    track used-resources-by-label. (Wangda Tan via jianhe)
+
   OPTIMIZATIONS
 
     YARN-3339. TestDockerContainerExecutor should pull a single image and not

http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 968a767..e1f94cf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -358,14 +358,15 @@ public abstract class AbstractYarnScheduler
         container));
 
       // recover scheduler node
-      nodes.get(nm.getNodeID()).recoverContainer(rmContainer);
+      SchedulerNode schedulerNode = nodes.get(nm.getNodeID());
+      schedulerNode.recoverContainer(rmContainer);
 
       // recover queue: update headroom etc.
       Queue queue = schedulerAttempt.getQueue();
       queue.recoverContainer(clusterResource, schedulerAttempt, rmContainer);
 
       // recover scheduler attempt
-      schedulerAttempt.recoverContainer(rmContainer);
+      schedulerAttempt.recoverContainer(schedulerNode, rmContainer);
             
       // set master container for the current running AMContainer for this
       // attempt.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index 1324c7d..84ebe9c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -20,8 +20,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -191,6 +189,16 @@ public class AppSchedulingInfo {
             request.getCapability());
         metrics.decrPendingResources(user, lastRequestContainers,
             lastRequestCapability);
+        
+        // update queue:
+        queue.incPendingResource(
+            request.getNodeLabelExpression(),
+            Resources.multiply(request.getCapability(),
+                request.getNumContainers()));
+        if (lastRequest != null) {
+          queue.decPendingResource(lastRequest.getNodeLabelExpression(),
+              Resources.multiply(lastRequestCapability, lastRequestContainers));
+        }
       }
     }
   }
@@ -376,6 +384,9 @@ public class AppSchedulingInfo {
     if (numOffSwitchContainers == 0) {
       checkForDeactivation();
     }
+    
+    queue.decPendingResource(offSwitchRequest.getNodeLabelExpression(),
+        offSwitchRequest.getCapability());
   }
   
   synchronized private void checkForDeactivation() {
@@ -404,6 +415,12 @@ public class AppSchedulingInfo {
             request.getCapability());
         newMetrics.incrPendingResources(user, request.getNumContainers(),
             request.getCapability());
+        
+        Resource delta = Resources.multiply(request.getCapability(),
+            request.getNumContainers()); 
+        // Update Queue
+        queue.decPendingResource(request.getNodeLabelExpression(), delta);
+        newQueue.incPendingResource(request.getNodeLabelExpression(), delta);
       }
     }
     oldMetrics.moveAppFrom(this);
@@ -423,6 +440,12 @@ public class AppSchedulingInfo {
       if (request != null) {
         metrics.decrPendingResources(user, request.getNumContainers(),
             request.getCapability());
+        
+        // Update Queue
+        queue.decPendingResource(
+            request.getNodeLabelExpression(),
+            Resources.multiply(request.getCapability(),
+                request.getNumContainers()));
       }
     }
     metrics.finishAppAttempt(applicationId, pending, user);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
index 4663a91..02003c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
@@ -90,4 +90,24 @@ public interface Queue {
    * @return default label expression
    */
   public String getDefaultNodeLabelExpression();
+
+  /**
+   * When new outstanding resource is asked, calling this will increase pending
+   * resource in a queue.
+   * 
+   * @param nodeLabel asked by application
+   * @param resourceToInc new resource asked
+   */
+  public void incPendingResource(String nodeLabel, Resource resourceToInc);
+  
+  /**
+   * When an outstanding resource is fulfilled or canceled, calling this will
+   * decrease pending resource in a queue.
+   * 
+   * @param nodeLabel
+   *          asked by application
+   * @param resourceToDec
+   *          resource that was fulfilled or canceled
+   */
+  public void decPendingResource(String nodeLabel, Resource resourceToDec);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
index de44bbe..36ee4da 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceUsage.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.util.HashMap;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -75,14 +76,17 @@ public class ResourceUsage {
       };
     }
     
+    public Resource getUsed() {
+      return resArr[ResourceType.USED.idx];
+    }
+
     @Override
     public String toString() {
       StringBuilder sb = new StringBuilder();
       sb.append("{used=" + resArr[0] + "%, ");
       sb.append("pending=" + resArr[1] + "%, ");
       sb.append("am_used=" + resArr[2] + "%, ");
-      sb.append("reserved=" + resArr[3] + "%, ");
-      sb.append("headroom=" + resArr[4] + "%}");
+      sb.append("reserved=" + resArr[3] + "%}");
       return sb.toString();
     }
   }
@@ -117,6 +121,17 @@ public class ResourceUsage {
   public void setUsed(Resource res) {
     setUsed(NL, res);
   }
+  
+  public void copyAllUsed(ResourceUsage other) {
+    try {
+      writeLock.lock();
+      for (Entry<String, UsageByLabel> entry : other.usages.entrySet()) {
+        setUsed(entry.getKey(), Resources.clone(entry.getValue().getUsed()));
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
 
   public void setUsed(String label, Resource res) {
     _set(label, ResourceType.USED, res);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index 9816699..799a5c1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -87,13 +87,12 @@ public class SchedulerApplicationAttempt {
 
   private final Multiset<Priority> reReservations = HashMultiset.create();
   
-  protected final Resource currentReservation = Resource.newInstance(0, 0);
   private Resource resourceLimit = Resource.newInstance(0, 0);
-  protected Resource currentConsumption = Resource.newInstance(0, 0);
-  private Resource amResource = Resources.none();
   private boolean unmanagedAM = true;
   private boolean amRunning = false;
   private LogAggregationContext logAggregationContext;
+  
+  protected ResourceUsage attemptResourceUsage = new ResourceUsage();
 
   protected List<RMContainer> newlyAllocatedContainers = 
       new ArrayList<RMContainer>();
@@ -217,11 +216,11 @@ public class SchedulerApplicationAttempt {
   }
   
   public Resource getAMResource() {
-    return amResource;
+    return attemptResourceUsage.getAMUsed();
   }
 
   public void setAMResource(Resource amResource) {
-    this.amResource = amResource;
+    attemptResourceUsage.setAMUsed(amResource);
   }
 
   public boolean isAmRunning() {
@@ -260,7 +259,7 @@ public class SchedulerApplicationAttempt {
   @Stable
   @Private
   public synchronized Resource getCurrentReservation() {
-    return currentReservation;
+    return attemptResourceUsage.getReserved();
   }
   
   public Queue getQueue() {
@@ -311,8 +310,8 @@ public class SchedulerApplicationAttempt {
       rmContainer = 
           new RMContainerImpl(container, getApplicationAttemptId(), 
               node.getNodeID(), appSchedulingInfo.getUser(), rmContext);
-        
-      Resources.addTo(currentReservation, container.getResource());
+      attemptResourceUsage.incReserved(node.getPartition(),
+          container.getResource());
       
       // Reset the re-reservation count
       resetReReservations(priority);
@@ -336,7 +335,7 @@ public class SchedulerApplicationAttempt {
           + " reserved container " + rmContainer + " on node " + node
           + ". This attempt currently has " + reservedContainers.size()
           + " reserved containers at priority " + priority
-          + "; currentReservation " + currentReservation.getMemory());
+          + "; currentReservation " + container.getResource());
     }
 
     return rmContainer;
@@ -402,9 +401,9 @@ public class SchedulerApplicationAttempt {
       for (Priority priority : getPriorities()) {
         Map<String, ResourceRequest> requests = getResourceRequests(priority);
         if (requests != null) {
-          LOG.debug("showRequests:" + " application=" + getApplicationId() + 
-              " headRoom=" + getHeadroom() + 
-              " currentConsumption=" + currentConsumption.getMemory());
+          LOG.debug("showRequests:" + " application=" + getApplicationId()
+              + " headRoom=" + getHeadroom() + " currentConsumption="
+              + attemptResourceUsage.getUsed().getMemory());
           for (ResourceRequest request : requests.values()) {
             LOG.debug("showRequests:" + " application=" + getApplicationId()
                 + " request=" + request);
@@ -415,7 +414,7 @@ public class SchedulerApplicationAttempt {
   }
   
   public Resource getCurrentConsumption() {
-    return currentConsumption;
+    return attemptResourceUsage.getUsed();
   }
 
   public static class ContainersAndNMTokensAllocation {
@@ -548,12 +547,17 @@ public class SchedulerApplicationAttempt {
   }
 
   public synchronized ApplicationResourceUsageReport getResourceUsageReport() {
-    AggregateAppResourceUsage resUsage = getRunningAggregateAppResourceUsage();
+    AggregateAppResourceUsage runningResourceUsage =
+        getRunningAggregateAppResourceUsage();
+    Resource usedResourceClone =
+        Resources.clone(attemptResourceUsage.getUsed());
+    Resource reservedResourceClone =
+        Resources.clone(attemptResourceUsage.getReserved());
     return ApplicationResourceUsageReport.newInstance(liveContainers.size(),
-               reservedContainers.size(), Resources.clone(currentConsumption),
-               Resources.clone(currentReservation),
-               Resources.add(currentConsumption, currentReservation),
-               resUsage.getMemorySeconds(), resUsage.getVcoreSeconds());
+        reservedContainers.size(), usedResourceClone, reservedResourceClone,
+        Resources.add(usedResourceClone, reservedResourceClone),
+        runningResourceUsage.getMemorySeconds(),
+        runningResourceUsage.getVcoreSeconds());
   }
 
   public synchronized Map<ContainerId, RMContainer> getLiveContainersMap() {
@@ -572,7 +576,7 @@ public class SchedulerApplicationAttempt {
       SchedulerApplicationAttempt appAttempt) {
     this.liveContainers = appAttempt.getLiveContainersMap();
     // this.reReservations = appAttempt.reReservations;
-    this.currentConsumption = appAttempt.getCurrentConsumption();
+    this.attemptResourceUsage.copyAllUsed(appAttempt.attemptResourceUsage);
     this.resourceLimit = appAttempt.getResourceLimit();
     // this.currentReservation = appAttempt.currentReservation;
     // this.newlyAllocatedContainers = appAttempt.newlyAllocatedContainers;
@@ -603,7 +607,8 @@ public class SchedulerApplicationAttempt {
     this.queue = newQueue;
   }
 
-  public synchronized void recoverContainer(RMContainer rmContainer) {
+  public synchronized void recoverContainer(SchedulerNode node,
+      RMContainer rmContainer) {
     // recover app scheduling info
     appSchedulingInfo.recoverContainer(rmContainer);
 
@@ -613,8 +618,9 @@ public class SchedulerApplicationAttempt {
     LOG.info("SchedulerAttempt " + getApplicationAttemptId()
       + " is recovering container " + rmContainer.getContainerId());
     liveContainers.put(rmContainer.getContainerId(), rmContainer);
-    Resources.addTo(currentConsumption, rmContainer.getContainer()
-      .getResource());
+    attemptResourceUsage.incUsed(node.getPartition(), rmContainer
+        .getContainer().getResource());
+    
     // resourceLimit: updated when LeafQueue#recoverContainer#allocateResource
     // is called.
     // newlyAllocatedContainers.add(rmContainer);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index 2901134..f03663a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
@@ -294,4 +295,17 @@ public abstract class SchedulerNode {
   public void updateLabels(Set<String> labels) {
     this.labels = labels;
   }
+  
+  /**
+   * Get the partition this node belongs to. If the node-labels of this node
+   * are empty or null, it belongs to the NO_LABEL partition. Since only one
+   * partition per node is supported (YARN-2694), the first label is used.
+   */
+  public String getPartition() {
+    if (this.labels == null || this.labels.isEmpty()) {
+      return RMNodeLabelsManager.NO_LABEL; 
+    } else {
+      return this.labels.iterator().next();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 4e53060..3cd85ae 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -509,4 +509,28 @@ public abstract class AbstractCSQueue implements CSQueue {
     // non-empty
     return false;
   }
+  
+  @Override
+  public void incPendingResource(String nodeLabel, Resource resourceToInc) {
+    if (nodeLabel == null) {
+      nodeLabel = RMNodeLabelsManager.NO_LABEL;
+    }
+    // ResourceUsage has its own lock, no additional lock is needed here.
+    queueUsage.incPending(nodeLabel, resourceToInc);
+    if (null != parent) {
+      parent.incPendingResource(nodeLabel, resourceToInc);
+    }
+  }
+  
+  @Override
+  public void decPendingResource(String nodeLabel, Resource resourceToDec) {
+    if (nodeLabel == null) {
+      nodeLabel = RMNodeLabelsManager.NO_LABEL;
+    }
+    // ResourceUsage has its own lock, no additional lock is needed here.
+    queueUsage.decPending(nodeLabel, resourceToDec);
+    if (null != parent) {
+      parent.decPendingResource(nodeLabel, resourceToDec);
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index fa0e280..3e5405d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -739,6 +739,15 @@ public class LeafQueue extends AbstractCSQueue {
     return labels;
   }
   
+  private boolean checkResourceRequestMatchingNodeLabel(ResourceRequest offswitchResourceRequest,
+      FiCaSchedulerNode node) {
+    String askedNodeLabel = offswitchResourceRequest.getNodeLabelExpression();
+    if (null == askedNodeLabel) {
+      askedNodeLabel = RMNodeLabelsManager.NO_LABEL;
+    }
+    return askedNodeLabel.equals(node.getPartition());
+  }
+  
   @Override
   public synchronized CSAssignment assignContainers(Resource clusterResource,
       FiCaSchedulerNode node, ResourceLimits currentResourceLimits) {
@@ -796,6 +805,14 @@ public class LeafQueue extends AbstractCSQueue {
           if (application.getTotalRequiredResources(priority) <= 0) {
             continue;
           }
+          
+          // Does the node-label-expression of this offswitch resource request
+          // match the node's label?
+          // If it does not match, skip to the next priority.
+          if (!checkResourceRequestMatchingNodeLabel(anyRequest, node)) {
+            continue;
+          }
+          
           if (!this.reservationsContinueLooking) {
             if (!shouldAllocOrReserveNewContainer(application, priority, required)) {
               if (LOG.isDebugEnabled()) {
@@ -825,7 +842,7 @@ public class LeafQueue extends AbstractCSQueue {
           }
 
           // Check user limit
-          if (!assignToUser(clusterResource, application.getUser(), userLimit,
+          if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
               application, true, requestedNodeLabels)) {
             break;
           }
@@ -1076,7 +1093,7 @@ public class LeafQueue extends AbstractCSQueue {
   }
   
   @Private
-  protected synchronized boolean assignToUser(Resource clusterResource,
+  protected synchronized boolean canAssignToUser(Resource clusterResource,
       String userName, Resource limit, FiCaSchedulerApp application,
       boolean checkReservations, Set<String> requestLabels) {
     User user = getUser(userName);
@@ -1094,7 +1111,8 @@ public class LeafQueue extends AbstractCSQueue {
             limit)) {
       // if enabled, check to see if could we potentially use this node instead
       // of a reserved node if the application has reserved containers
-      if (this.reservationsContinueLooking && checkReservations) {
+      if (this.reservationsContinueLooking && checkReservations
+          && label.equals(CommonNodeLabelsManager.NO_LABEL)) {
         if (Resources.lessThanOrEqual(
             resourceCalculator,
             clusterResource,
@@ -1305,7 +1323,7 @@ public class LeafQueue extends AbstractCSQueue {
     }
 
     // Check user limit
-    if (!assignToUser(clusterResource, application.getUser(), userLimit,
+    if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
         application, false, null)) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("was going to reserve but hit user limit");
@@ -1622,7 +1640,8 @@ public class LeafQueue extends AbstractCSQueue {
               node, rmContainer);
         } else {
           removed =
-            application.containerCompleted(rmContainer, containerStatus, event);
+              application.containerCompleted(rmContainer, containerStatus,
+                  event, node.getPartition());
           node.releaseContainer(container);
         }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 10f5c20..e041389 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -90,7 +90,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
   }
 
   synchronized public boolean containerCompleted(RMContainer rmContainer,
-      ContainerStatus containerStatus, RMContainerEventType event) {
+      ContainerStatus containerStatus, RMContainerEventType event,
+      String partition) {
 
     // Remove from the list of containers
     if (null == liveContainers.remove(rmContainer.getContainerId())) {
@@ -122,7 +123,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     // Update usage metrics 
     Resource containerResource = rmContainer.getContainer().getResource();
     queue.getMetrics().releaseResources(getUser(), 1, containerResource);
-    Resources.subtractFrom(currentConsumption, containerResource);
+    attemptResourceUsage.decUsed(partition, containerResource);
 
     // Clear resource utilization metrics cache.
     lastMemoryAggregateAllocationUpdateTime = -1;
@@ -156,7 +157,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     // Update consumption and track allocations
     List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
         type, node, priority, request, container);
-    Resources.addTo(currentConsumption, container.getResource());
+    attemptResourceUsage.incUsed(node.getPartition(),
+        container.getResource());
     
     // Update resource requests related to "request" and store in RMContainer 
     ((RMContainerImpl)rmContainer).setResourceRequests(resourceRequestList);
@@ -198,12 +200,13 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
         resetReReservations(priority);
 
         Resource resource = reservedContainer.getContainer().getResource();
-        Resources.subtractFrom(currentReservation, resource);
+        this.attemptResourceUsage.decReserved(node.getPartition(), resource);
 
         LOG.info("Application " + getApplicationId() + " unreserved "
-            + " on node " + node + ", currently has " + reservedContainers.size()
-            + " at priority " + priority + "; currentReservation "
-            + currentReservation);
+            + " on node " + node + ", currently has "
+            + reservedContainers.size() + " at priority " + priority
+            + "; currentReservation " + this.attemptResourceUsage.getReserved()
+            + " on node-label=" + node.getPartition());
         return true;
       }
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
index 67103d1..dfde5ab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java
@@ -142,7 +142,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     // Update usage metrics 
     Resource containerResource = rmContainer.getContainer().getResource();
     queue.getMetrics().releaseResources(getUser(), 1, containerResource);
-    Resources.subtractFrom(currentConsumption, containerResource);
+    this.attemptResourceUsage.decUsed(containerResource);
 
     // remove from preemption map if it is completed
     preemptionMap.remove(rmContainer);
@@ -164,11 +164,12 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     resetReReservations(priority);
 
     Resource resource = reservedContainer.getContainer().getResource();
-    Resources.subtractFrom(currentReservation, resource);
+    this.attemptResourceUsage.decReserved(resource);
 
     LOG.info("Application " + getApplicationId() + " unreserved " + " on node "
-        + node + ", currently has " + reservedContainers.size() + " at priority "
-        + priority + "; currentReservation " + currentReservation);
+        + node + ", currently has " + reservedContainers.size()
+        + " at priority " + priority + "; currentReservation "
+        + this.attemptResourceUsage.getReserved());
   }
 
   @Override
@@ -339,7 +340,7 @@ public class FSAppAttempt extends SchedulerApplicationAttempt
     // Update consumption and track allocations
     List<ResourceRequest> resourceRequestList = appSchedulingInfo.allocate(
         type, node, priority, request, container);
-    Resources.addTo(currentConsumption, container.getResource());
+    this.attemptResourceUsage.incUsed(container.getResource());
 
     // Update resource requests related to "request" and store in RMContainer
     ((RMContainerImpl) rmContainer).setResourceRequests(resourceRequestList);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
index 349464e..1562bf6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
@@ -291,4 +291,12 @@ public abstract class FSQueue implements Queue, Schedulable {
     // TODO, add implementation for FS
     return null;
   }
+  
+  @Override // empty body: calling this changes nothing on this queue
+  public void incPendingResource(String nodeLabel, Resource resourceToInc) {
+  }
+  
+  @Override // empty body: calling this changes nothing on this queue
+  public void decPendingResource(String nodeLabel, Resource resourceToDec) {
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index beb3ab5..b8c419c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -54,6 +54,7 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@@ -200,6 +201,14 @@ public class FifoScheduler extends
       // TODO add implementation for FIFO scheduler
       return null;
     }
+
+    @Override // empty body: calling this changes nothing on this queue
+    public void incPendingResource(String nodeLabel, Resource resourceToInc) {
+    }
+
+    @Override // empty body: calling this changes nothing on this queue
+    public void decPendingResource(String nodeLabel, Resource resourceToDec) {
+    }
   };
 
   public FifoScheduler() {
@@ -870,7 +879,8 @@ public class FifoScheduler extends
     }
 
     // Inform the application
-    application.containerCompleted(rmContainer, containerStatus, event);
+    application.containerCompleted(rmContainer, containerStatus, event,
+        RMNodeLabelsManager.NO_LABEL);
 
     // Inform the node
     node.releaseContainer(container);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
index 494f5a4..f62fdb3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockAM.java
@@ -164,17 +164,19 @@ public class MockAM {
   public List<ResourceRequest> createReq(String[] hosts, int memory, int priority,
       int containers, String labelExpression) throws Exception {
     List<ResourceRequest> reqs = new ArrayList<ResourceRequest>();
-    for (String host : hosts) {
-      // only add host/rack request when asked host isn't ANY
-      if (!host.equals(ResourceRequest.ANY)) {
-        ResourceRequest hostReq =
-            createResourceReq(host, memory, priority, containers,
-                labelExpression);
-        reqs.add(hostReq);
-        ResourceRequest rackReq =
-            createResourceReq("/default-rack", memory, priority, containers,
-                labelExpression);
-        reqs.add(rackReq);
+    if (hosts != null) {
+      for (String host : hosts) {
+        // only add host/rack request when asked host isn't ANY
+        if (!host.equals(ResourceRequest.ANY)) {
+          ResourceRequest hostReq =
+              createResourceReq(host, memory, priority, containers,
+                  labelExpression);
+          reqs.add(hostReq);
+          ResourceRequest rackReq =
+              createResourceReq("/default-rack", memory, priority, containers,
+                  labelExpression);
+          reqs.add(rackReq);
+        }
       }
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
index e30f441..aaa615d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
@@ -29,11 +29,13 @@ import java.io.IOException;
 import java.net.InetSocketAddress;
 import java.security.PrivilegedAction;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.CyclicBarrier;
 
@@ -111,6 +113,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaS
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
@@ -128,10 +131,13 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
-import org.junit.ComparisonFailure;
 import org.junit.Test;
 import org.mockito.Mockito;
 
+import com.google.common.collect.ImmutableMap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Sets;
+
 
 public class TestCapacityScheduler {
   private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
@@ -2557,6 +2563,165 @@ public class TestCapacityScheduler {
     Assert.fail("Shouldn't successfully allocate containers for am2, "
         + "queue-a's max capacity will be violated if container allocated");
   }
+  
+  @SuppressWarnings("unchecked")
+  private <E> Set<E> toSet(E... elements) {
+    // Guava copies the varargs into a new mutable HashSet.
+    return Sets.newHashSet(elements);
+  }
+  
+  /** Checks per-label pending resource on leaf, parent and root queues. */
+  @Test
+  public void testQueueHierarchyPendingResourceUpdate() throws Exception {
+    Configuration conf =
+        TestUtils.getConfigurationWithQueueLabels(new Configuration(false));
+    conf.setBoolean(YarnConfiguration.NODE_LABELS_ENABLED, true);
+    
+    final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
+    mgr.init(conf);
+    mgr.addToCluserNodeLabels(ImmutableSet.of("x", "y"));
+    mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0), toSet("x")));
+    
+    MemoryRMStateStore memStore = new MemoryRMStateStore();
+    memStore.init(conf);
+    MockRM rm = new MockRM(conf, memStore) {
+      protected RMNodeLabelsManager createNodeLabelManager() {
+        return mgr;
+      }
+    };
+    
+    rm.start();
+    MockNM nm1 = // label = x
+        new MockNM("h1:1234", 200 * GB, rm.getResourceTrackerService());
+    nm1.registerNode();
+    
+    MockNM nm2 = // label = ""
+        new MockNM("h2:1234", 200 * GB, rm.getResourceTrackerService());
+    nm2.registerNode();
+    
+    // Launch app1 in queue=a1
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a1");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2);
+    
+    // Launch app2 in queue=b1  
+    RMApp app2 = rm.submitApp(8 * GB, "app", "user", null, "b1");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
+    
+    // am1 asks for 8 * 1GB containers with no label
+    am1.allocate(Arrays.asList(ResourceRequest.newInstance(
+        Priority.newInstance(1), "*", Resources.createResource(1 * GB), 8)),
+        null);
+    
+    checkPendingResource(rm, "a1", 8 * GB, null);
+    checkPendingResource(rm, "a", 8 * GB, null);
+    checkPendingResource(rm, "root", 8 * GB, null);
+    
+    // am2 asks for 8 * 1GB containers with no label
+    am2.allocate(Arrays.asList(ResourceRequest.newInstance(
+        Priority.newInstance(1), "*", Resources.createResource(1 * GB), 8)),
+        null);
+    
+    checkPendingResource(rm, "a1", 8 * GB, null);
+    checkPendingResource(rm, "a", 8 * GB, null);
+    checkPendingResource(rm, "b1", 8 * GB, null);
+    checkPendingResource(rm, "b", 8 * GB, null);
+    // root = a + b
+    checkPendingResource(rm, "root", 16 * GB, null);
+    
+    // am2 asks for another 8 * 1GB containers at priority=2 with no label
+    am2.allocate(Arrays.asList(ResourceRequest.newInstance(
+        Priority.newInstance(2), "*", Resources.createResource(1 * GB), 8)),
+        null);
+    
+    checkPendingResource(rm, "a1", 8 * GB, null);
+    checkPendingResource(rm, "a", 8 * GB, null);
+    checkPendingResource(rm, "b1", 16 * GB, null);
+    checkPendingResource(rm, "b", 16 * GB, null);
+    // root = a + b
+    checkPendingResource(rm, "root", 24 * GB, null);
+    
+    // am1 replaces its priority=1 ask (8 * 1GB) with a single 4GB container
+    am1.allocate(Arrays.asList(ResourceRequest.newInstance(
+        Priority.newInstance(1), "*", Resources.createResource(4 * GB), 1)),
+        null);
+    
+    checkPendingResource(rm, "a1", 4 * GB, null);
+    checkPendingResource(rm, "a", 4 * GB, null);
+    checkPendingResource(rm, "b1", 16 * GB, null);
+    checkPendingResource(rm, "b", 16 * GB, null);
+    // root = a + b
+    checkPendingResource(rm, "root", 20 * GB, null);
+    
+    // am1 asks for one 8GB container with label=x at priority=2
+    am1.allocate(Arrays.asList(ResourceRequest.newInstance(
+        Priority.newInstance(2), "*", Resources.createResource(8 * GB), 1,
+        true, "x")), null);
+    
+    checkPendingResource(rm, "a1", 4 * GB, null);
+    checkPendingResource(rm, "a", 4 * GB, null);
+    checkPendingResource(rm, "a1", 8 * GB, "x");
+    checkPendingResource(rm, "a", 8 * GB, "x");
+    checkPendingResource(rm, "b1", 16 * GB, null);
+    checkPendingResource(rm, "b", 16 * GB, null);
+    // root = a + b
+    checkPendingResource(rm, "root", 20 * GB, null);
+    checkPendingResource(rm, "root", 8 * GB, "x");
+    
+    // once am1's containers are allocated, its pending resource should drop
+    ContainerId containerId =
+        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2);
+    Assert.assertTrue(rm.waitForState(nm1, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    containerId = ContainerId.newContainerId(am1.getApplicationAttemptId(), 3);
+    Assert.assertTrue(rm.waitForState(nm2, containerId,
+        RMContainerState.ALLOCATED, 10 * 1000));
+    
+    checkPendingResource(rm, "a1", 0 * GB, null);
+    checkPendingResource(rm, "a", 0 * GB, null);
+    checkPendingResource(rm, "a1", 0 * GB, "x");
+    checkPendingResource(rm, "a", 0 * GB, "x");
+    // some containers may also have been allocated for am2 in the meantime,
+    // so only check that the pending resource of b1/b/root is > 0
+    checkPendingResourceGreaterThanZero(rm, "b1", null);
+    checkPendingResourceGreaterThanZero(rm, "b", null);
+    // root = a + b
+    checkPendingResourceGreaterThanZero(rm, "root", null);
+    checkPendingResource(rm, "root", 0 * GB, "x");
+    
+    // remove am2's attempt; every pending resource should be 0 now
+    rm.getResourceScheduler().handle(new AppAttemptRemovedSchedulerEvent(
+        am2.getApplicationAttemptId(), RMAppAttemptState.FINISHED, false));
+    
+    checkPendingResource(rm, "a1", 0 * GB, null);
+    checkPendingResource(rm, "a", 0 * GB, null);
+    checkPendingResource(rm, "a1", 0 * GB, "x");
+    checkPendingResource(rm, "a", 0 * GB, "x"); 
+    checkPendingResource(rm, "b1", 0 * GB, null);
+    checkPendingResource(rm, "b", 0 * GB, null);
+    checkPendingResource(rm, "root", 0 * GB, null);
+    checkPendingResource(rm, "root", 0 * GB, "x");
+    rm.stop(); // shut down the RM services started by rm.start() above
+  }
+  
+  // Asserts that queueName's pending memory for the given label (null is
+  // treated as RMNodeLabelsManager.NO_LABEL) equals the expected amount.
+  private void checkPendingResource(MockRM rm, String queueName, int memory,
+      String label) {
+    String partition = (label != null) ? label : RMNodeLabelsManager.NO_LABEL;
+    CSQueue target =
+        ((CapacityScheduler) rm.getResourceScheduler()).getQueue(queueName);
+    Assert.assertEquals(memory,
+        target.getQueueResourceUsage().getPending(partition).getMemory());
+  }
+
+  // Asserts pending memory > 0 for the label (null means NO_LABEL).
+  private void checkPendingResourceGreaterThanZero(MockRM rm, String queueName,
+      String label) {
+    String partition = (label != null) ? label : RMNodeLabelsManager.NO_LABEL;
+    CapacityScheduler sched = (CapacityScheduler) rm.getResourceScheduler();
+    Assert.assertTrue(sched.getQueue(queueName).getQueueResourceUsage()
+        .getPending(partition).getMemory() > 0);
+  }
 
   // Test verifies AM Used resource for LeafQueue when AM ResourceRequest is
   // lesser than minimumAllocation

http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index 71dc523..23b31fa 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -247,7 +247,8 @@ public class TestChildQueueOrder {
     // Stub an App and its containerCompleted
     FiCaSchedulerApp app_0 = getMockApplication(0,user_0);
     doReturn(true).when(app_0).containerCompleted(any(RMContainer.class),
-        any(ContainerStatus.class),any(RMContainerEventType.class));
+        any(ContainerStatus.class), any(RMContainerEventType.class),
+        any(String.class));
 
     Priority priority = TestUtils.createMockPriority(1); 
     ContainerAllocationExpirer expirer = 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index fe61eab..03b8f5c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -328,19 +328,6 @@ public class TestContainerAllocation {
     MockRM.launchAndRegisterAM(app1, rm1, nm1);
   }
   
-  private Configuration getConfigurationWithDefaultQueueLabels(
-      Configuration config) {
-    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
-    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
-    
-    CapacitySchedulerConfiguration conf =
-        (CapacitySchedulerConfiguration) getConfigurationWithQueueLabels(config);
-        new CapacitySchedulerConfiguration(config);
-    conf.setDefaultNodeLabelExpression(A, "x");
-    conf.setDefaultNodeLabelExpression(B, "y");
-    return conf;
-  }
-  
   private Configuration getConfigurationWithQueueLabels(Configuration config) {
     CapacitySchedulerConfiguration conf =
         new CapacitySchedulerConfiguration(config);
@@ -406,57 +393,6 @@ public class TestContainerAllocation {
     return set;
   }
   
-  private Configuration getComplexConfigurationWithQueueLabels(
-      Configuration config) {
-    CapacitySchedulerConfiguration conf =
-        new CapacitySchedulerConfiguration(config);
-    
-    // Define top-level queues
-    conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
-    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
-    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
-    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "z", 100);
-
-    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
-    conf.setCapacity(A, 10);
-    conf.setMaximumCapacity(A, 10);
-    conf.setAccessibleNodeLabels(A, toSet("x", "y"));
-    conf.setCapacityByLabel(A, "x", 100);
-    conf.setCapacityByLabel(A, "y", 50);
-    
-    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
-    conf.setCapacity(B, 90);
-    conf.setMaximumCapacity(B, 100);
-    conf.setAccessibleNodeLabels(B, toSet("y", "z"));
-    conf.setCapacityByLabel(B, "y", 50);
-    conf.setCapacityByLabel(B, "z", 100);
-    
-    // Define 2nd-level queues
-    final String A1 = A + ".a1";
-    conf.setQueues(A, new String[] {"a1"});
-    conf.setCapacity(A1, 100);
-    conf.setMaximumCapacity(A1, 100);
-    conf.setAccessibleNodeLabels(A1, toSet("x", "y"));
-    conf.setDefaultNodeLabelExpression(A1, "x");
-    conf.setCapacityByLabel(A1, "x", 100);
-    conf.setCapacityByLabel(A1, "y", 100);
-    
-    conf.setQueues(B, new String[] {"b1", "b2"});
-    final String B1 = B + ".b1";
-    conf.setCapacity(B1, 50);
-    conf.setMaximumCapacity(B1, 50);
-    conf.setAccessibleNodeLabels(B1, RMNodeLabelsManager.EMPTY_STRING_SET);
-
-    final String B2 = B + ".b2";
-    conf.setCapacity(B2, 50);
-    conf.setMaximumCapacity(B2, 50);
-    conf.setAccessibleNodeLabels(B2, toSet("y", "z"));
-    conf.setCapacityByLabel(B2, "y", 100);
-    conf.setCapacityByLabel(B2, "z", 100);
-
-    return conf;
-  }
-  
   @Test (timeout = 300000)
   public void testContainerAllocationWithSingleUserLimits() throws Exception {
     final RMNodeLabelsManager mgr = new NullRMNodeLabelsManager();
@@ -468,7 +404,7 @@ public class TestContainerAllocation {
         NodeId.newInstance("h2", 0), toSet("y")));
 
     // inject node label manager
-    MockRM rm1 = new MockRM(getConfigurationWithDefaultQueueLabels(conf)) {
+    MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
       @Override
       public RMNodeLabelsManager createNodeLabelManager() {
         return mgr;
@@ -554,7 +490,7 @@ public class TestContainerAllocation {
         RMNodeLabelsManager.EMPTY_STRING_SET));
 
     // inject node label manager
-    MockRM rm1 = new MockRM(getComplexConfigurationWithQueueLabels(conf)) {
+    MockRM rm1 = new MockRM(TestUtils.getComplexConfigurationWithQueueLabels(conf)) {
       @Override
       public RMNodeLabelsManager createNodeLabelManager() {
         return mgr;
@@ -711,7 +647,7 @@ public class TestContainerAllocation {
         NodeId.newInstance("h2", 0), toSet("y")));
 
     // inject node label manager
-    MockRM rm1 = new MockRM(getConfigurationWithDefaultQueueLabels(conf)) {
+    MockRM rm1 = new MockRM(TestUtils.getConfigurationWithDefaultQueueLabels(conf)) {
       @Override
       public RMNodeLabelsManager createNodeLabelManager() {
         return mgr;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index c5b7587..e8a8243 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -1058,19 +1058,19 @@ public class TestReservations {
 
     // set limit so subtrace reservations it can continue
     Resource limit = Resources.createResource(12 * GB, 0);
-    boolean res = a.assignToUser(clusterResource, user_0, limit, app_0,
+    boolean res = a.canAssignToUser(clusterResource, user_0, limit, app_0,
         true, null);
     assertTrue(res);
 
     // tell it not to check for reservations and should fail as already over
     // limit
-    res = a.assignToUser(clusterResource, user_0, limit, app_0, false, null);
+    res = a.canAssignToUser(clusterResource, user_0, limit, app_0, false, null);
     assertFalse(res);
 
     refreshQueuesTurnOffReservationsContLook(a, csConf);
 
     // should now return false since feature off
-    res = a.assignToUser(clusterResource, user_0, limit, app_0, true, null);
+    res = a.canAssignToUser(clusterResource, user_0, limit, app_0, true, null);
     assertFalse(res);
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/586348e4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index 9e352a7..62135b9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -60,6 +60,8 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 import org.mockito.invocation.InvocationOnMock;
 import org.mockito.stubbing.Answer;
 
+import com.google.common.collect.Sets;
+
 public class TestUtils {
   private static final Log LOG = LogFactory.getLog(TestUtils.class);
 
@@ -216,4 +218,131 @@ public class TestUtils {
     when(container.getPriority()).thenReturn(priority);
     return container;
   }
+  
+  /** Collects the given elements into a newly created hash set. */
+  @SuppressWarnings("unchecked")
+  private static <E> Set<E> toSet(E... elements) {
+    return Sets.newHashSet(elements);
+  }
+  
+  /**
+   * Builds a config with the queues below; a may access label x, b label y:
+   * <pre>
+   *             Root
+   *            /  |  \
+   *           a   b   c
+   *           |   |   |
+   *           a1  b1  c1
+   *          (x)  (y)
+   * </pre>
+   */
+  public static Configuration getConfigurationWithQueueLabels(Configuration config) {
+    CapacitySchedulerConfiguration conf =
+        new CapacitySchedulerConfiguration(config);
+    
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b", "c"});
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
+
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    conf.setCapacity(A, 10);
+    conf.setMaximumCapacity(A, 15);
+    conf.setAccessibleNodeLabels(A, toSet("x"));
+    conf.setCapacityByLabel(A, "x", 100);
+    
+    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+    conf.setCapacity(B, 20);
+    conf.setAccessibleNodeLabels(B, toSet("y"));
+    conf.setCapacityByLabel(B, "y", 100);
+    
+    final String C = CapacitySchedulerConfiguration.ROOT + ".c";
+    conf.setCapacity(C, 70);
+    conf.setMaximumCapacity(C, 70);
+    conf.setAccessibleNodeLabels(C, RMNodeLabelsManager.EMPTY_STRING_SET);
+    
+    // Define 2nd-level queues
+    final String A1 = A + ".a1";
+    conf.setQueues(A, new String[] {"a1"});
+    conf.setCapacity(A1, 100);
+    conf.setMaximumCapacity(A1, 100);
+    conf.setCapacityByLabel(A1, "x", 100);
+    
+    final String B1 = B + ".b1";
+    conf.setQueues(B, new String[] {"b1"});
+    conf.setCapacity(B1, 100);
+    conf.setMaximumCapacity(B1, 100);
+    conf.setCapacityByLabel(B1, "y", 100);
+
+    final String C1 = C + ".c1";
+    conf.setQueues(C, new String[] {"c1"});
+    conf.setCapacity(C1, 100);
+    conf.setMaximumCapacity(C1, 100);
+    
+    return conf;
+  }
+  
+  /** Builds a config: root -> a{a1}, b{b1, b2}, using node labels x, y, z. */
+  public static Configuration getComplexConfigurationWithQueueLabels(
+      Configuration config) {
+    CapacitySchedulerConfiguration conf =
+        new CapacitySchedulerConfiguration(config);
+    // Define top-level queues
+    conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "x", 100);
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "y", 100);
+    conf.setCapacityByLabel(CapacitySchedulerConfiguration.ROOT, "z", 100);
+
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    conf.setCapacity(A, 10);
+    conf.setMaximumCapacity(A, 10);
+    conf.setAccessibleNodeLabels(A, toSet("x", "y"));
+    conf.setCapacityByLabel(A, "x", 100);
+    conf.setCapacityByLabel(A, "y", 50);
+    
+    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+    conf.setCapacity(B, 90);
+    conf.setMaximumCapacity(B, 100);
+    conf.setAccessibleNodeLabels(B, toSet("y", "z"));
+    conf.setCapacityByLabel(B, "y", 50);
+    conf.setCapacityByLabel(B, "z", 100);
+    
+    // Define 2nd-level queues
+    final String A1 = A + ".a1";
+    conf.setQueues(A, new String[] {"a1"});
+    conf.setCapacity(A1, 100);
+    conf.setMaximumCapacity(A1, 100);
+    conf.setAccessibleNodeLabels(A1, toSet("x", "y"));
+    conf.setDefaultNodeLabelExpression(A1, "x");
+    conf.setCapacityByLabel(A1, "x", 100);
+    conf.setCapacityByLabel(A1, "y", 100);
+    
+    conf.setQueues(B, new String[] {"b1", "b2"});
+    final String B1 = B + ".b1";
+    conf.setCapacity(B1, 50);
+    conf.setMaximumCapacity(B1, 50);
+    conf.setAccessibleNodeLabels(B1, RMNodeLabelsManager.EMPTY_STRING_SET);
+
+    final String B2 = B + ".b2";
+    conf.setCapacity(B2, 50);
+    conf.setMaximumCapacity(B2, 50);
+    conf.setAccessibleNodeLabels(B2, toSet("y", "z"));
+    conf.setCapacityByLabel(B2, "y", 100);
+    conf.setCapacityByLabel(B2, "z", 100);
+
+    return conf;
+  }
+  
+  /** Same as getConfigurationWithQueueLabels, plus default labels: a=x, b=y. */
+  public static Configuration getConfigurationWithDefaultQueueLabels(
+      Configuration config) {
+    final String A = CapacitySchedulerConfiguration.ROOT + ".a";
+    final String B = CapacitySchedulerConfiguration.ROOT + ".b";
+    
+    CapacitySchedulerConfiguration conf =
+        (CapacitySchedulerConfiguration) getConfigurationWithQueueLabels(config);
+    conf.setDefaultNodeLabelExpression(A, "x");
+    conf.setDefaultNodeLabelExpression(B, "y");
+    return conf;
+  }
 }