You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by vi...@apache.org on 2014/10/16 03:33:57 UTC

[4/4] git commit: YARN-2496. Enhanced Capacity Scheduler to have basic support for allocating resources based on node-labels. Contributed by Wangda Tan. YARN-2500. Enhanced ResourceManager to support schedulers allocating resources based on node-labels. C

YARN-2496. Enhanced Capacity Scheduler to have basic support for allocating resources based on node-labels. Contributed by Wangda Tan.
YARN-2500. Enhanced ResourceManager to support schedulers allocating resources based on node-labels. Contributed by Wangda Tan.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f2ea555a
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f2ea555a
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f2ea555a

Branch: refs/heads/trunk
Commit: f2ea555ac6c06a3f2f6559731f48711fff05d3f1
Parents: 466f087
Author: Vinod Kumar Vavilapalli <vi...@apache.org>
Authored: Wed Oct 15 18:33:06 2014 -0700
Committer: Vinod Kumar Vavilapalli <vi...@apache.org>
Committed: Wed Oct 15 18:33:06 2014 -0700

----------------------------------------------------------------------
 .../hadoop/yarn/sls/nodemanager/NodeInfo.java   |   5 +
 .../yarn/sls/scheduler/RMNodeWrapper.java       |   5 +
 hadoop-yarn-project/CHANGES.txt                 |   6 +
 .../dev-support/findbugs-exclude.xml            |  17 +
 .../ApplicationMasterService.java               |  42 +-
 .../server/resourcemanager/RMAppManager.java    |  32 +-
 .../yarn/server/resourcemanager/RMContext.java  |   5 +
 .../server/resourcemanager/RMContextImpl.java   |  12 +
 .../server/resourcemanager/RMServerUtils.java   |  19 +-
 .../server/resourcemanager/ResourceManager.java |  11 +-
 .../CapacitySchedulerPlanFollower.java          |  17 +-
 .../server/resourcemanager/rmapp/RMAppImpl.java |   8 +-
 .../rmapp/attempt/RMAppAttemptImpl.java         |  39 +-
 .../server/resourcemanager/rmnode/RMNode.java   |   8 +
 .../resourcemanager/rmnode/RMNodeImpl.java      |   8 +
 .../server/resourcemanager/scheduler/Queue.java |  19 +
 .../scheduler/SchedulerUtils.java               | 122 +++-
 .../scheduler/capacity/AbstractCSQueue.java     | 448 ++++++++++++++
 .../scheduler/capacity/CSQueue.java             |  61 +-
 .../scheduler/capacity/CSQueueUtils.java        |  57 +-
 .../scheduler/capacity/CapacityScheduler.java   |  48 +-
 .../CapacitySchedulerConfiguration.java         | 148 ++++-
 .../scheduler/capacity/LeafQueue.java           | 596 ++++++++++---------
 .../scheduler/capacity/ParentQueue.java         | 391 +++++-------
 .../scheduler/capacity/PlanQueue.java           |   8 +-
 .../scheduler/capacity/ReservationQueue.java    |   2 +-
 .../resourcemanager/scheduler/fair/FSQueue.java |  13 +
 .../scheduler/fifo/FifoScheduler.java           |  13 +
 .../server/resourcemanager/Application.java     |   1 +
 .../yarn/server/resourcemanager/MockAM.java     |  35 +-
 .../yarn/server/resourcemanager/MockNodes.java  |   7 +-
 .../yarn/server/resourcemanager/MockRM.java     |  37 +-
 .../server/resourcemanager/RMHATestBase.java    |   5 +-
 .../server/resourcemanager/TestAppManager.java  |   4 +-
 .../resourcemanager/TestApplicationACLs.java    |   3 +-
 .../resourcemanager/TestClientRMService.java    |  54 +-
 .../yarn/server/resourcemanager/TestRMHA.java   |   2 +-
 .../TestWorkPreservingRMRestart.java            |   2 +-
 .../reservation/ReservationSystemTestUtil.java  |  30 +
 .../rmapp/TestRMAppTransitions.java             |  11 +-
 .../attempt/TestRMAppAttemptTransitions.java    |  66 +-
 .../scheduler/TestSchedulerUtils.java           | 279 +++++++--
 .../capacity/TestApplicationLimits.java         |  11 +-
 .../scheduler/capacity/TestCSQueueUtils.java    |  28 +-
 .../capacity/TestCapacityScheduler.java         |  28 +-
 .../scheduler/capacity/TestChildQueueOrder.java |   5 +-
 .../capacity/TestContainerAllocation.java       | 460 ++++++++++++++
 .../scheduler/capacity/TestLeafQueue.java       |  37 +-
 .../scheduler/capacity/TestParentQueue.java     |   5 +-
 .../scheduler/capacity/TestQueueMappings.java   |  10 +-
 .../scheduler/capacity/TestQueueParsing.java    | 267 ++++++++-
 .../capacity/TestReservationQueue.java          |   9 +-
 .../scheduler/capacity/TestReservations.java    |  30 +-
 .../scheduler/capacity/TestUtils.java           |  31 +-
 .../scheduler/fair/FairSchedulerTestBase.java   |   2 +-
 .../scheduler/fair/TestFairScheduler.java       |   2 +-
 .../resourcemanager/webapp/TestRMWebApp.java    |  42 +-
 57 files changed, 2867 insertions(+), 796 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
index 029fa87..fdddcf4 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/nodemanager/NodeInfo.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.sls.nodemanager;
 
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
@@ -159,6 +160,10 @@ public class NodeInfo {
       return null;
     }
 
+    @Override
+    public Set<String> getNodeLabels() {
+      return null;
+    }
   }
 
   public static RMNode newNodeInfo(String rackName, String hostName,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
----------------------------------------------------------------------
diff --git a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
index 7eca66f..3b185ae 100644
--- a/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
+++ b/hadoop-tools/hadoop-sls/src/main/java/org/apache/hadoop/yarn/sls/scheduler/RMNodeWrapper.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode
 
 import java.util.Collections;
 import java.util.List;
+import java.util.Set;
 
 @Private
 @Unstable
@@ -147,4 +148,8 @@ public class RMNodeWrapper implements RMNode {
     return node.getNodeManagerVersion();
   }
 
+  @Override
+  public Set<String> getNodeLabels() {
+    return null;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 326b554..97fea49 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -165,6 +165,12 @@ Release 2.6.0 - UNRELEASED
     YARN-2656. Made RM web services authentication filter support proxy user.
     (Varun Vasudev and Zhijie Shen via zjshen)
 
+    YARN-2496. Enhanced Capacity Scheduler to have basic support for allocating
+    resources based on node-labels. (Wangda Tan via vinodkv)
+
+    YARN-2500. Enhanced ResourceManager to support schedulers allocating resources
+    based on node-labels. (Wangda Tan via vinodkv)
+
   IMPROVEMENTS
 
     YARN-2197. Add a link to YARN CHANGES.txt in the left side of doc

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
index 6e82af0..e6da24c 100644
--- a/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
+++ b/hadoop-yarn-project/hadoop-yarn/dev-support/findbugs-exclude.xml
@@ -188,6 +188,23 @@
     </Or>
     <Bug pattern="IS2_INCONSISTENT_SYNC" />
   </Match>
+  <Match>
+    <Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue" />
+    <Or>
+      <Field name="absoluteCapacity" />
+      <Field name="absoluteMaxCapacity" />
+      <Field name="acls" />
+      <Field name="capacity" />
+      <Field name="maximumCapacity" />
+      <Field name="state" />
+      <Field name="labelManager" />
+      <Field name="defaultLabelExpression" />
+      <Field name="accessibleLabels" />
+      <Field name="absoluteNodeLabelCapacities" />
+      <Field name="reservationsContinueLooking" />
+    </Or>
+    <Bug pattern="IS2_INCONSISTENT_SYNC" />
+  </Match>
   <!-- Inconsistent sync warning - scheduleAsynchronously is only initialized once and never changed -->
   <Match>
     <Class name="org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler" />

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
index 707cf1b..35baa44 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ApplicationMasterService.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterRespo
 import org.apache.hadoop.yarn.api.records.AMCommand;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
 import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.NMToken;
@@ -254,13 +255,13 @@ public class ApplicationMasterService extends AbstractService implements
       if (hasApplicationMasterRegistered(applicationAttemptId)) {
         String message =
             "Application Master is already registered : "
-                + applicationAttemptId.getApplicationId();
+                + appID;
         LOG.warn(message);
         RMAuditLogger.logFailure(
           this.rmContext.getRMApps()
-            .get(applicationAttemptId.getApplicationId()).getUser(),
+            .get(appID).getUser(),
           AuditConstants.REGISTER_AM, "", "ApplicationMasterService", message,
-          applicationAttemptId.getApplicationId(), applicationAttemptId);
+          appID, applicationAttemptId);
         throw new InvalidApplicationMasterRequestException(message);
       }
       
@@ -340,6 +341,7 @@ public class ApplicationMasterService extends AbstractService implements
 
     ApplicationAttemptId applicationAttemptId =
         authorizeRequest().getApplicationAttemptId();
+    ApplicationId appId = applicationAttemptId.getApplicationId();
 
     AllocateResponseLock lock = responseMap.get(applicationAttemptId);
     if (lock == null) {
@@ -351,13 +353,13 @@ public class ApplicationMasterService extends AbstractService implements
       if (!hasApplicationMasterRegistered(applicationAttemptId)) {
         String message =
             "Application Master is trying to unregister before registering for: "
-                + applicationAttemptId.getApplicationId();
+                + appId;
         LOG.error(message);
         RMAuditLogger.logFailure(
             this.rmContext.getRMApps()
-                .get(applicationAttemptId.getApplicationId()).getUser(),
+                .get(appId).getUser(),
             AuditConstants.UNREGISTER_AM, "", "ApplicationMasterService",
-            message, applicationAttemptId.getApplicationId(),
+            message, appId,
             applicationAttemptId);
         throw new ApplicationMasterNotRegisteredException(message);
       }
@@ -365,7 +367,7 @@ public class ApplicationMasterService extends AbstractService implements
       this.amLivelinessMonitor.receivedPing(applicationAttemptId);
 
       RMApp rmApp =
-          rmContext.getRMApps().get(applicationAttemptId.getApplicationId());
+          rmContext.getRMApps().get(appId);
 
       if (rmApp.isAppFinalStateStored()) {
         return FinishApplicationMasterResponse.newInstance(true);
@@ -418,6 +420,7 @@ public class ApplicationMasterService extends AbstractService implements
 
     ApplicationAttemptId appAttemptId =
         amrmTokenIdentifier.getApplicationAttemptId();
+    ApplicationId applicationId = appAttemptId.getApplicationId();
 
     this.amLivelinessMonitor.receivedPing(appAttemptId);
 
@@ -432,14 +435,14 @@ public class ApplicationMasterService extends AbstractService implements
       if (!hasApplicationMasterRegistered(appAttemptId)) {
         String message =
             "Application Master is not registered for known application: "
-                + appAttemptId.getApplicationId()
+                + applicationId
                 + ". Let AM resync.";
         LOG.info(message);
         RMAuditLogger.logFailure(
-            this.rmContext.getRMApps().get(appAttemptId.getApplicationId())
+            this.rmContext.getRMApps().get(applicationId)
                 .getUser(), AuditConstants.REGISTER_AM, "",
             "ApplicationMasterService", message,
-            appAttemptId.getApplicationId(),
+            applicationId,
             appAttemptId);
         return resync;
       }
@@ -481,11 +484,22 @@ public class ApplicationMasterService extends AbstractService implements
       List<String> blacklistRemovals =
           (blacklistRequest != null) ?
               blacklistRequest.getBlacklistRemovals() : Collections.EMPTY_LIST;
-
+      RMApp app =
+          this.rmContext.getRMApps().get(applicationId);
+      
+      // set label expression for Resource Requests
+      ApplicationSubmissionContext asc = app.getApplicationSubmissionContext();
+      for (ResourceRequest req : ask) {
+        if (null == req.getNodeLabelExpression()) {
+          req.setNodeLabelExpression(asc.getNodeLabelExpression());
+        }
+      }
+              
       // sanity check
       try {
         RMServerUtils.validateResourceRequests(ask,
-            rScheduler.getMaximumResourceCapability());
+            rScheduler.getMaximumResourceCapability(), app.getQueue(),
+            rScheduler);
       } catch (InvalidResourceRequestException e) {
         LOG.warn("Invalid resource ask by application " + appAttemptId, e);
         throw e;
@@ -498,8 +512,6 @@ public class ApplicationMasterService extends AbstractService implements
         throw e;
       }
 
-      RMApp app =
-          this.rmContext.getRMApps().get(appAttemptId.getApplicationId());
       // In the case of work-preserving AM restart, it's possible for the
       // AM to release containers from the earlier attempt.
       if (!app.getApplicationSubmissionContext()
@@ -582,7 +594,7 @@ public class ApplicationMasterService extends AbstractService implements
             .toString(), amrmToken.getPassword(), amrmToken.getService()
             .toString()));
         LOG.info("The AMRMToken has been rolled-over. Send new AMRMToken back"
-            + " to application: " + appAttemptId.getApplicationId());
+            + " to application: " + applicationId);
       }
 
       /*

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
index 1d672e5..6e1b925 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java
@@ -343,7 +343,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
       long submitTime, String user)
       throws YarnException {
     ApplicationId applicationId = submissionContext.getApplicationId();
-    validateResourceRequest(submissionContext);
+    ResourceRequest amReq = validateAndCreateResourceRequest(submissionContext);
     // Create RMApp
     RMAppImpl application =
         new RMAppImpl(applicationId, rmContext, this.conf,
@@ -351,7 +351,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
             submissionContext.getQueue(),
             submissionContext, this.scheduler, this.masterService,
             submitTime, submissionContext.getApplicationType(),
-            submissionContext.getApplicationTags());
+            submissionContext.getApplicationTags(), amReq);
 
     // Concurrent app submissions with same applicationId will fail here
     // Concurrent app submissions with different applicationIds will not
@@ -373,7 +373,7 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
     return application;
   }
 
-  private void validateResourceRequest(
+  private ResourceRequest validateAndCreateResourceRequest(
       ApplicationSubmissionContext submissionContext)
       throws InvalidResourceRequestException {
     // Validation of the ApplicationSubmissionContext needs to be completed
@@ -383,18 +383,36 @@ public class RMAppManager implements EventHandler<RMAppManagerEvent>,
 
     // Check whether AM resource requirements are within required limits
     if (!submissionContext.getUnmanagedAM()) {
-      ResourceRequest amReq = BuilderUtils.newResourceRequest(
-          RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
-          submissionContext.getResource(), 1);
+      ResourceRequest amReq;
+      if (submissionContext.getAMContainerResourceRequest() != null) {
+        amReq = submissionContext.getAMContainerResourceRequest();
+      } else {
+        amReq =
+            BuilderUtils.newResourceRequest(
+                RMAppAttemptImpl.AM_CONTAINER_PRIORITY, ResourceRequest.ANY,
+                submissionContext.getResource(), 1);
+      }
+      
+      // set label expression for AM container
+      if (null == amReq.getNodeLabelExpression()) {
+        amReq.setNodeLabelExpression(submissionContext
+            .getNodeLabelExpression());
+      }
+
       try {
         SchedulerUtils.validateResourceRequest(amReq,
-            scheduler.getMaximumResourceCapability());
+            scheduler.getMaximumResourceCapability(),
+            submissionContext.getQueue(), scheduler);
       } catch (InvalidResourceRequestException e) {
         LOG.warn("RM app submission failed in validating AM resource request"
             + " for application " + submissionContext.getApplicationId(), e);
         throw e;
       }
+      
+      return amReq;
     }
+    
+    return null;
   }
 
   private boolean isApplicationInFinalState(RMAppState rmAppState) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
index a59965f..e824634 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContext.java
@@ -27,6 +27,7 @@ import org.apache.hadoop.yarn.conf.ConfigurationProvider;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@@ -108,6 +109,10 @@ public interface RMContext {
 
   boolean isWorkPreservingRecoveryEnabled();
   
+  RMNodeLabelsManager getNodeLabelManager();
+  
+  public void setNodeLabelManager(RMNodeLabelsManager mgr);
+
   long getEpoch();
 
   ReservationSystem getReservationSystem();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 78787ee..076c3dd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter;
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
@@ -91,6 +92,7 @@ public class RMContextImpl implements RMContext {
   private RMApplicationHistoryWriter rmApplicationHistoryWriter;
   private SystemMetricsPublisher systemMetricsPublisher;
   private ConfigurationProvider configurationProvider;
+  private RMNodeLabelsManager nodeLabelManager;
   private long epoch;
   private Clock systemClock = new SystemClock();
   private long schedulerRecoveryStartTime = 0;
@@ -406,6 +408,16 @@ public class RMContextImpl implements RMContext {
     this.epoch = epoch;
   }
 
+  @Override
+  public RMNodeLabelsManager getNodeLabelManager() {
+    return nodeLabelManager;
+  }
+  
+  @Override
+  public void setNodeLabelManager(RMNodeLabelsManager mgr) {
+    nodeLabelManager = mgr;
+  }
+
   public void setSchedulerRecoveryStartAndWaitTime(long waitTime) {
     this.schedulerRecoveryStartTime = systemClock.getTime();
     this.schedulerRecoveryWaitTime = waitTime;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
index 29c5953..40d86e5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
@@ -84,9 +85,11 @@ public class RMServerUtils {
    * requested memory/vcore is non-negative and not greater than max
    */
   public static void validateResourceRequests(List<ResourceRequest> ask,
-      Resource maximumResource) throws InvalidResourceRequestException {
+      Resource maximumResource, String queueName, YarnScheduler scheduler)
+      throws InvalidResourceRequestException {
     for (ResourceRequest resReq : ask) {
-      SchedulerUtils.validateResourceRequest(resReq, maximumResource);
+      SchedulerUtils.validateResourceRequest(resReq, maximumResource,
+          queueName, scheduler);
     }
   }
 
@@ -132,17 +135,25 @@ public class RMServerUtils {
     }
   }
 
+  public static UserGroupInformation verifyAccess(
+      AccessControlList acl, String method, final Log LOG)
+      throws IOException {
+    // by default, this method will use AdminService as module name
+    return verifyAccess(acl, method, "AdminService", LOG);
+  }
+
   /**
    * Utility method to verify if the current user has access based on the
    * passed {@link AccessControlList}
    * @param acl the {@link AccessControlList} to check against
    * @param method the method name to be logged
+   * @param module the module name to be logged, like AdminService or NodeLabelManager
    * @param LOG the logger to use
    * @return {@link UserGroupInformation} of the current user
    * @throws IOException
    */
   public static UserGroupInformation verifyAccess(
-      AccessControlList acl, String method, final Log LOG)
+      AccessControlList acl, String method, String module, final Log LOG)
       throws IOException {
     UserGroupInformation user;
     try {
@@ -159,7 +170,7 @@ public class RMServerUtils {
           " to call '" + method + "'");
 
       RMAuditLogger.logFailure(user.getShortUserName(), method,
-          acl.toString(), "AdminService",
+          acl.toString(), module,
           RMAuditLogger.AuditConstants.UNAUTHORIZED_USER);
 
       throw new AccessControlException("User " + user.getShortUserName() +

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
index ab45020..68cbc7c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java
@@ -67,6 +67,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMaste
 import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.NullRMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
@@ -320,6 +321,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
     return new AMLivelinessMonitor(this.rmDispatcher);
   }
   
+  protected RMNodeLabelsManager createNodeLabelManager() {
+    return new RMNodeLabelsManager();
+  }
+  
   protected DelegationTokenRenewer createDelegationTokenRenewer() {
     return new DelegationTokenRenewer();
   }
@@ -399,6 +404,10 @@ public class ResourceManager extends CompositeService implements Recoverable {
       AMLivelinessMonitor amFinishingMonitor = createAMLivelinessMonitor();
       addService(amFinishingMonitor);
       rmContext.setAMFinishingMonitor(amFinishingMonitor);
+      
+      RMNodeLabelsManager nlm = createNodeLabelManager();
+      addService(nlm);
+      rmContext.setNodeLabelManager(nlm);
 
       boolean isRecoveryEnabled = conf.getBoolean(
           YarnConfiguration.RECOVERY_ENABLED,
@@ -962,7 +971,7 @@ public class ResourceManager extends CompositeService implements Recoverable {
    * instance of {@link RMActiveServices} and initializes it.
    * @throws Exception
    */
-  void createAndInitActiveServices() throws Exception {
+  protected void createAndInitActiveServices() throws Exception {
     activeServices = new RMActiveServices();
     activeServices.init(conf);
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
index 0c0fbc0..126560a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.reservation;
 
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -126,14 +127,18 @@ public class CapacitySchedulerPlanFollower implements PlanFollower {
     // create the default reservation queue if it doesnt exist
     String defReservationQueue = planQueueName + PlanQueue.DEFAULT_QUEUE_SUFFIX;
     if (scheduler.getQueue(defReservationQueue) == null) {
-      ReservationQueue defQueue =
-          new ReservationQueue(scheduler, defReservationQueue, planQueue);
       try {
+        ReservationQueue defQueue =
+            new ReservationQueue(scheduler, defReservationQueue, planQueue);
         scheduler.addQueue(defQueue);
       } catch (SchedulerDynamicEditException e) {
         LOG.warn(
             "Exception while trying to create default reservation queue for plan: {}",
             planQueueName, e);
+      } catch (IOException e) {
+        LOG.warn(
+            "Exception while trying to create default reservation queue for plan: {}",
+            planQueueName, e);
       }
     }
     curReservationNames.add(defReservationQueue);
@@ -186,14 +191,18 @@ public class CapacitySchedulerPlanFollower implements PlanFollower {
       for (ReservationAllocation res : sortedAllocations) {
         String currResId = res.getReservationId().toString();
         if (curReservationNames.contains(currResId)) {
-          ReservationQueue resQueue =
-              new ReservationQueue(scheduler, currResId, planQueue);
           try {
+            ReservationQueue resQueue =
+                new ReservationQueue(scheduler, currResId, planQueue);
             scheduler.addQueue(resQueue);
           } catch (SchedulerDynamicEditException e) {
             LOG.warn(
                 "Exception while trying to activate reservation: {} for plan: {}",
                 currResId, planQueueName, e);
+          } catch (IOException e) {
+            LOG.warn(
+                "Exception while trying to activate reservation: {} for plan: {}",
+                currResId, planQueueName, e);
           }
         }
         Resource capToAssign = res.getResourcesAtTime(now);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
index c0681aa..1994b36 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.NodeState;
 import org.apache.hadoop.yarn.api.records.ReservationId;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
@@ -143,6 +144,7 @@ public class RMAppImpl implements RMApp, Recoverable {
   private RMAppEvent eventCausingFinalSaving;
   private RMAppState targetedFinalState;
   private RMAppState recoveredFinalState;
+  private ResourceRequest amReq;
 
   Object transitionTodo;
 
@@ -342,7 +344,8 @@ public class RMAppImpl implements RMApp, Recoverable {
       Configuration config, String name, String user, String queue,
       ApplicationSubmissionContext submissionContext, YarnScheduler scheduler,
       ApplicationMasterService masterService, long submitTime,
-      String applicationType, Set<String> applicationTags) {
+      String applicationType, Set<String> applicationTags, 
+      ResourceRequest amReq) {
 
     this.systemClock = new SystemClock();
 
@@ -361,6 +364,7 @@ public class RMAppImpl implements RMApp, Recoverable {
     this.startTime = this.systemClock.getTime();
     this.applicationType = applicationType;
     this.applicationTags = applicationTags;
+    this.amReq = amReq;
 
     int globalMaxAppAttempts = conf.getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
         YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS);
@@ -732,7 +736,7 @@ public class RMAppImpl implements RMApp, Recoverable {
           // previously failed attempts(which should not include Preempted,
           // hardware error and NM resync) + 1) equal to the max-attempt
           // limit.
-          maxAppAttempts == (getNumFailedAppAttempts() + 1));
+          maxAppAttempts == (getNumFailedAppAttempts() + 1), amReq);
     attempts.put(appAttemptId, attempt);
     currentAttempt = attempt;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
index fbcb7d7..b5a6237 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/attempt/RMAppAttemptImpl.java
@@ -93,7 +93,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
-import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.server.webproxy.ProxyUriUtils;
 import org.apache.hadoop.yarn.state.InvalidStateTransitonException;
 import org.apache.hadoop.yarn.state.MultipleArcTransition;
@@ -177,6 +176,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
   private Object transitionTodo;
   
   private RMAppAttemptMetrics attemptMetrics = null;
+  private ResourceRequest amReq = null;
 
   private static final StateMachineFactory<RMAppAttemptImpl,
                                            RMAppAttemptState,
@@ -426,7 +426,7 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
       RMContext rmContext, YarnScheduler scheduler,
       ApplicationMasterService masterService,
       ApplicationSubmissionContext submissionContext,
-      Configuration conf, boolean maybeLastAttempt) {
+      Configuration conf, boolean maybeLastAttempt, ResourceRequest amReq) {
     this.conf = conf;
     this.applicationAttemptId = appAttemptId;
     this.rmContext = rmContext;
@@ -442,8 +442,11 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
     this.proxiedTrackingUrl = generateProxyUriWithScheme(null);
     this.maybeLastAttempt = maybeLastAttempt;
     this.stateMachine = stateMachineFactory.make(this);
+
     this.attemptMetrics =
         new RMAppAttemptMetrics(applicationAttemptId, rmContext);
+    
+    this.amReq = amReq;
   }
 
   @Override
@@ -885,24 +888,34 @@ public class RMAppAttemptImpl implements RMAppAttempt, Recoverable {
   private static final List<ResourceRequest> EMPTY_CONTAINER_REQUEST_LIST =
       new ArrayList<ResourceRequest>();
 
-  private static final class ScheduleTransition
+  @VisibleForTesting
+  public static final class ScheduleTransition
       implements
       MultipleArcTransition<RMAppAttemptImpl, RMAppAttemptEvent, RMAppAttemptState> {
     @Override
     public RMAppAttemptState transition(RMAppAttemptImpl appAttempt,
         RMAppAttemptEvent event) {
-      if (!appAttempt.submissionContext.getUnmanagedAM()) {
-        // Request a container for the AM.
-        ResourceRequest request =
-            BuilderUtils.newResourceRequest(
-                AM_CONTAINER_PRIORITY, ResourceRequest.ANY, appAttempt
-                    .getSubmissionContext().getResource(), 1);
-
+      ApplicationSubmissionContext subCtx = appAttempt.submissionContext;
+      if (!subCtx.getUnmanagedAM()) {
+        // Need to reset #containers before creating a new attempt, because
+        // this request will be passed to the scheduler, and the scheduler will
+        // deduct the number after the AM container is allocated
+        
+        // Currently, the following fields are all hard-coded.
+        // TODO: change these fields when we want to support
+        // priority/resource-name/relax-locality specification for AM container
+        // allocation.
+        appAttempt.amReq.setNumContainers(1);
+        appAttempt.amReq.setPriority(AM_CONTAINER_PRIORITY);
+        appAttempt.amReq.setResourceName(ResourceRequest.ANY);
+        appAttempt.amReq.setRelaxLocality(true);
+        
         // SchedulerUtils.validateResourceRequests is not necessary because
         // AM resource has been checked when submission
-        Allocation amContainerAllocation = appAttempt.scheduler.allocate(
-            appAttempt.applicationAttemptId,
-            Collections.singletonList(request), EMPTY_CONTAINER_RELEASE_LIST, null, null);
+        Allocation amContainerAllocation =
+            appAttempt.scheduler.allocate(appAttempt.applicationAttemptId,
+                Collections.singletonList(appAttempt.amReq),
+                EMPTY_CONTAINER_RELEASE_LIST, null, null);
         if (amContainerAllocation != null
             && amContainerAllocation.getContainers() != null) {
           assert (amContainerAllocation.getContainers().size() == 0);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
index a423ea5..afbcbc7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNode.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.rmnode;
 
 
 import java.util.List;
+import java.util.Set;
 
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -135,4 +136,11 @@ public interface RMNode {
    * @return containerUpdates accumulated across NM heartbeats.
    */
   public List<UpdatedContainerInfo> pullContainerUpdates();
+  
+  /**
+   * Get the set of labels on this node.
+   * 
+   * @return the labels on this node
+   */
+  public Set<String> getNodeLabels();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index c960b50..13d60ab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -855,4 +855,12 @@ public class RMNodeImpl implements RMNode, EventHandler<RMNodeEvent> {
   public Set<ContainerId> getLaunchedContainers() {
     return this.launchedContainers;
   }
+
+  @Override
+  public Set<String> getNodeLabels() {
+    if (context.getNodeLabelManager() == null) {
+      return null;
+    }
+    return context.getNodeLabelManager().getLabelsOnNode(nodeId);
+  }
  }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
index 0bc8ca1..4663a91 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
 import java.util.List;
+import java.util.Set;
 
 import org.apache.hadoop.classification.InterfaceAudience.LimitedPrivate;
 import org.apache.hadoop.classification.InterfaceStability.Evolving;
@@ -71,4 +72,22 @@ public interface Queue {
    */
   public void recoverContainer(Resource clusterResource,
       SchedulerApplicationAttempt schedulerAttempt, RMContainer rmContainer);
+  
+  /**
+   * Get the node labels that this queue can access.
+   * labels={*} means this queue can access any label.
+   * labels={ } means this queue can only access nodes without labels.
+   * labels={a, b, c} means this queue can access a or b or c.
+   * @return labels
+   */
+  public Set<String> getAccessibleNodeLabels();
+  
+  /**
+   * Get the default label expression of this queue. If the label expressions
+   * of the ApplicationSubmissionContext and of the Resource Request are not
+   * set, this will be used.
+   * 
+   * @return default label expression
+   */
+  public String getDefaultNodeLabelExpression();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
index ac37c2f..5d00009 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
@@ -17,23 +17,29 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
 
+import java.io.IOException;
 import java.util.List;
+import java.util.Set;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.api.records.ResourceRequest;
 import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import com.google.common.collect.Sets;
+
 /**
  * Utilities shared by schedulers. 
  */
@@ -190,7 +196,8 @@ public class SchedulerUtils {
    *         request
    */
   public static void validateResourceRequest(ResourceRequest resReq,
-      Resource maximumResource) throws InvalidResourceRequestException {
+      Resource maximumResource, String queueName, YarnScheduler scheduler)
+      throws InvalidResourceRequestException {
     if (resReq.getCapability().getMemory() < 0 ||
         resReq.getCapability().getMemory() > maximumResource.getMemory()) {
       throw new InvalidResourceRequestException("Invalid resource request"
@@ -209,5 +216,116 @@ public class SchedulerUtils {
           + resReq.getCapability().getVirtualCores()
           + ", maxVirtualCores=" + maximumResource.getVirtualCores());
     }
+    
+    // Get queue from scheduler
+    QueueInfo queueInfo = null;
+    try {
+      queueInfo = scheduler.getQueueInfo(queueName, false, false);
+    } catch (IOException e) {
+      // it is possible that the queue cannot be found when queue mapping is
+      // set; just ignore the queueInfo here and move forward
+    }
+
+    // check labels in the resource request.
+    String labelExp = resReq.getNodeLabelExpression();
+    
+    // if queue has default label expression, and RR doesn't have, use the
+    // default label expression of queue
+    if (labelExp == null && queueInfo != null) {
+      labelExp = queueInfo.getDefaultNodeLabelExpression();
+      resReq.setNodeLabelExpression(labelExp);
+    }
+    
+    if (labelExp != null && !labelExp.trim().isEmpty() && queueInfo != null) {
+      if (!checkQueueLabelExpression(queueInfo.getAccessibleNodeLabels(),
+          labelExp)) {
+        throw new InvalidResourceRequestException("Invalid resource request"
+            + ", queue="
+            + queueInfo.getQueueName()
+            + " doesn't have permission to access all labels "
+            + "in resource request. labelExpression of resource request="
+            + labelExp
+            + ". Queue labels="
+            + (queueInfo.getAccessibleNodeLabels() == null ? "" : StringUtils.join(queueInfo
+                .getAccessibleNodeLabels().iterator(), ',')));
+      }
+    }
+  }
+  
+  public static boolean checkQueueAccessToNode(Set<String> queueLabels,
+      Set<String> nodeLabels) {
+    // if queue's label is *, it can access any node
+    if (queueLabels != null && queueLabels.contains(RMNodeLabelsManager.ANY)) {
+      return true;
+    }
+    // any queue can access a node without labels
+    if (nodeLabels == null || nodeLabels.isEmpty()) {
+      return true;
+    }
+    // a queue can access a node only if it contains any label of the node
+    if (queueLabels != null
+        && Sets.intersection(queueLabels, nodeLabels).size() > 0) {
+      return true;
+    }
+    // sorry, you cannot access
+    return false;
+  }
+  
+  public static void checkIfLabelInClusterNodeLabels(RMNodeLabelsManager mgr,
+      Set<String> labels) throws IOException {
+    if (mgr == null) {
+      if (labels != null && !labels.isEmpty()) {
+        throw new IOException("NodeLabelManager is null, please check");
+      }
+      return;
+    }
+
+    if (labels != null) {
+      for (String label : labels) {
+        if (!label.equals(RMNodeLabelsManager.ANY)
+            && !mgr.containsNodeLabel(label)) {
+          throw new IOException("NodeLabelManager doesn't include label = "
+              + label + ", please check.");
+        }
+      }
+    }
+  }
+  
+  public static boolean checkNodeLabelExpression(Set<String> nodeLabels,
+      String labelExpression) {
+    // empty label expression can only allocate on node with empty labels
+    if (labelExpression == null || labelExpression.trim().isEmpty()) {
+      if (!nodeLabels.isEmpty()) {
+        return false;
+      }
+    }
+
+    if (labelExpression != null) {
+      for (String str : labelExpression.split("&&")) {
+        if (!str.trim().isEmpty()
+            && (nodeLabels == null || !nodeLabels.contains(str.trim()))) {
+          return false;
+        }
+      }
+    }
+    return true;
+  }
+
+  public static boolean checkQueueLabelExpression(Set<String> queueLabels,
+      String labelExpression) {
+    if (queueLabels != null && queueLabels.contains(RMNodeLabelsManager.ANY)) {
+      return true;
+    }
+    // if label expression is empty, we can allocate container on any node
+    if (labelExpression == null) {
+      return true;
+    }
+    for (String str : labelExpression.split("&&")) {
+      if (!str.trim().isEmpty()
+          && (queueLabels == null || !queueLabels.contains(str.trim()))) {
+        return false;
+      }
+    }
+    return true;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
new file mode 100644
index 0000000..7159e4d
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -0,0 +1,448 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import com.google.common.collect.Sets;
+
+public abstract class AbstractCSQueue implements CSQueue {
+  
+  CSQueue parent;
+  final String queueName;
+  
+  float capacity;
+  float maximumCapacity;
+  float absoluteCapacity;
+  float absoluteMaxCapacity;
+  float absoluteUsedCapacity = 0.0f;
+
+  float usedCapacity = 0.0f;
+  volatile int numContainers;
+  
+  final Resource minimumAllocation;
+  final Resource maximumAllocation;
+  QueueState state;
+  final QueueMetrics metrics;
+  
+  final ResourceCalculator resourceCalculator;
+  Set<String> accessibleLabels;
+  RMNodeLabelsManager labelManager;
+  String defaultLabelExpression;
+  Resource usedResources = Resources.createResource(0, 0);
+  QueueInfo queueInfo;
+  Map<String, Float> absoluteCapacityByNodeLabels;
+  Map<String, Float> capacitiyByNodeLabels;
+  Map<String, Resource> usedResourcesByNodeLabels = new HashMap<String, Resource>();
+  Map<String, Float> absoluteMaxCapacityByNodeLabels;
+  Map<String, Float> maxCapacityByNodeLabels;
+  
+  Map<QueueACL, AccessControlList> acls = 
+      new HashMap<QueueACL, AccessControlList>();
+  boolean reservationsContinueLooking;
+  
+  private final RecordFactory recordFactory = 
+      RecordFactoryProvider.getRecordFactory(null);
+  
+  public AbstractCSQueue(CapacitySchedulerContext cs, 
+      String queueName, CSQueue parent, CSQueue old) throws IOException {
+    this.minimumAllocation = cs.getMinimumResourceCapability();
+    this.maximumAllocation = cs.getMaximumResourceCapability();
+    this.labelManager = cs.getRMContext().getNodeLabelManager();
+    this.parent = parent;
+    this.queueName = queueName;
+    this.resourceCalculator = cs.getResourceCalculator();
+    this.queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
+    
+    // must be called after parent and queueName are set
+    this.metrics = old != null ? old.getMetrics() :
+        QueueMetrics.forQueue(getQueuePath(), parent,
+            cs.getConfiguration().getEnableUserMetrics(),
+            cs.getConf());
+    
+    // get labels
+    this.accessibleLabels = cs.getConfiguration().getAccessibleNodeLabels(getQueuePath());
+    this.defaultLabelExpression = cs.getConfiguration()
+        .getDefaultNodeLabelExpression(getQueuePath());
+    
+    this.queueInfo.setQueueName(queueName);
+    
+    // inherit from parent if labels not set
+    if (this.accessibleLabels == null && parent != null) {
+      this.accessibleLabels = parent.getAccessibleNodeLabels();
+      SchedulerUtils.checkIfLabelInClusterNodeLabels(labelManager,
+          this.accessibleLabels);
+    }
+    
+    // inherit from parent if labels not set
+    if (this.defaultLabelExpression == null && parent != null
+        && this.accessibleLabels.containsAll(parent.getAccessibleNodeLabels())) {
+      this.defaultLabelExpression = parent.getDefaultNodeLabelExpression();
+    }
+    
+    // set capacity by labels
+    capacitiyByNodeLabels =
+        cs.getConfiguration().getNodeLabelCapacities(getQueuePath(), accessibleLabels,
+            labelManager);
+
+    // set maximum capacity by labels
+    maxCapacityByNodeLabels =
+        cs.getConfiguration().getMaximumNodeLabelCapacities(getQueuePath(),
+            accessibleLabels, labelManager);
+  }
+  
+  @Override
+  public synchronized float getCapacity() {
+    return capacity;
+  }
+
+  @Override
+  public synchronized float getAbsoluteCapacity() {
+    return absoluteCapacity;
+  }
+
+  @Override
+  public float getAbsoluteMaximumCapacity() {
+    return absoluteMaxCapacity;
+  }
+
+  @Override
+  public synchronized float getAbsoluteUsedCapacity() {
+    return absoluteUsedCapacity;
+  }
+
+  @Override
+  public float getMaximumCapacity() {
+    return maximumCapacity;
+  }
+
+  @Override
+  public synchronized float getUsedCapacity() {
+    return usedCapacity;
+  }
+
+  @Override
+  public synchronized Resource getUsedResources() {
+    return usedResources;
+  }
+
+  public synchronized int getNumContainers() {
+    return numContainers;
+  }
+
+  @Override
+  public synchronized QueueState getState() {
+    return state;
+  }
+  
+  @Override
+  public QueueMetrics getMetrics() {
+    return metrics;
+  }
+  
+  @Override
+  public String getQueueName() {
+    return queueName;
+  }
+  
+  @Override
+  public synchronized CSQueue getParent() {
+    return parent;
+  }
+
+  @Override
+  public synchronized void setParent(CSQueue newParentQueue) {
+    this.parent = (ParentQueue)newParentQueue;
+  }
+  
+  public Set<String> getAccessibleNodeLabels() {
+    return accessibleLabels;
+  }
+  
+  @Override
+  public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
+    synchronized (this) {
+      if (acls.get(acl).isUserAllowed(user)) {
+        return true;
+      }
+    }
+    
+    if (parent != null) {
+      return parent.hasAccess(acl, user);
+    }
+    
+    return false;
+  }
+  
+  @Override
+  public synchronized void setUsedCapacity(float usedCapacity) {
+    this.usedCapacity = usedCapacity;
+  }
+  
+  @Override
+  public synchronized void setAbsoluteUsedCapacity(float absUsedCapacity) {
+    this.absoluteUsedCapacity = absUsedCapacity;
+  }
+
+  /**
+   * Set maximum capacity - used only for testing.
+   * @param maximumCapacity new max capacity
+   */
+  synchronized void setMaxCapacity(float maximumCapacity) {
+    // Sanity check
+    CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
+    float absMaxCapacity =
+        CSQueueUtils.computeAbsoluteMaximumCapacity(maximumCapacity, parent);
+    CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absoluteCapacity,
+        absMaxCapacity);
+    
+    this.maximumCapacity = maximumCapacity;
+    this.absoluteMaxCapacity = absMaxCapacity;
+  }
+
+  @Override
+  public float getAbsActualCapacity() {
+    // for now, simply return actual capacity = guaranteed capacity for parent
+    // queue
+    return absoluteCapacity;
+  }
+
+  @Override
+  public String getDefaultNodeLabelExpression() {
+    return defaultLabelExpression;
+  }
+  
+  /**
+   * Applies the supplied configuration to this queue, refreshes queueInfo and
+   * the queue statistics, then derives the per-label absolute capacities.
+   *
+   * @throws IOException if the label check against the parent queue fails
+   */
+  synchronized void setupQueueConfigs(Resource clusterResource, float capacity,
+      float absoluteCapacity, float maximumCapacity, float absoluteMaxCapacity,
+      QueueState state, Map<QueueACL, AccessControlList> acls,
+      Set<String> labels, String defaultLabelExpression,
+      Map<String, Float> nodeLabelCapacities,
+      Map<String, Float> maximumNodeLabelCapacities,
+      boolean reservationContinueLooking)
+      throws IOException {
+    // Sanity check
+    CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
+    CSQueueUtils.checkAbsoluteCapacity(getQueueName(), absoluteCapacity,
+        absoluteMaxCapacity);
+
+    this.capacity = capacity;
+    this.absoluteCapacity = absoluteCapacity;
+    this.maximumCapacity = maximumCapacity;
+    this.absoluteMaxCapacity = absoluteMaxCapacity;
+    this.state = state;
+    this.acls = acls;
+    
+    // set labels and the default label expression
+    this.accessibleLabels = labels;
+    this.defaultLabelExpression = defaultLabelExpression;
+    
+    // copy node label capacity
+    this.capacitiyByNodeLabels = new HashMap<String, Float>(nodeLabelCapacities);
+    this.maxCapacityByNodeLabels =
+        new HashMap<String, Float>(maximumNodeLabelCapacities);
+    
+    this.queueInfo.setAccessibleNodeLabels(this.accessibleLabels);
+    this.queueInfo.setCapacity(this.capacity);
+    this.queueInfo.setMaximumCapacity(this.maximumCapacity);
+    this.queueInfo.setQueueState(this.state);
+    this.queueInfo.setDefaultNodeLabelExpression(this.defaultLabelExpression);
+
+    // Update metrics
+    CSQueueUtils.updateQueueStatistics(
+        resourceCalculator, this, parent, clusterResource, minimumAllocation);
+    
+    // Check that the labels of this queue are a subset of the parent queue's;
+    // only do this when this queue's parent itself has a parent
+    if (parent != null && parent.getParent() != null) {
+      if (parent.getAccessibleNodeLabels() != null
+          && !parent.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) {
+        // if parent isn't "*", child shouldn't be "*" either
+        if (this.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) {
+          throw new IOException("Parent's accessible queue is not ANY(*), "
+              + "but child's accessible queue is *");
+        } else {
+          Set<String> diff =
+              Sets.difference(this.getAccessibleNodeLabels(),
+                  parent.getAccessibleNodeLabels());
+          if (!diff.isEmpty()) {
+            throw new IOException("Some labels of child queue is not a subset "
+                + "of parent queue, these labels=["
+                + StringUtils.join(diff, ",") + "]");
+          }
+        }
+      }
+    }
+    
+    // calculate absolute capacity by each node label
+    this.absoluteCapacityByNodeLabels =
+        CSQueueUtils.computeAbsoluteCapacityByNodeLabels(
+            this.capacitiyByNodeLabels, parent);
+    // calculate maximum capacity by each node label (from this queue's copy)
+    this.absoluteMaxCapacityByNodeLabels =
+        CSQueueUtils.computeAbsoluteMaxCapacityByNodeLabels(
+            this.maxCapacityByNodeLabels, parent);
+    
+    // each label's absolute capacity must not exceed its absolute maximum
+    // NOTE(review): assumes both maps hold the same label keys -- confirm
+    CSQueueUtils.checkAbsoluteCapacitiesByLabel(getQueueName(),
+        absoluteCapacityByNodeLabels, absoluteMaxCapacityByNodeLabels);
+    this.reservationsContinueLooking = reservationContinueLooking;
+  }
+  
+  @Private
+  public Resource getMaximumAllocation() {
+    return maximumAllocation;
+  }
+  
+  @Private
+  public Resource getMinimumAllocation() {
+    return minimumAllocation;
+  }
+  
+  synchronized void allocateResource(Resource clusterResource, 
+      Resource resource, Set<String> nodeLabels) {
+    Resources.addTo(usedResources, resource);
+    
+    // Update usedResources by labels
+    if (nodeLabels == null || nodeLabels.isEmpty()) {
+      if (!usedResourcesByNodeLabels.containsKey(RMNodeLabelsManager.NO_LABEL)) {
+        usedResourcesByNodeLabels.put(RMNodeLabelsManager.NO_LABEL,
+            Resources.createResource(0));
+      }
+      Resources.addTo(usedResourcesByNodeLabels.get(RMNodeLabelsManager.NO_LABEL),
+          resource);
+    } else {
+      for (String label : Sets.intersection(accessibleLabels, nodeLabels)) {
+        if (!usedResourcesByNodeLabels.containsKey(label)) {
+          usedResourcesByNodeLabels.put(label, Resources.createResource(0));
+        }
+        Resources.addTo(usedResourcesByNodeLabels.get(label), resource);
+      }
+    }
+
+    ++numContainers;
+    CSQueueUtils.updateQueueStatistics(resourceCalculator, this, getParent(),
+        clusterResource, minimumAllocation);
+  }
+  
+  protected synchronized void releaseResource(Resource clusterResource,
+      Resource resource, Set<String> nodeLabels) {
+    // Update queue metrics
+    Resources.subtractFrom(usedResources, resource);
+
+    // Update usedResources by labels
+    if (null == nodeLabels || nodeLabels.isEmpty()) {
+      if (!usedResourcesByNodeLabels.containsKey(RMNodeLabelsManager.NO_LABEL)) {
+        usedResourcesByNodeLabels.put(RMNodeLabelsManager.NO_LABEL,
+            Resources.createResource(0));
+      }
+      Resources.subtractFrom(
+          usedResourcesByNodeLabels.get(RMNodeLabelsManager.NO_LABEL), resource);
+    } else {
+      for (String label : Sets.intersection(accessibleLabels, nodeLabels)) {
+        if (!usedResourcesByNodeLabels.containsKey(label)) {
+          usedResourcesByNodeLabels.put(label, Resources.createResource(0));
+        }
+        Resources.subtractFrom(usedResourcesByNodeLabels.get(label), resource);
+      }
+    }
+
+    CSQueueUtils.updateQueueStatistics(resourceCalculator, this, getParent(),
+        clusterResource, minimumAllocation);
+    --numContainers;
+  }
+  
+  @Private
+  public float getCapacityByNodeLabel(String label) {
+    if (null == parent) {
+      return 1f;
+    }
+    
+    if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL)) {
+      return getCapacity();
+    }
+    
+    if (!capacitiyByNodeLabels.containsKey(label)) {
+      return 0;
+    } else {
+      return capacitiyByNodeLabels.get(label);
+    }
+  }
+  
+  /** Absolute capacity (not the maximum) of this queue for a node label. */
+  @Private
+  public float getAbsoluteCapacityByNodeLabel(String label) {
+    if (null == parent) {
+      return 1; 
+    }
+    if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL)) {
+      return getAbsoluteCapacity();
+    }
+    Float absCapacity = absoluteCapacityByNodeLabels.get(label);
+    return (null == absCapacity) ? 0f : absCapacity; // unknown label => 0
+  }
+  
+  /** Absolute maximum capacity of this queue for the given node label. */
+  @Private
+  public float getAbsoluteMaximumCapacityByNodeLabel(String label) {
+    if (StringUtils.equals(label, RMNodeLabelsManager.NO_LABEL)) {
+      return getAbsoluteMaximumCapacity();
+    }
+    if (null == parent) {
+      return 1f; // parentless queue: same value the old delegation returned
+    }
+    Float absMaxCapacity = absoluteMaxCapacityByNodeLabels.get(label);
+    return (null == absMaxCapacity) ? 0f : absMaxCapacity; // unknown => 0
+  }
+  
+  @Private
+  public boolean getReservationContinueLooking() {
+    return reservationsContinueLooking;
+  }
+  
+  @Private
+  public Map<QueueACL, AccessControlList> getACLs() {
+    return acls;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index db893dc..6438d6c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -72,9 +72,18 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
   
   /**
    * Get the configured <em>capacity</em> of the queue.
-   * @return queue capacity
+   * @return configured queue capacity
    */
   public float getCapacity();
+  
+  /**
+   * Get the actual <em>capacity</em> of the queue. This may differ from the
+   * configured capacity when a misconfiguration takes place, for example
+   * when labels are added to the cluster.
+   * 
+   * @return actual queue capacity
+   */
+  public float getAbsActualCapacity();
 
   /**
    * Get capacity of the parent of the queue as a function of the 
@@ -106,28 +115,31 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
   public float getAbsoluteUsedCapacity();
 
   /**
-   * Get the current used capacity of the queue
-   * and it's children (if any).
-   * @return queue used capacity
-   */
-  public float getUsedCapacity();
-  
-  /**
    * Set used capacity of the queue.
-   * @param usedCapacity used capacity of the queue
+   * @param usedCapacity
+   *          used capacity of the queue
    */
   public void setUsedCapacity(float usedCapacity);
-  
+
   /**
    * Set absolute used capacity of the queue.
-   * @param absUsedCapacity absolute used capacity of the queue
+   * @param absUsedCapacity
+   *          absolute used capacity of the queue
    */
   public void setAbsoluteUsedCapacity(float absUsedCapacity);
 
   /**
-   * Get the currently utilized resources in the cluster 
-   * by the queue and children (if any).
-   * @return used resources by the queue and it's children 
+   * Get the current used capacity of the queue and its children (if any),
+   * counting only nodes without label(s).
+   * @return queue used capacity
+   */
+  public float getUsedCapacity();
+
+  /**
+   * Get the resources currently allocated to the queue and its children (if
+   * any) on nodes in the cluster that have no labels.
+   * 
+   * @return used resources by the queue and its children
    */
   public Resource getUsedResources();
   
@@ -259,4 +271,25 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
    */
   public void attachContainer(Resource clusterResource,
                FiCaSchedulerApp application, RMContainer container);
+  
+  /**
+   * Get the absolute capacity this queue can use for the given node label.
+   * @param nodeLabel the node label to look up
+   * @return absolute capacity this queue can use for the label
+   */
+  public float getAbsoluteCapacityByNodeLabel(String nodeLabel);
+  
+  /**
+   * Get the absolute max capacity this queue can use for the given node label.
+   * @param nodeLabel the node label to look up
+   * @return absolute max capacity this queue can use for the label
+   */
+  public float getAbsoluteMaximumCapacityByNodeLabel(String nodeLabel);
+
+  /**
+   * Get the capacity of this queue for the given node label.
+   * @param nodeLabel the node label to look up
+   * @return capacity by node label
+   */
+  public float getCapacityByNodeLabel(String nodeLabel);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
index 737062b..0a2fa3a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
@@ -17,9 +17,12 @@
 */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Map.Entry;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -40,7 +43,7 @@ class CSQueueUtils {
     }
     }
 
-  public static void checkAbsoluteCapacities(String queueName,
+  public static void checkAbsoluteCapacity(String queueName,
       float absCapacity, float absMaxCapacity) {
     if (absMaxCapacity < (absCapacity - EPSILON)) {
       throw new IllegalArgumentException("Illegal call to setMaxCapacity. "
@@ -49,6 +52,23 @@ class CSQueueUtils {
           + ")");
   }
   }
+  
+  /** Per-label capacity must not exceed max; labels without a max pass. */
+  public static void checkAbsoluteCapacitiesByLabel(String queueName,
+          Map<String, Float> absCapacities,
+          Map<String, Float> absMaximumCapacities) {
+    for (Entry<String, Float> entry : absCapacities.entrySet()) {
+      String label = entry.getKey();
+      float absCapacity = entry.getValue();
+      Float absMaxCapacity = absMaximumCapacities.get(label); // may be null
+      if (absMaxCapacity != null && absMaxCapacity < (absCapacity - EPSILON)) {
+        throw new IllegalArgumentException("Illegal call to setMaxCapacity. "
+            + "Queue '" + queueName + "' has " + "an absolute capacity ("
+            + absCapacity + ") greater than " + "its absolute maximumCapacity ("
+            + absMaxCapacity + ") of label=" + label);
+      }
+    }
+  }
 
   public static float computeAbsoluteMaximumCapacity(
       float maximumCapacity, CSQueue parent) {
@@ -56,6 +76,39 @@ class CSQueueUtils {
         (parent == null) ? 1.0f : parent.getAbsoluteMaximumCapacity();
     return (parentAbsMaxCapacity * maximumCapacity);
   }
+  
+  public static Map<String, Float> computeAbsoluteCapacityByNodeLabels(
+      Map<String, Float> nodeLabelToCapacities, CSQueue parent) {
+    if (parent == null) {
+      return nodeLabelToCapacities;
+    }
+    
+    Map<String, Float> absoluteCapacityByNodeLabels =
+        new HashMap<String, Float>();
+    for (Entry<String, Float> entry : nodeLabelToCapacities.entrySet()) {
+      String label = entry.getKey();
+      float capacity = entry.getValue();
+      absoluteCapacityByNodeLabels.put(label,
+          capacity * parent.getAbsoluteCapacityByNodeLabel(label));
+    }
+    return absoluteCapacityByNodeLabels;
+  }
+  
+  public static Map<String, Float> computeAbsoluteMaxCapacityByNodeLabels(
+      Map<String, Float> maximumNodeLabelToCapacities, CSQueue parent) {
+    if (parent == null) {
+      return maximumNodeLabelToCapacities;
+    }
+    Map<String, Float> absoluteMaxCapacityByNodeLabels =
+        new HashMap<String, Float>();
+    for (Entry<String, Float> entry : maximumNodeLabelToCapacities.entrySet()) {
+      String label = entry.getKey();
+      float maxCapacity = entry.getValue();
+      absoluteMaxCapacityByNodeLabels.put(label,
+          maxCapacity * parent.getAbsoluteMaximumCapacityByNodeLabel(label));
+    }
+    return absoluteMaxCapacityByNodeLabels;
+  }
 
   public static int computeMaxActiveApplications(
       ResourceCalculator calculator,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2ea555a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index ed5518c..9332228 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -20,7 +20,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import java.io.IOException;
 import java.io.InputStream;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -53,8 +61,13 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.*;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppRejectedEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
@@ -191,6 +204,7 @@ public class CapacityScheduler extends
 
   private boolean scheduleAsynchronously;
   private AsyncScheduleThread asyncSchedulerThread;
+  private RMNodeLabelsManager labelManager;
   
   /**
    * EXPERT
@@ -275,6 +289,8 @@ public class CapacityScheduler extends
     this.applications =
         new ConcurrentHashMap<ApplicationId,
             SchedulerApplication<FiCaSchedulerApp>>();
+    this.labelManager = rmContext.getNodeLabelManager();
+
     initializeQueues(this.conf);
 
     scheduleAsynchronously = this.conf.getScheduleAynschronously();
@@ -446,7 +462,7 @@ public class CapacityScheduler extends
     root = 
         parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT, 
             queues, queues, noop);
-
+    labelManager.reinitializeQueueLabels(getQueueToLabels());
     LOG.info("Initialized root queue " + root);
     initializeQueueMappings();
   }
@@ -469,10 +485,19 @@ public class CapacityScheduler extends
     // Re-configure queues
     root.reinitialize(newRoot, clusterResource);
     initializeQueueMappings();
-    
+
     // Re-calculate headroom for active applications
     root.updateClusterResource(clusterResource);
 
+    labelManager.reinitializeQueueLabels(getQueueToLabels());
+  }
+  
+  private Map<String, Set<String>> getQueueToLabels() {
+    Map<String, Set<String>> queueToLabels = new HashMap<String, Set<String>>();
+    for (CSQueue queue : queues.values()) {
+      queueToLabels.put(queue.getQueueName(), queue.getAccessibleNodeLabels());
+    }
+    return queueToLabels;
   }
 
   /**
@@ -515,7 +540,7 @@ public class CapacityScheduler extends
   
   @Lock(CapacityScheduler.class)
   static CSQueue parseQueue(
-      CapacitySchedulerContext csContext, 
+      CapacitySchedulerContext csContext,
       CapacitySchedulerConfiguration conf, 
       CSQueue parent, String queueName, Map<String, CSQueue> queues,
       Map<String, CSQueue> oldQueues, 
@@ -1094,11 +1119,18 @@ public class CapacityScheduler extends
   }
 
   private synchronized void addNode(RMNode nodeManager) {
+    // update this node to node label manager
+    if (labelManager != null) {
+      labelManager.activateNode(nodeManager.getNodeID(),
+          nodeManager.getTotalCapability());
+    }
+    
     this.nodes.put(nodeManager.getNodeID(), new FiCaSchedulerNode(nodeManager,
         usePortForNodeName));
     Resources.addTo(clusterResource, nodeManager.getTotalCapability());
     root.updateClusterResource(clusterResource);
     int numNodes = numNodeManagers.incrementAndGet();
+    
     LOG.info("Added node " + nodeManager.getNodeAddress() + 
         " clusterResource: " + clusterResource);
 
@@ -1108,6 +1140,11 @@ public class CapacityScheduler extends
   }
 
   private synchronized void removeNode(RMNode nodeInfo) {
+    // update this node to node label manager
+    if (labelManager != null) {
+      labelManager.deactivateNode(nodeInfo.getNodeID());
+    }
+    
     FiCaSchedulerNode node = nodes.get(nodeInfo.getNodeID());
     if (node == null) {
       return;
@@ -1141,6 +1178,7 @@ public class CapacityScheduler extends
     }
 
     this.nodes.remove(nodeInfo.getNodeID());
+
     LOG.info("Removed node " + nodeInfo.getNodeAddress() + 
         " clusterResource: " + clusterResource);
   }