You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sn...@apache.org on 2021/12/13 15:32:27 UTC

[hadoop] branch trunk updated: YARN-11024. Create an AbstractLeafQueue to store the common LeafQueue + AutoCreatedLeafQueue functionality. Contributed by Benjamin Teke

This is an automated email from the ASF dual-hosted git repository.

snemeth pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 898055e  YARN-11024. Create an AbstractLeafQueue to store the common LeafQueue + AutoCreatedLeafQueue functionality. Contributed by Benjamin Teke
898055e is described below

commit 898055e204acfe54ab5a8cffb8f9769aab6fe7a7
Author: Szilard Nemeth <sn...@apache.org>
AuthorDate: Mon Dec 13 16:12:55 2021 +0100

    YARN-11024. Create an AbstractLeafQueue to store the common LeafQueue + AutoCreatedLeafQueue functionality. Contributed by Benjamin Teke
---
 .../monitor/capacity/FifoCandidatesSelector.java   |    4 +-
 .../capacity/FifoIntraQueuePreemptionPlugin.java   |    4 +-
 .../capacity/IntraQueueCandidatesSelector.java     |   13 +-
 .../monitor/capacity/TempQueuePerPartition.java    |   10 +-
 .../placement/CSMappingPlacementRule.java          |    4 +-
 .../placement/QueuePlacementRuleUtils.java         |    6 +-
 .../MappingRuleValidationContextImpl.java          |    6 +-
 .../capacity/AbstractAutoCreatedLeafQueue.java     |    2 +-
 .../{LeafQueue.java => AbstractLeafQueue.java}     |  722 +++---
 .../scheduler/capacity/AutoCreatedLeafQueue.java   |    6 +-
 .../capacity/CSMaxRunningAppsEnforcer.java         |    8 +-
 .../capacity/CapacityHeadroomProvider.java         |    8 +-
 .../scheduler/capacity/CapacityScheduler.java      |   44 +-
 .../capacity/CapacitySchedulerConfigValidator.java |    4 +-
 .../capacity/CapacitySchedulerQueueManager.java    |   14 +-
 .../scheduler/capacity/LeafQueue.java              | 2436 +-------------------
 .../scheduler/capacity/ManagedParentQueue.java     |    6 +-
 .../scheduler/capacity/ParentQueue.java            |    8 +-
 .../scheduler/capacity/ReservationQueue.java       |    5 +
 .../scheduler/capacity/UsersManager.java           |    4 +-
 .../GuaranteedOrZeroCapacityOverTimePolicy.java    |   50 +-
 .../scheduler/common/fica/FiCaSchedulerApp.java    |   10 +-
 .../webapp/dao/CapacitySchedulerInfo.java          |    8 +-
 .../webapp/dao/CapacitySchedulerLeafQueueInfo.java |    4 +-
 .../dao/helper/CapacitySchedulerInfoHelper.java    |    4 +-
 .../TestWorkPreservingRMRestart.java               |    5 +-
 .../capacity/TestCSMaxRunningAppsEnforcer.java     |    2 +-
 .../TestCapacitySchedulerNewQueueAutoCreation.java |   15 +-
 28 files changed, 436 insertions(+), 2976 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
index d9e9091..440066c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
@@ -86,7 +86,7 @@ public class FifoCandidatesSelector
       }
 
       // compute resToObtainByPartition considered inter-queue preemption
-      LeafQueue leafQueue = preemptionContext.getQueueByPartition(queueName,
+      AbstractLeafQueue leafQueue = preemptionContext.getQueueByPartition(queueName,
           RMNodeLabelsManager.NO_LABEL).leafQueue;
 
       Map<String, Resource> resToObtainByPartition =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
index ea17fed..188e619 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueue
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.IntraQueuePreemptionOrderPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
@@ -577,7 +577,7 @@ public class FifoIntraQueuePreemptionPlugin
   }
 
   private Resource calculateUsedAMResourcesPerQueue(String partition,
-      LeafQueue leafQueue, Map<String, Resource> perUserAMUsed) {
+      AbstractLeafQueue leafQueue, Map<String, Resource> perUserAMUsed) {
     Collection<FiCaSchedulerApp> runningApps = leafQueue.getApplications();
     Resource amUsed = Resources.createResource(0, 0);
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
index cea1bca..f0bd03b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.IntraQueuePreemptionOrderPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.AbstractComparatorOrderingPolicy;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -146,7 +146,7 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
 
       // 4. Iterate from most under-served queue in order.
       for (String queueName : queueNames) {
-        LeafQueue leafQueue = preemptionContext.getQueueByPartition(queueName,
+        AbstractLeafQueue leafQueue = preemptionContext.getQueueByPartition(queueName,
             RMNodeLabelsManager.NO_LABEL).leafQueue;
 
         // skip if not a leafqueue
@@ -181,7 +181,7 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
         leafQueue.getReadLock().lock();
         try {
           for (FiCaSchedulerApp app : apps) {
-            preemptFromLeastStarvedApp(leafQueue, app, selectedCandidates,
+            preemptFromLeastStarvedApp(app, selectedCandidates,
                 curCandidates, clusterResource, totalPreemptedResourceAllowed,
                 resToObtainByPartition, rollingResourceUsagePerUser);
           }
@@ -195,7 +195,7 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
   }
 
   private void initializeUsageAndUserLimitForCompute(Resource clusterResource,
-      String partition, LeafQueue leafQueue,
+      String partition, AbstractLeafQueue leafQueue,
       Map<String, Resource> rollingResourceUsagePerUser) {
     for (String user : leafQueue.getAllUsers()) {
       // Initialize used resource of a given user for rolling computation.
@@ -206,8 +206,7 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
     }
   }
 
-  private void preemptFromLeastStarvedApp(LeafQueue leafQueue,
-      FiCaSchedulerApp app,
+  private void preemptFromLeastStarvedApp(FiCaSchedulerApp app,
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
       Map<ApplicationAttemptId, Set<RMContainer>> curCandidates,
       Resource clusterResource, Resource totalPreemptedResourceAllowed,
@@ -293,7 +292,7 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
       for (String queueName : queueNames) {
         TempQueuePerPartition tq = context.getQueueByPartition(queueName,
             partition);
-        LeafQueue leafQueue = tq.leafQueue;
+        AbstractLeafQueue leafQueue = tq.leafQueue;
 
         // skip if its parent queue
         if (null == leafQueue) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
index 57dc639..958c08e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
@@ -26,7 +26,7 @@ import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
@@ -56,7 +56,7 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
 
   final ArrayList<TempQueuePerPartition> children;
   private Collection<TempAppPerPartition> apps;
-  LeafQueue leafQueue;
+  AbstractLeafQueue leafQueue;
   ParentQueue parentQueue;
   boolean preemptionDisabled;
 
@@ -81,8 +81,8 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
     super(queueName, current, Resource.newInstance(0, 0), reserved,
         Resource.newInstance(0, 0));
 
-    if (queue instanceof LeafQueue) {
-      LeafQueue l = (LeafQueue) queue;
+    if (queue instanceof AbstractLeafQueue) {
+      AbstractLeafQueue l = (AbstractLeafQueue) queue;
       pending = l.getTotalPendingResourcesConsideringUserLimit(
           totalPartitionResource, partition, false);
       pendingDeductReserved = l.getTotalPendingResourcesConsideringUserLimit(
@@ -113,7 +113,7 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
     this.effMaxRes = effMaxRes;
   }
 
-  public void setLeafQueue(LeafQueue l) {
+  public void setLeafQueue(AbstractLeafQueue l) {
     assert children.size() == 0;
     this.leafQueue = l;
   }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/CSMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/CSMappingPlacementRule.java
index d9c7e6f..cefed1d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/CSMappingPlacementRule.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/CSMappingPlacementRule.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -277,7 +277,7 @@ public class CSMappingPlacementRule extends PlacementRule {
     }
 
     CSQueue queue = queueManager.getQueueByFullName(normalizedName);
-    if (queue != null && !(queue instanceof LeafQueue)) {
+    if (queue != null && !(queue instanceof AbstractLeafQueue)) {
       throw new YarnException("Mapping rule returned a non-leaf queue '" +
           normalizedName + "', cannot place application in it.");
     }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/QueuePlacementRuleUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/QueuePlacementRuleUtils.java
index 76e3e27..f6381bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/QueuePlacementRuleUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/QueuePlacementRuleUtils.java
@@ -21,13 +21,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.placement;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedLeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ManagedParentQueue;
 
 import java.io.IOException;
 
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
-
 /**
  * Utility class for Capacity Scheduler queue PlacementRules.
  */
@@ -83,7 +81,7 @@ public final class QueuePlacementRuleUtils {
   public static QueueMapping validateAndGetQueueMapping(
       CapacitySchedulerQueueManager queueManager, CSQueue queue,
       QueueMapping mapping) throws IOException {
-    if (!(queue instanceof LeafQueue)) {
+    if (!(queue instanceof AbstractLeafQueue)) {
       throw new IOException(
           "mapping contains invalid or non-leaf queue : " +
           mapping.getFullPath());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/csmappingrule/MappingRuleValidationContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/csmappingrule/MappingRuleValidationContextImpl.java
index 4218b6f..cceb7e6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/csmappingrule/MappingRuleValidationContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/csmappingrule/MappingRuleValidationContextImpl.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.util.Sets;
 import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ManagedParentQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
@@ -91,7 +91,7 @@ public class MappingRuleValidationContextImpl
           "' under it.");
     case QUEUE_EXISTS:
       CSQueue queue = queueManager.getQueue(normalizedPath);
-      if (!(queue instanceof LeafQueue)) {
+      if (!(queue instanceof AbstractLeafQueue)) {
         throw new YarnException("Target queue '" + path.getFullPath() +
             "' but it's not a leaf queue.");
       }
@@ -157,7 +157,7 @@ public class MappingRuleValidationContextImpl
     //if the static part of our queue exists, and it's not a leaf queue,
     //we cannot do any deeper validation
     if (queue != null) {
-      if (queue instanceof LeafQueue) {
+      if (queue instanceof AbstractLeafQueue) {
         throw new YarnException("Queue path '" + path +"' is invalid " +
             "because '" + normalizedStaticPart + "' is a leaf queue, " +
             "which can have no other queues under it.");
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java
index b9c2ec6..36d2aef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java
@@ -34,7 +34,7 @@ import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager
  * Abstract class for dynamic auto created queues managed by an implementation
  * of AbstractManagedParentQueue
  */
-public class AbstractAutoCreatedLeafQueue extends LeafQueue {
+public class AbstractAutoCreatedLeafQueue extends AbstractLeafQueue {
 
   protected AbstractManagedParentQueue parent;
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java
similarity index 81%
copy from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
copy to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java
index 4592f2a..9991140 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentMap;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.time.DateUtils;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
@@ -78,18 +77,16 @@ import org.apache.hadoop.classification.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-@Private
-@Unstable
-public class LeafQueue extends AbstractCSQueue {
+public class AbstractLeafQueue extends AbstractCSQueue {
   private static final Logger LOG =
-      LoggerFactory.getLogger(LeafQueue.class);
+      LoggerFactory.getLogger(AbstractLeafQueue.class);
 
   private float absoluteUsedCapacity = 0.0f;
 
   // TODO the max applications should consider label
   protected int maxApplications;
   protected volatile int maxApplicationsPerUser;
-  
+
   private float maxAMResourcePerQueuePercent;
 
   private volatile int nodeLocalityDelay;
@@ -105,8 +102,8 @@ public class LeafQueue extends AbstractCSQueue {
 
   private volatile float minimumAllocationFactor;
 
-  private final RecordFactory recordFactory = 
-    RecordFactoryProvider.getRecordFactory(null);
+  private final RecordFactory recordFactory =
+      RecordFactoryProvider.getRecordFactory(null);
 
   private final UsersManager usersManager;
 
@@ -142,21 +139,18 @@ public class LeafQueue extends AbstractCSQueue {
   private final List<FiCaSchedulerApp> nonRunnableApps = new ArrayList<>();
 
   @SuppressWarnings({ "unchecked", "rawtypes" })
-  public LeafQueue(CapacitySchedulerContext cs,
-      String queueName, CSQueue parent, CSQueue old) throws IOException {
+  public AbstractLeafQueue(CapacitySchedulerContext cs, String queueName,
+      CSQueue parent, CSQueue old) {
     this(cs, cs.getConfiguration(), queueName, parent, old, false);
   }
 
-  public LeafQueue(CapacitySchedulerContext cs,
-      CapacitySchedulerConfiguration configuration,
-      String queueName, CSQueue parent, CSQueue old) throws IOException {
+  public AbstractLeafQueue(CapacitySchedulerContext cs, CapacitySchedulerConfiguration configuration,
+      String queueName, CSQueue parent, CSQueue old) {
     this(cs, configuration, queueName, parent, old, false);
   }
 
-  public LeafQueue(CapacitySchedulerContext cs,
-      CapacitySchedulerConfiguration configuration,
-      String queueName, CSQueue parent, CSQueue old, boolean isDynamic) throws
-      IOException {
+  public AbstractLeafQueue(CapacitySchedulerContext cs, CapacitySchedulerConfiguration configuration,
+      String queueName, CSQueue parent, CSQueue old, boolean isDynamic) {
     super(cs, configuration, queueName, parent, old);
     setDynamicQueue(isDynamic);
 
@@ -165,17 +159,11 @@ public class LeafQueue extends AbstractCSQueue {
 
     // One time initialization is enough since it is static ordering policy
     this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps();
-
-    LOG.debug("LeafQueue: name={}, fullname={}", queueName, getQueuePath());
-
-    setupQueueConfigs(cs.getClusterResource(), configuration);
-
   }
 
   @SuppressWarnings("checkstyle:nowhitespaceafter")
-  protected void setupQueueConfigs(Resource clusterResource,
-      CapacitySchedulerConfiguration conf) throws
-      IOException {
+  protected void setupQueueConfigs(
+      Resource clusterResource, CapacitySchedulerConfiguration conf) throws IOException {
     writeLock.lock();
     try {
       CapacitySchedulerConfiguration schedConf = csContext.getConfiguration();
@@ -183,8 +171,7 @@ public class LeafQueue extends AbstractCSQueue {
 
       this.lastClusterResource = clusterResource;
 
-      this.cachedResourceLimitsForHeadroom = new ResourceLimits(
-          clusterResource);
+      this.cachedResourceLimitsForHeadroom = new ResourceLimits(clusterResource);
 
       // Initialize headroom info, also used for calculating application
       // master resource limits.  Since this happens during queue initialization
@@ -193,15 +180,13 @@ public class LeafQueue extends AbstractCSQueue {
       // absoluteMaxAvailCapacity during headroom/userlimit/allocation events)
       setQueueResourceLimitsInfo(clusterResource);
 
-      setOrderingPolicy(
-          conf.<FiCaSchedulerApp>getAppOrderingPolicy(getQueuePath()));
+      setOrderingPolicy(conf.<FiCaSchedulerApp>getAppOrderingPolicy(getQueuePath()));
 
       usersManager.setUserLimit(conf.getUserLimit(getQueuePath()));
       usersManager.setUserLimitFactor(conf.getUserLimitFactor(getQueuePath()));
 
       maxAMResourcePerQueuePercent =
-          conf.getMaximumApplicationMasterResourcePerQueuePercent(
-              getQueuePath());
+          conf.getMaximumApplicationMasterResourcePerQueuePercent(getQueuePath());
 
       maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath());
       if (maxApplications < 0) {
@@ -212,33 +197,27 @@ public class LeafQueue extends AbstractCSQueue {
         }
       }
 
-      priorityAcls = conf.getPriorityAcls(getQueuePath(),
-          csContext.getMaxClusterLevelAppPriority());
+      priorityAcls =
+          conf.getPriorityAcls(getQueuePath(), csContext.getMaxClusterLevelAppPriority());
 
       Set<String> accessibleNodeLabels = this.queueNodeLabelsSettings.getAccessibleNodeLabels();
       if (!SchedulerUtils.checkQueueLabelExpression(accessibleNodeLabels,
           this.queueNodeLabelsSettings.getDefaultLabelExpression(), null)) {
-        throw new IOException(
-            "Invalid default label expression of " + " queue=" + getQueuePath()
-                + " doesn't have permission to access all labels "
-                + "in default label expression. labelExpression of resource request="
-                + getDefaultNodeLabelExpressionStr() + ". Queue labels=" + (
-                getAccessibleNodeLabels() == null ?
-                    "" :
-                    StringUtils
-                        .join(getAccessibleNodeLabels().iterator(), ',')));
+        throw new IOException("Invalid default label expression of " + " queue=" + getQueuePath()
+            + " doesn't have permission to access all labels "
+            + "in default label expression. labelExpression of resource request="
+            + getDefaultNodeLabelExpressionStr() + ". Queue labels=" + (
+            getAccessibleNodeLabels() == null ? "" :
+                StringUtils.join(getAccessibleNodeLabels().iterator(), ',')));
       }
 
       nodeLocalityDelay = schedConf.getNodeLocalityDelay();
-      rackLocalityAdditionalDelay = schedConf
-          .getRackLocalityAdditionalDelay();
-      rackLocalityFullReset = schedConf
-          .getRackLocalityFullReset();
+      rackLocalityAdditionalDelay = schedConf.getRackLocalityAdditionalDelay();
+      rackLocalityFullReset = schedConf.getRackLocalityFullReset();
 
       // re-init this since max allocation could have changed
       this.minimumAllocationFactor = Resources.ratio(resourceCalculator,
-          Resources.subtract(
-              queueAllocationSettings.getMaximumAllocation(),
+          Resources.subtract(queueAllocationSettings.getMaximumAllocation(),
               queueAllocationSettings.getMinimumAllocation()),
           queueAllocationSettings.getMaximumAllocation());
 
@@ -254,8 +233,8 @@ public class LeafQueue extends AbstractCSQueue {
         }
       }
 
-      defaultAppPriorityPerQueue = Priority.newInstance(
-          conf.getDefaultApplicationPriorityConfPerQueue(getQueuePath()));
+      defaultAppPriorityPerQueue =
+          Priority.newInstance(conf.getDefaultApplicationPriorityConfPerQueue(getQueuePath()));
 
       // Validate leaf queue's user's weights.
       float queueUserLimit = Math.min(100.0f, conf.getUserLimit(getQueuePath()));
@@ -331,7 +310,7 @@ public class LeafQueue extends AbstractCSQueue {
   public float getMinimumAllocationFactor() {
     return minimumAllocationFactor;
   }
-  
+
   /**
    * Used only by tests.
    */
@@ -365,7 +344,7 @@ public class LeafQueue extends AbstractCSQueue {
   public List<CSQueue> getChildQueues() {
     return null;
   }
-  
+
   /**
    * Set user limit.
    * @param userLimit new user limit
@@ -390,8 +369,7 @@ public class LeafQueue extends AbstractCSQueue {
   public int getNumApplications() {
     readLock.lock();
     try {
-      return getNumPendingApplications() + getNumActiveApplications() +
-          getNumNonRunnableApps();
+      return getNumPendingApplications() + getNumActiveApplications() + getNumNonRunnableApps();
     } finally {
       readLock.unlock();
     }
@@ -549,14 +527,13 @@ public class LeafQueue extends AbstractCSQueue {
       }
 
       // Sanity check
-      if (!(newlyParsedQueue instanceof LeafQueue) || !newlyParsedQueue
-          .getQueuePath().equals(getQueuePath())) {
-        throw new IOException(
-            "Trying to reinitialize " + getQueuePath() + " from "
-                + newlyParsedQueue.getQueuePath());
+      if (!(newlyParsedQueue instanceof AbstractLeafQueue) || !newlyParsedQueue.getQueuePath()
+          .equals(getQueuePath())) {
+        throw new IOException("Trying to reinitialize " + getQueuePath() + " from "
+            + newlyParsedQueue.getQueuePath());
       }
 
-      LeafQueue newlyParsedLeafQueue = (LeafQueue) newlyParsedQueue;
+      AbstractLeafQueue newlyParsedLeafQueue = (AbstractLeafQueue) newlyParsedQueue;
 
       // don't allow the maximum allocation to be decreased in size
       // since we have already told running AM's the size
@@ -677,12 +654,12 @@ public class LeafQueue extends AbstractCSQueue {
     try {
       parent.validateSubmitApplication(applicationId, userName, queue);
     } catch (AccessControlException ace) {
-      LOG.info("Failed to submit application to parent-queue: " + 
+      LOG.info("Failed to submit application to parent-queue: " +
           parent.getQueuePath(), ace);
       throw ace;
     }
   }
-  
+
   public Resource getAMResourceLimit() {
     return usageTracker.getQueueUsage().getAMLimit();
   }
@@ -775,8 +752,7 @@ public class LeafQueue extends AbstractCSQueue {
 
   }
 
-  public Resource calculateAndGetAMResourceLimitPerPartition(
-      String nodePartition) {
+  public Resource calculateAndGetAMResourceLimitPerPartition(String nodePartition) {
     writeLock.lock();
     try {
       /*
@@ -793,13 +769,12 @@ public class LeafQueue extends AbstractCSQueue {
       // For non-labeled partition, we need to consider the current queue
       // usage limit.
       if (nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
-        synchronized (queueResourceLimitsInfo){
+        synchronized (queueResourceLimitsInfo) {
           queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit();
         }
       }
 
-      float amResourcePercent = queueCapacities.getMaxAMResourcePercentage(
-          nodePartition);
+      float amResourcePercent = queueCapacities.getMaxAMResourcePercentage(nodePartition);
 
       // Current usable resource for this queue and partition is the max of
       // queueCurrentLimit and queuePartitionResource.
@@ -807,13 +782,13 @@ public class LeafQueue extends AbstractCSQueue {
       // guarantee, use the guarantee as the queuePartitionUsableResource
       // because nothing less than the queue's guarantee should be used when
       // calculating the AM limit.
-      Resource queuePartitionUsableResource = (Resources.fitsIn(
-          resourceCalculator, queuePartitionResource, queueCurrentLimit)) ?
+      Resource queuePartitionUsableResource =
+          (Resources.fitsIn(resourceCalculator, queuePartitionResource, queueCurrentLimit)) ?
               queueCurrentLimit : queuePartitionResource;
 
-      Resource amResouceLimit = Resources.multiplyAndNormalizeUp(
-          resourceCalculator, queuePartitionUsableResource, amResourcePercent,
-          queueAllocationSettings.getMinimumAllocation());
+      Resource amResouceLimit =
+          Resources.multiplyAndNormalizeUp(resourceCalculator, queuePartitionUsableResource,
+              amResourcePercent, queueAllocationSettings.getMinimumAllocation());
 
       usageTracker.getMetrics().setAMResouceLimit(nodePartition, amResouceLimit);
       usageTracker.getQueueUsage().setAMLimit(nodePartition, amResouceLimit);
@@ -832,8 +807,7 @@ public class LeafQueue extends AbstractCSQueue {
     writeLock.lock();
     try {
       // limit of allowed resource usage for application masters
-      Map<String, Resource> userAmPartitionLimit =
-          new HashMap<String, Resource>();
+      Map<String, Resource> userAmPartitionLimit = new HashMap<String, Resource>();
 
       // AM Resource Limit for accessible labels can be pre-calculated.
       // This will help in updating AMResourceLimit for all labels when queue
@@ -842,10 +816,8 @@ public class LeafQueue extends AbstractCSQueue {
         calculateAndGetAMResourceLimitPerPartition(nodePartition);
       }
 
-      for (Iterator<FiCaSchedulerApp> fsApp =
-           getPendingAppsOrderingPolicy()
-               .getAssignmentIterator(IteratorSelector.EMPTY_ITERATOR_SELECTOR);
-           fsApp.hasNext(); ) {
+      for (Iterator<FiCaSchedulerApp> fsApp = getPendingAppsOrderingPolicy().getAssignmentIterator(
+          IteratorSelector.EMPTY_ITERATOR_SELECTOR); fsApp.hasNext(); ) {
         FiCaSchedulerApp application = fsApp.next();
         ApplicationId applicationId = application.getApplicationId();
 
@@ -859,8 +831,7 @@ public class LeafQueue extends AbstractCSQueue {
           amLimit = calculateAndGetAMResourceLimitPerPartition(partitionName);
         }
         // Check am resource limit.
-        Resource amIfStarted = Resources.add(
-            application.getAMResource(partitionName),
+        Resource amIfStarted = Resources.add(application.getAMResource(partitionName),
             usageTracker.getQueueUsage().getAMUsed(partitionName));
 
         if (LOG.isDebugEnabled()) {
@@ -873,18 +844,19 @@ public class LeafQueue extends AbstractCSQueue {
         }
 
         if (!resourceCalculator.fitsIn(amIfStarted, amLimit)) {
-          if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual(
-              resourceCalculator, lastClusterResource,
-              usageTracker.getQueueUsage().getAMUsed(partitionName), Resources.none()))) {
+          if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual(resourceCalculator,
+              lastClusterResource, usageTracker.getQueueUsage().getAMUsed(partitionName),
+              Resources.none()))) {
             LOG.warn("maximum-am-resource-percent is insufficient to start a"
                 + " single application in queue, it is likely set too low."
-                + " skipping enforcement to allow at least one application"
-                + " to start");
-          } else{
-            application.updateAMContainerDiagnostics(AMState.INACTIVATED,
+                + " skipping enforcement to allow at least one application" + " to start");
+          } else {
+            application.updateAMContainerDiagnostics(
+                SchedulerApplicationAttempt.AMState.INACTIVATED,
                 CSAMContainerLaunchDiagnosticsConstants.QUEUE_AM_RESOURCE_LIMIT_EXCEED);
-            LOG.debug("Not activating application {} as  amIfStarted: {}"
-                + " exceeds amLimit: {}", applicationId, amIfStarted, amLimit);
+            LOG.debug(
+                "Not activating application {} as  amIfStarted: {}" + " exceeds amLimit: {}",
+                applicationId, amIfStarted, amLimit);
             continue;
           }
         }
@@ -895,25 +867,23 @@ public class LeafQueue extends AbstractCSQueue {
 
         // Verify whether we already calculated user-am-limit for this label.
         if (userAMLimit == null) {
-          userAMLimit = getUserAMResourceLimitPerPartition(partitionName,
-              application.getUser());
+          userAMLimit = getUserAMResourceLimitPerPartition(partitionName, application.getUser());
           userAmPartitionLimit.put(partitionName, userAMLimit);
         }
 
-        Resource userAmIfStarted = Resources.add(
-            application.getAMResource(partitionName),
+        Resource userAmIfStarted = Resources.add(application.getAMResource(partitionName),
             user.getConsumedAMResources(partitionName));
 
         if (!resourceCalculator.fitsIn(userAmIfStarted, userAMLimit)) {
-          if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual(
-              resourceCalculator, lastClusterResource,
-              usageTracker.getQueueUsage().getAMUsed(partitionName), Resources.none()))) {
+          if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual(resourceCalculator,
+              lastClusterResource, usageTracker.getQueueUsage().getAMUsed(partitionName),
+              Resources.none()))) {
             LOG.warn("maximum-am-resource-percent is insufficient to start a"
                 + " single application in queue for user, it is likely set too"
-                + " low. skipping enforcement to allow at least one application"
-                + " to start");
-          } else{
-            application.updateAMContainerDiagnostics(AMState.INACTIVATED,
+                + " low. skipping enforcement to allow at least one application" + " to start");
+          } else {
+            application.updateAMContainerDiagnostics(
+                AMState.INACTIVATED,
                 CSAMContainerLaunchDiagnosticsConstants.USER_AM_RESOURCE_LIMIT_EXCEED);
             LOG.debug("Not activating application {} for user: {} as"
                 + " userAmIfStarted: {} exceeds userAmLimit: {}",
@@ -923,17 +893,17 @@ public class LeafQueue extends AbstractCSQueue {
         }
         user.activateApplication();
         orderingPolicy.addSchedulableEntity(application);
-        application.updateAMContainerDiagnostics(AMState.ACTIVATED, null);
+        application.updateAMContainerDiagnostics(AMState.ACTIVATED,
+            null);
 
-        usageTracker.getQueueUsage().incAMUsed(partitionName,
-            application.getAMResource(partitionName));
-        user.getResourceUsage().incAMUsed(partitionName,
-            application.getAMResource(partitionName));
+        usageTracker.getQueueUsage()
+            .incAMUsed(partitionName, application.getAMResource(partitionName));
+        user.getResourceUsage().incAMUsed(partitionName, application.getAMResource(partitionName));
         user.getResourceUsage().setAMLimit(partitionName, userAMLimit);
         usageTracker.getMetrics().incAMUsed(partitionName, application.getUser(),
             application.getAMResource(partitionName));
-        usageTracker.getMetrics().setAMResouceLimitForUser(partitionName,
-            application.getUser(), userAMLimit);
+        usageTracker.getMetrics()
+            .setAMResouceLimitForUser(partitionName, application.getUser(), userAMLimit);
         fsApp.remove();
         LOG.info("Application " + applicationId + " from user: " + application
             .getUser() + " activated in queue: " + getQueuePath());
@@ -943,12 +913,10 @@ public class LeafQueue extends AbstractCSQueue {
     }
   }
 
-  private void addApplicationAttempt(FiCaSchedulerApp application,
-      User user) {
+  private void addApplicationAttempt(FiCaSchedulerApp application, User user) {
     writeLock.lock();
     try {
-      applicationAttemptMap.put(application.getApplicationAttemptId(),
-          application);
+      applicationAttemptMap.put(application.getApplicationAttemptId(), application);
 
       if (application.isRunnable()) {
         runnableApps.add(application);
@@ -966,8 +934,8 @@ public class LeafQueue extends AbstractCSQueue {
       getPendingAppsOrderingPolicy().addSchedulableEntity(application);
 
       // Activate applications
-      if (Resources.greaterThan(resourceCalculator, lastClusterResource,
-          lastClusterResource, Resources.none())) {
+      if (Resources.greaterThan(resourceCalculator, lastClusterResource, lastClusterResource,
+          Resources.none())) {
         activateApplications();
       } else {
         application.updateAMContainerDiagnostics(AMState.INACTIVATED,
@@ -1010,8 +978,7 @@ public class LeafQueue extends AbstractCSQueue {
     parent.finishApplicationAttempt(application, queue);
   }
 
-  private void removeApplicationAttempt(
-      FiCaSchedulerApp application, String userName) {
+  private void removeApplicationAttempt(FiCaSchedulerApp application, String userName) {
 
     writeLock.lock();
     try {
@@ -1032,11 +999,10 @@ public class LeafQueue extends AbstractCSQueue {
       boolean wasActive = orderingPolicy.removeSchedulableEntity(application);
       if (!wasActive) {
         pendingOrderingPolicy.removeSchedulableEntity(application);
-      } else{
-        usageTracker.getQueueUsage().decAMUsed(partitionName,
-            application.getAMResource(partitionName));
-        user.getResourceUsage().decAMUsed(partitionName,
-            application.getAMResource(partitionName));
+      } else {
+        usageTracker.getQueueUsage()
+            .decAMUsed(partitionName, application.getAMResource(partitionName));
+        user.getResourceUsage().decAMUsed(partitionName, application.getAMResource(partitionName));
         usageTracker.getMetrics().decAMUsed(partitionName, application.getUser(),
             application.getAMResource(partitionName));
       }
@@ -1062,8 +1028,7 @@ public class LeafQueue extends AbstractCSQueue {
     }
   }
 
-  private FiCaSchedulerApp getApplication(
-      ApplicationAttemptId applicationAttemptId) {
+  private FiCaSchedulerApp getApplication(ApplicationAttemptId applicationAttemptId) {
     return applicationAttemptMap.get(applicationAttemptId);
   }
 
@@ -1084,8 +1049,8 @@ public class LeafQueue extends AbstractCSQueue {
   }
 
   private CSAssignment allocateFromReservedContainer(Resource clusterResource,
-      CandidateNodeSet<FiCaSchedulerNode> candidates,
-      ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
+      CandidateNodeSet<FiCaSchedulerNode> candidates, ResourceLimits currentResourceLimits,
+      SchedulingMode schedulingMode) {
 
     // Irrespective of Single / Multi Node Placement, the allocate from
     // Reserved Container has to happen only for the single node which
@@ -1097,15 +1062,14 @@ public class LeafQueue extends AbstractCSQueue {
     if (node != null) {
       RMContainer reservedContainer = node.getReservedContainer();
       if (reservedContainer != null) {
-        FiCaSchedulerApp application = getApplication(
-            reservedContainer.getApplicationAttemptId());
+        FiCaSchedulerApp application = getApplication(reservedContainer.getApplicationAttemptId());
 
         if (null != application) {
-          ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
-              node, SystemClock.getInstance().getTime(), application);
-          CSAssignment assignment = application.assignContainers(
-              clusterResource, candidates, currentResourceLimits,
-              schedulingMode, reservedContainer);
+          ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, node,
+              SystemClock.getInstance().getTime(), application);
+          CSAssignment assignment =
+              application.assignContainers(clusterResource, candidates, currentResourceLimits,
+                  schedulingMode, reservedContainer);
           return assignment;
         }
       }
@@ -1114,8 +1078,7 @@ public class LeafQueue extends AbstractCSQueue {
     return null;
   }
 
-  private ConcurrentMap<String, CachedUserLimit> getUserLimitCache(
-      String partition,
+  private ConcurrentMap<String, CachedUserLimit> getUserLimitCache(String partition,
       SchedulingMode schedulingMode) {
     synchronized (userLimitsCache) {
       long latestVersion = usersManager.getLatestVersionOfUsersState();
@@ -1125,20 +1088,19 @@ public class LeafQueue extends AbstractCSQueue {
         this.currentUserLimitCacheVersion = latestVersion;
         userLimitsCache.clear();
 
-        Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>>
-            uLCByPartition = new HashMap<>();
+        Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>> uLCByPartition =
+            new HashMap<>();
         userLimitsCache.put(partition, uLCByPartition);
 
-        ConcurrentMap<String, CachedUserLimit> uLCBySchedulingMode =
-            new ConcurrentHashMap<>();
+        ConcurrentMap<String, CachedUserLimit> uLCBySchedulingMode = new ConcurrentHashMap<>();
         uLCByPartition.put(schedulingMode, uLCBySchedulingMode);
 
         return uLCBySchedulingMode;
       }
 
       // User limits cache does not need invalidating
-      Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>>
-          uLCByPartition = userLimitsCache.get(partition);
+      Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>> uLCByPartition =
+          userLimitsCache.get(partition);
       if (uLCByPartition == null) {
         uLCByPartition = new HashMap<>();
         userLimitsCache.put(partition, uLCByPartition);
@@ -1157,8 +1119,8 @@ public class LeafQueue extends AbstractCSQueue {
 
   @Override
   public CSAssignment assignContainers(Resource clusterResource,
-      CandidateNodeSet<FiCaSchedulerNode> candidates,
-      ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
+      CandidateNodeSet<FiCaSchedulerNode> candidates, ResourceLimits currentResourceLimits,
+      SchedulingMode schedulingMode) {
     updateCurrentResourceLimits(currentResourceLimits, clusterResource);
     FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
 
@@ -1170,8 +1132,9 @@ public class LeafQueue extends AbstractCSQueue {
     setPreemptionAllowed(currentResourceLimits, candidates.getPartition());
 
     // Check for reserved resources, try to allocate reserved container first.
-    CSAssignment assignment = allocateFromReservedContainer(clusterResource,
-        candidates, currentResourceLimits, schedulingMode);
+    CSAssignment assignment =
+        allocateFromReservedContainer(clusterResource, candidates, currentResourceLimits,
+            schedulingMode);
     if (null != assignment) {
       return assignment;
     }
@@ -1206,33 +1169,29 @@ public class LeafQueue extends AbstractCSQueue {
     boolean needAssignToQueueCheck = true;
     IteratorSelector sel = new IteratorSelector();
     sel.setPartition(candidates.getPartition());
-    for (Iterator<FiCaSchedulerApp> assignmentIterator =
-         orderingPolicy.getAssignmentIterator(sel);
+    for (Iterator<FiCaSchedulerApp> assignmentIterator = orderingPolicy.getAssignmentIterator(sel);
          assignmentIterator.hasNext(); ) {
       FiCaSchedulerApp application = assignmentIterator.next();
 
-      ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
-          node, SystemClock.getInstance().getTime(), application);
+      ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, node,
+          SystemClock.getInstance().getTime(), application);
 
       // Check queue max-capacity limit
       Resource appReserved = application.getCurrentReservation();
       if (needAssignToQueueCheck) {
-        if (!super.canAssignToThisQueue(clusterResource,
-            candidates.getPartition(), currentResourceLimits, appReserved,
-            schedulingMode)) {
-          ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
-              activitiesManager, node, application, application.getPriority(),
+        if (!super.canAssignToThisQueue(clusterResource, candidates.getPartition(),
+            currentResourceLimits, appReserved, schedulingMode)) {
+          ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(activitiesManager, node,
+              application, application.getPriority(),
               ActivityDiagnosticConstant.QUEUE_HIT_MAX_CAPACITY_LIMIT);
-          ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-              parent.getQueuePath(), getQueuePath(),
-              ActivityState.REJECTED,
+          ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, parent.getQueuePath(),
+              getQueuePath(), ActivityState.REJECTED,
               ActivityDiagnosticConstant.QUEUE_HIT_MAX_CAPACITY_LIMIT);
           return CSAssignment.NULL_ASSIGNMENT;
         }
         // If there was no reservation and canAssignToThisQueue returned
         // true, there is no reason to check further.
-        if (!this.reservationsContinueLooking
-            || appReserved.equals(Resources.none())) {
+        if (!this.reservationsContinueLooking || appReserved.equals(Resources.none())) {
           needAssignToQueueCheck = false;
         }
       }
@@ -1242,13 +1201,12 @@ public class LeafQueue extends AbstractCSQueue {
       if (cul != null) {
         cachedUserLimit = cul.userLimit;
       }
-      Resource userLimit = computeUserLimitAndSetHeadroom(application,
-          clusterResource, candidates.getPartition(), schedulingMode,
-          cachedUserLimit);
+      Resource userLimit =
+          computeUserLimitAndSetHeadroom(application, clusterResource, candidates.getPartition(),
+              schedulingMode, cachedUserLimit);
       if (cul == null) {
         cul = new CachedUserLimit(userLimit);
-        CachedUserLimit retVal =
-            userLimits.putIfAbsent(application.getUser(), cul);
+        CachedUserLimit retVal = userLimits.putIfAbsent(application.getUser(), cul);
         if (retVal != null) {
           // another thread updated the user limit cache before us
           cul = retVal;
@@ -1260,9 +1218,9 @@ public class LeafQueue extends AbstractCSQueue {
       if (!cul.canAssign && Resources.fitsIn(appReserved, cul.reservation)) {
         userAssignable = false;
       } else {
-        userAssignable = canAssignToUser(clusterResource, application.getUser(),
-            userLimit, application, candidates.getPartition(),
-            currentResourceLimits);
+        userAssignable =
+            canAssignToUser(clusterResource, application.getUser(), userLimit, application,
+                candidates.getPartition(), currentResourceLimits);
         if (!userAssignable && Resources.fitsIn(cul.reservation, appReserved)) {
           cul.canAssign = false;
           cul.reservation = appReserved;
@@ -1271,59 +1229,54 @@ public class LeafQueue extends AbstractCSQueue {
       if (!userAssignable) {
         application.updateAMContainerDiagnostics(AMState.ACTIVATED,
             "User capacity has reached its maximum limit.");
-        ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
-            activitiesManager, node, application, application.getPriority(),
+        ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(activitiesManager, node,
+            application, application.getPriority(),
             ActivityDiagnosticConstant.QUEUE_HIT_USER_MAX_CAPACITY_LIMIT);
         continue;
       }
 
       // Try to schedule
-      assignment = application.assignContainers(clusterResource,
-          candidates, currentResourceLimits, schedulingMode, null);
+      assignment = application.assignContainers(clusterResource, candidates, currentResourceLimits,
+          schedulingMode, null);
 
       if (LOG.isDebugEnabled()) {
-        LOG.debug("post-assignContainers for application " + application
-            .getApplicationId());
+        LOG.debug(
+            "post-assignContainers for application " + application.getApplicationId());
         application.showRequests();
       }
 
       // Did we schedule or reserve a container?
       Resource assigned = assignment.getResource();
 
-      if (Resources.greaterThan(resourceCalculator, clusterResource, assigned,
-          Resources.none())) {
-        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-            parent.getQueuePath(), getQueuePath(),
-            ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
+      if (Resources.greaterThan(resourceCalculator, clusterResource, assigned, Resources.none())) {
+        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, parent.getQueuePath(),
+            getQueuePath(), ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
         return assignment;
-      } else if (assignment.getSkippedType()
-          == CSAssignment.SkippedType.OTHER) {
-        ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
-            activitiesManager, application.getApplicationId(),
-            ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
+      } else if (assignment.getSkippedType() == CSAssignment.SkippedType.OTHER) {
+        ActivitiesLogger.APP.finishSkippedAppAllocationRecording(activitiesManager,
+            application.getApplicationId(), ActivityState.SKIPPED,
+            ActivityDiagnosticConstant.EMPTY);
         application.updateNodeInfoForAMDiagnostics(node);
-      } else if (assignment.getSkippedType()
-          == CSAssignment.SkippedType.QUEUE_LIMIT) {
-        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-            parent.getQueuePath(), getQueuePath(), ActivityState.REJECTED,
-            () -> ActivityDiagnosticConstant.QUEUE_DO_NOT_HAVE_ENOUGH_HEADROOM
-                + " from " + application.getApplicationId());
+      } else if (assignment.getSkippedType() == CSAssignment.SkippedType.QUEUE_LIMIT) {
+        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, parent.getQueuePath(),
+            getQueuePath(), ActivityState.REJECTED,
+            () -> ActivityDiagnosticConstant.QUEUE_DO_NOT_HAVE_ENOUGH_HEADROOM + " from "
+                + application.getApplicationId());
         return assignment;
-      } else{
+      } else {
         // If we don't allocate anything, and it is not skipped by application,
         // we will return to respect FIFO of applications
-        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-            parent.getQueuePath(), getQueuePath(), ActivityState.SKIPPED,
+        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, parent.getQueuePath(),
+            getQueuePath(), ActivityState.SKIPPED,
             ActivityDiagnosticConstant.QUEUE_SKIPPED_TO_RESPECT_FIFO);
-        ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
-            activitiesManager, application.getApplicationId(),
-            ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
+        ActivitiesLogger.APP.finishSkippedAppAllocationRecording(activitiesManager,
+            application.getApplicationId(), ActivityState.SKIPPED,
+            ActivityDiagnosticConstant.EMPTY);
         return CSAssignment.NULL_ASSIGNMENT;
       }
     }
-    ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-        parent.getQueuePath(), getQueuePath(), ActivityState.SKIPPED,
-        ActivityDiagnosticConstant.EMPTY);
+    ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, parent.getQueuePath(),
+        getQueuePath(), ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
 
     return CSAssignment.NULL_ASSIGNMENT;
   }
@@ -1340,14 +1293,13 @@ public class LeafQueue extends AbstractCSQueue {
     if (allocation.getAllocateFromReservedContainer() == null) {
       readLock.lock();
       try {
-        FiCaSchedulerApp app =
-            schedulerContainer.getSchedulerApplicationAttempt();
+        FiCaSchedulerApp app = schedulerContainer.getSchedulerApplicationAttempt();
         String username = app.getUser();
         String p = schedulerContainer.getNodePartition();
 
         // check user-limit
-        Resource userLimit = computeUserLimitAndSetHeadroom(app, cluster, p,
-            allocation.getSchedulingMode(), null);
+        Resource userLimit =
+            computeUserLimitAndSetHeadroom(app, cluster, p, allocation.getSchedulingMode(), null);
 
         // Deduct resources that we can release
         User user = getUser(username);
@@ -1356,13 +1308,11 @@ public class LeafQueue extends AbstractCSQueue {
           return false;
         }
         Resource usedResource = Resources.clone(user.getUsed(p));
-        Resources.subtractFrom(usedResource,
-            request.getTotalReleasedResource());
+        Resources.subtractFrom(usedResource, request.getTotalReleasedResource());
 
-        if (Resources.greaterThan(resourceCalculator, cluster, usedResource,
-            userLimit)) {
-          LOG.debug("Used resource={} exceeded user-limit={}",
-              usedResource, userLimit);
+        if (Resources.greaterThan(resourceCalculator, cluster, usedResource, userLimit)) {
+          LOG.debug("Used resource={} exceeded user-limit={}", usedResource,
+              userLimit);
           return false;
         }
       } finally {
@@ -1377,7 +1327,7 @@ public class LeafQueue extends AbstractCSQueue {
       SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer) {
     RMContainer rmContainer = schedulerContainer.getRmContainer();
 
-    LeafQueue targetLeafQueue =
+    AbstractLeafQueue targetLeafQueue =
         schedulerContainer.getSchedulerApplicationAttempt().getCSLeafQueue();
 
     if (targetLeafQueue == this) {
@@ -1385,42 +1335,34 @@ public class LeafQueue extends AbstractCSQueue {
       if (rmContainer.getState() == RMContainerState.RESERVED) {
         // For other reserved containers
         // This is a reservation exchange, complete previous reserved container
-        completedContainer(clusterResource,
-            schedulerContainer.getSchedulerApplicationAttempt(),
-            schedulerContainer.getSchedulerNode(), rmContainer, SchedulerUtils
-                .createAbnormalContainerStatus(rmContainer.getContainerId(),
-                    SchedulerUtils.UNRESERVED_CONTAINER),
-            RMContainerEventType.RELEASED, null, false);
+        completedContainer(clusterResource, schedulerContainer.getSchedulerApplicationAttempt(),
+            schedulerContainer.getSchedulerNode(), rmContainer,
+            SchedulerUtils.createAbnormalContainerStatus(rmContainer.getContainerId(),
+                SchedulerUtils.UNRESERVED_CONTAINER), RMContainerEventType.RELEASED, null, false);
       }
-    } else{
+    } else {
       // When trying to preempt containers from different queue -- this
       // is for lazy preemption feature (kill preemption candidate in scheduling
       // cycle).
       targetLeafQueue.completedContainer(clusterResource,
           schedulerContainer.getSchedulerApplicationAttempt(),
-          schedulerContainer.getSchedulerNode(),
-          schedulerContainer.getRmContainer(), SchedulerUtils
-              .createPreemptedContainerStatus(rmContainer.getContainerId(),
-                  SchedulerUtils.PREEMPTED_CONTAINER),
-          RMContainerEventType.KILL, null, false);
+          schedulerContainer.getSchedulerNode(), schedulerContainer.getRmContainer(),
+          SchedulerUtils.createPreemptedContainerStatus(rmContainer.getContainerId(),
+              SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL, null, false);
     }
   }
 
   private void releaseContainers(Resource clusterResource,
       ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
-    for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> c : request
-        .getContainersToRelease()) {
+    for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> c : request.getContainersToRelease()) {
       internalReleaseContainer(clusterResource, c);
     }
 
     // Handle container reservation looking, or lazy preemption case:
-    if (null != request.getContainersToAllocate() && !request
-        .getContainersToAllocate().isEmpty()) {
-      for (ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> context : request
-          .getContainersToAllocate()) {
+    if (null != request.getContainersToAllocate() && !request.getContainersToAllocate().isEmpty()) {
+      for (ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> context : request.getContainersToAllocate()) {
         if (null != context.getToRelease()) {
-          for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> c : context
-              .getToRelease()) {
+          for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> c : context.getToRelease()) {
             internalReleaseContainer(clusterResource, c);
           }
         }
@@ -1438,10 +1380,10 @@ public class LeafQueue extends AbstractCSQueue {
     writeLock.lock();
     try {
       if (request.anythingAllocatedOrReserved()) {
-        ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode>
-            allocation = request.getFirstAllocatedOrReservedContainer();
-        SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
-            schedulerContainer = allocation.getAllocatedOrReservedContainer();
+        ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> allocation =
+            request.getFirstAllocatedOrReservedContainer();
+        SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer =
+            allocation.getAllocatedOrReservedContainer();
 
         // Do not modify queue when allocation from reserved container
         if (allocation.getAllocateFromReservedContainer() == null) {
@@ -1450,19 +1392,16 @@ public class LeafQueue extends AbstractCSQueue {
           applyToParentQueue = true;
           // Book-keeping
           // Note: Update headroom to account for current allocation too...
-          allocateResource(cluster,
-              schedulerContainer.getSchedulerApplicationAttempt(),
-              allocation.getAllocatedOrReservedResource(),
-              schedulerContainer.getNodePartition(),
+          allocateResource(cluster, schedulerContainer.getSchedulerApplicationAttempt(),
+              allocation.getAllocatedOrReservedResource(), schedulerContainer.getNodePartition(),
               schedulerContainer.getRmContainer());
-          orderingPolicy.containerAllocated(
-              schedulerContainer.getSchedulerApplicationAttempt(),
+          orderingPolicy.containerAllocated(schedulerContainer.getSchedulerApplicationAttempt(),
               schedulerContainer.getRmContainer());
         }
 
         // Update reserved resource
-        if (Resources.greaterThan(resourceCalculator, cluster,
-            request.getTotalReservedResource(), Resources.none())) {
+        if (Resources.greaterThan(resourceCalculator, cluster, request.getTotalReservedResource(),
+            Resources.none())) {
           incReservedResource(schedulerContainer.getNodePartition(),
               request.getTotalReservedResource());
         }
@@ -1476,7 +1415,6 @@ public class LeafQueue extends AbstractCSQueue {
     }
   }
 
-
   protected Resource getHeadroom(User user, Resource queueCurrentLimit,
       Resource clusterResource, FiCaSchedulerApp application) {
     return getHeadroom(user, queueCurrentLimit, clusterResource, application,
@@ -1487,29 +1425,28 @@ public class LeafQueue extends AbstractCSQueue {
       Resource clusterResource, FiCaSchedulerApp application,
       String partition) {
     return getHeadroom(user, queueCurrentLimit, clusterResource,
-        getResourceLimitForActiveUsers(application.getUser(), clusterResource,
-            partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
-        partition);
+        getResourceLimitForActiveUsers(application.getUser(), clusterResource, partition,
+            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), partition);
   }
 
   private Resource getHeadroom(User user,
       Resource currentPartitionResourceLimit, Resource clusterResource,
       Resource userLimitResource, String partition) {
-    /** 
+    /**
      * Headroom is:
      *    min(
      *        min(userLimit, queueMaxCap) - userConsumed,
      *        queueMaxCap - queueUsedResources
      *       )
-     * 
-     * ( which can be expressed as, 
-     *  min (userLimit - userConsumed, queuMaxCap - userConsumed, 
+     *
+     * ( which can be expressed as,
+     *  min (userLimit - userConsumed, queueMaxCap - userConsumed,
      *    queueMaxCap - queueUsedResources)
      *  )
      *
      * given that queueUsedResources >= userConsumed, this simplifies to
      *
-     * >> min (userlimit - userConsumed,   queueMaxCap - queueUsedResources) << 
+     * >> min (userLimit - userConsumed,   queueMaxCap - queueUsedResources) <<
      *
      * sum of queue max capacities of multiple queue's will be greater than the
      * actual capacity of a given partition, hence we need to ensure that the
@@ -1518,45 +1455,39 @@ public class LeafQueue extends AbstractCSQueue {
      * headroom = min (unused resourcelimit of a label, calculated headroom )
      */
     currentPartitionResourceLimit =
-        partition.equals(RMNodeLabelsManager.NO_LABEL)
-            ? currentPartitionResourceLimit
-            : getQueueMaxResource(partition);
+        partition.equals(RMNodeLabelsManager.NO_LABEL) ? currentPartitionResourceLimit :
+            getQueueMaxResource(partition);
 
     Resource headroom = Resources.componentwiseMin(
-        Resources.subtractNonNegative(userLimitResource,
-            user.getUsed(partition)),
+        Resources.subtractNonNegative(userLimitResource, user.getUsed(partition)),
         Resources.subtractNonNegative(currentPartitionResourceLimit,
             usageTracker.getQueueUsage().getUsed(partition)));
     // Normalize it before return
-    headroom =
-        Resources.roundDown(resourceCalculator, headroom,
-            queueAllocationSettings.getMinimumAllocation());
+    headroom = Resources.roundDown(resourceCalculator, headroom,
+        queueAllocationSettings.getMinimumAllocation());
 
     //headroom = min (unused resourcelimit of a label, calculated headroom )
-    Resource clusterPartitionResource =
-        labelManager.getResourceByLabel(partition, clusterResource);
-    Resource clusterFreePartitionResource =
-        Resources.subtract(clusterPartitionResource,
-            csContext.getClusterResourceUsage().getUsed(partition));
-    headroom = Resources.min(resourceCalculator, clusterPartitionResource,
-        clusterFreePartitionResource, headroom);
+    Resource clusterPartitionResource = labelManager.getResourceByLabel(partition, clusterResource);
+    Resource clusterFreePartitionResource = Resources.subtract(clusterPartitionResource,
+        csContext.getClusterResourceUsage().getUsed(partition));
+    headroom =
+        Resources.min(resourceCalculator, clusterPartitionResource, clusterFreePartitionResource,
+            headroom);
     return headroom;
   }
-  
-  private void setQueueResourceLimitsInfo(
-      Resource clusterResource) {
+
+  private void setQueueResourceLimitsInfo(Resource clusterResource) {
     synchronized (queueResourceLimitsInfo) {
-      queueResourceLimitsInfo.setQueueCurrentLimit(cachedResourceLimitsForHeadroom
-          .getLimit());
+      queueResourceLimitsInfo.setQueueCurrentLimit(cachedResourceLimitsForHeadroom.getLimit());
       queueResourceLimitsInfo.setClusterResource(clusterResource);
     }
   }
 
   // It doesn't necessarily to hold application's lock here.
-  @Lock({LeafQueue.class})
+  @Lock({AbstractLeafQueue.class})
   Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
-      Resource clusterResource, String nodePartition,
-      SchedulingMode schedulingMode, Resource userLimit) {
+      Resource clusterResource, String nodePartition, SchedulingMode schedulingMode,
+      Resource userLimit) {
     String user = application.getUser();
     User queueUser = getUser(user);
     if (queueUser == null) {
@@ -1567,34 +1498,33 @@ public class LeafQueue extends AbstractCSQueue {
     // Compute user limit respect requested labels,
     // TODO, need consider headroom respect labels also
     if (userLimit == null) {
-      userLimit = getResourceLimitForActiveUsers(application.getUser(),
-          clusterResource, nodePartition, schedulingMode);
+      userLimit =
+          getResourceLimitForActiveUsers(application.getUser(), clusterResource, nodePartition,
+              schedulingMode);
     }
     setQueueResourceLimitsInfo(clusterResource);
 
-    Resource headroom =
-        usageTracker.getMetrics().getUserMetrics(user) == null ? Resources.none() :
-        getHeadroom(queueUser, cachedResourceLimitsForHeadroom.getLimit(),
-            clusterResource, userLimit, nodePartition);
-    
+    Resource headroom = usageTracker.getMetrics().getUserMetrics(user) == null ? Resources.none() :
+        getHeadroom(queueUser, cachedResourceLimitsForHeadroom.getLimit(), clusterResource,
+            userLimit, nodePartition);
+
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Headroom calculation for user " + user + ": " + " userLimit="
-          + userLimit + " queueMaxAvailRes="
-          + cachedResourceLimitsForHeadroom.getLimit() + " consumed="
-          + queueUser.getUsed() + " partition="
-          + nodePartition);
-    }
-    
-    CapacityHeadroomProvider headroomProvider = new CapacityHeadroomProvider(
-      queueUser, this, application, queueResourceLimitsInfo);
-    
+      LOG.debug(
+          "Headroom calculation for user " + user + ": " + " userLimit=" + userLimit
+              + " queueMaxAvailRes=" + cachedResourceLimitsForHeadroom.getLimit() + " consumed="
+              + queueUser.getUsed() + " partition=" + nodePartition);
+    }
+
+    CapacityHeadroomProvider headroomProvider =
+        new CapacityHeadroomProvider(queueUser, this, application, queueResourceLimitsInfo);
+
     application.setHeadroomProvider(headroomProvider);
 
     usageTracker.getMetrics().setAvailableResourcesToUser(nodePartition, user, headroom);
-    
+
     return userLimit;
   }
-  
+
   @Lock(NoLock.class)
   public int getNodeLocalityDelay() {
     return nodeLocalityDelay;
@@ -1623,11 +1553,10 @@ public class LeafQueue extends AbstractCSQueue {
    *          RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
    * @return Computed User Limit
    */
-  public Resource getResourceLimitForActiveUsers(String userName,
-      Resource clusterResource, String nodePartition,
-      SchedulingMode schedulingMode) {
-    return usersManager.getComputedResourceLimitForActiveUsers(userName,
-        clusterResource, nodePartition, schedulingMode);
+  public Resource getResourceLimitForActiveUsers(String userName, Resource clusterResource,
+      String nodePartition, SchedulingMode schedulingMode) {
+    return usersManager.getComputedResourceLimitForActiveUsers(userName, clusterResource,
+        nodePartition, schedulingMode);
   }
 
   /**
@@ -1643,11 +1572,10 @@ public class LeafQueue extends AbstractCSQueue {
    *          RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
    * @return Computed User Limit
    */
-  public Resource getResourceLimitForAllUsers(String userName,
-      Resource clusterResource, String nodePartition,
-      SchedulingMode schedulingMode) {
-    return usersManager.getComputedResourceLimitForAllUsers(userName,
-        clusterResource, nodePartition, schedulingMode);
+  public Resource getResourceLimitForAllUsers(String userName, Resource clusterResource,
+      String nodePartition, SchedulingMode schedulingMode) {
+    return usersManager.getComputedResourceLimitForAllUsers(userName, clusterResource,
+        nodePartition, schedulingMode);
   }
 
   @Private
@@ -1667,27 +1595,24 @@ public class LeafQueue extends AbstractCSQueue {
 
       // Note: We aren't considering the current request since there is a fixed
       // overhead of the AM, but it's a > check, not a >= check, so...
-      if (Resources.greaterThan(resourceCalculator, clusterResource,
-          user.getUsed(nodePartition), limit)) {
+      if (Resources.greaterThan(resourceCalculator, clusterResource, user.getUsed(nodePartition),
+          limit)) {
         // if enabled, check to see if could we potentially use this node instead
         // of a reserved node if the application has reserved containers
         if (this.reservationsContinueLooking) {
           if (Resources.lessThanOrEqual(resourceCalculator, clusterResource,
-              Resources.subtract(user.getUsed(),
-                  application.getCurrentReservation()), limit)) {
+              Resources.subtract(user.getUsed(), application.getCurrentReservation()), limit)) {
 
             if (LOG.isDebugEnabled()) {
               LOG.debug("User " + userName + " in queue " + getQueuePath()
-                  + " will exceed limit based on reservations - "
-                  + " consumed: " + user.getUsed() + " reserved: " + application
-                  .getCurrentReservation() + " limit: " + limit);
+                  + " will exceed limit based on reservations - " + " consumed: " + user.getUsed()
+                  + " reserved: " + application.getCurrentReservation() + " limit: " + limit);
             }
-            Resource amountNeededToUnreserve = Resources.subtract(
-                user.getUsed(nodePartition), limit);
+            Resource amountNeededToUnreserve =
+                Resources.subtract(user.getUsed(nodePartition), limit);
             // we can only acquire a new container if we unreserve first to
             // respect user-limit
-            currentResourceLimits.setAmountNeededUnreserve(
-                amountNeededToUnreserve);
+            currentResourceLimits.setAmountNeededUnreserve(amountNeededToUnreserve);
             return true;
           }
         }
@@ -1705,8 +1630,7 @@ public class LeafQueue extends AbstractCSQueue {
   }
 
   @Override
-  protected void setDynamicQueueProperties(
-      CapacitySchedulerConfiguration configuration) {
+  protected void setDynamicQueueProperties(CapacitySchedulerConfiguration configuration) {
     // set to -1, to disable it
     configuration.setUserLimitFactor(getQueuePath(), -1);
     // Set Max AM percentage to a higher value
@@ -1715,8 +1639,8 @@ public class LeafQueue extends AbstractCSQueue {
     super.setDynamicQueueProperties(configuration);
   }
 
-  private void updateSchedulerHealthForCompletedContainer(
-      RMContainer rmContainer, ContainerStatus containerStatus) {
+  private void updateSchedulerHealthForCompletedContainer(RMContainer rmContainer,
+      ContainerStatus containerStatus) {
     // Update SchedulerHealth for released / preempted container
     SchedulerHealth schedulerHealth = csContext.getSchedulerHealth();
     if (null == schedulerHealth) {
@@ -1730,8 +1654,7 @@ public class LeafQueue extends AbstractCSQueue {
       schedulerHealth.updateSchedulerPreemptionCounts(1);
     } else {
       schedulerHealth.updateRelease(csContext.getLastNodeUpdateTime(),
-          rmContainer.getAllocatedNode(), rmContainer.getContainerId(),
-          getQueuePath());
+          rmContainer.getAllocatedNode(), rmContainer.getContainerId(), getQueuePath());
     }
   }
 
@@ -1743,15 +1666,13 @@ public class LeafQueue extends AbstractCSQueue {
    * @param nodePartition
    *          Partition
    */
-  public void recalculateQueueUsageRatio(Resource clusterResource,
-      String nodePartition) {
+  public void recalculateQueueUsageRatio(Resource clusterResource, String nodePartition) {
     writeLock.lock();
     try {
       ResourceUsage queueResourceUsage = getQueueResourceUsage();
 
       if (nodePartition == null) {
-        for (String partition : Sets.union(
-            getQueueCapacities().getNodePartitionsSet(),
+        for (String partition : Sets.union(getQueueCapacities().getNodePartitionsSet(),
             queueResourceUsage.getNodePartitionsSet())) {
           usersManager.updateUsageRatio(partition, clusterResource);
         }
@@ -1764,10 +1685,9 @@ public class LeafQueue extends AbstractCSQueue {
   }
 
   @Override
-  public void completedContainer(Resource clusterResource, 
-      FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer, 
-      ContainerStatus containerStatus, RMContainerEventType event, CSQueue childQueue,
-      boolean sortQueues) {
+  public void completedContainer(Resource clusterResource, FiCaSchedulerApp application,
+      FiCaSchedulerNode node, RMContainer rmContainer, ContainerStatus containerStatus,
+      RMContainerEventType event, CSQueue childQueue, boolean sortQueues) {
     // Update SchedulerHealth for released / preempted container
     updateSchedulerHealthForCompletedContainer(rmContainer, containerStatus);
 
@@ -1784,11 +1704,10 @@ public class LeafQueue extends AbstractCSQueue {
         // happen under scheduler's lock...
         // So, this is, in effect, a transaction across application & node
         if (rmContainer.getState() == RMContainerState.RESERVED) {
-          removed = application.unreserve(rmContainer.getReservedSchedulerKey(),
-              node, rmContainer);
-        } else{
-          removed = application.containerCompleted(rmContainer, containerStatus,
-              event, node.getPartition());
+          removed = application.unreserve(rmContainer.getReservedSchedulerKey(), node, rmContainer);
+        } else {
+          removed = application.containerCompleted(rmContainer, containerStatus, event,
+              node.getPartition());
 
           node.releaseContainer(rmContainer.getContainerId(), false);
         }
@@ -1806,20 +1725,16 @@ public class LeafQueue extends AbstractCSQueue {
         writeLock.unlock();
       }
 
-
       if (removed) {
         // Inform the parent queue _outside_ of the leaf-queue lock
-        parent.completedContainer(clusterResource, application, node,
-          rmContainer, null, event, this, sortQueues);
+        parent.completedContainer(clusterResource, application, node, rmContainer, null, event,
+            this, sortQueues);
       }
     }
 
     // Notify PreemptionManager
     csContext.getPreemptionManager().removeKillableContainer(
-        new KillableContainer(
-            rmContainer,
-            node.getPartition(),
-            getQueuePath()));
+        new KillableContainer(rmContainer, node.getPartition(), getQueuePath()));
 
     // Update preemption metrics if exit status is PREEMPTED
     if (containerStatus != null
@@ -1828,23 +1743,20 @@ public class LeafQueue extends AbstractCSQueue {
     }
   }
 
-  void allocateResource(Resource clusterResource,
-      SchedulerApplicationAttempt application, Resource resource,
-      String nodePartition, RMContainer rmContainer) {
+  void allocateResource(Resource clusterResource, SchedulerApplicationAttempt application,
+      Resource resource, String nodePartition, RMContainer rmContainer) {
     writeLock.lock();
     try {
       super.allocateResource(clusterResource, resource, nodePartition);
 
       // handle ignore exclusivity container
-      if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
-          RMNodeLabelsManager.NO_LABEL) && !nodePartition.equals(
+      if (null != rmContainer && rmContainer.getNodeLabelExpression()
+          .equals(RMNodeLabelsManager.NO_LABEL) && !nodePartition.equals(
           RMNodeLabelsManager.NO_LABEL)) {
         TreeSet<RMContainer> rmContainers = null;
-        if (null == (rmContainers = ignorePartitionExclusivityRMContainers.get(
-            nodePartition))) {
+        if (null == (rmContainers = ignorePartitionExclusivityRMContainers.get(nodePartition))) {
           rmContainers = new TreeSet<>();
-          ignorePartitionExclusivityRMContainers.put(nodePartition,
-              rmContainers);
+          ignorePartitionExclusivityRMContainers.put(nodePartition, rmContainers);
         }
         rmContainers.add(rmContainer);
       }
@@ -1858,14 +1770,13 @@ public class LeafQueue extends AbstractCSQueue {
 
       Resource partitionHeadroom = Resources.createResource(0, 0);
       if (usageTracker.getMetrics().getUserMetrics(userName) != null) {
-        partitionHeadroom = getHeadroom(user,
-            cachedResourceLimitsForHeadroom.getLimit(), clusterResource,
-            getResourceLimitForActiveUsers(userName, clusterResource,
-                nodePartition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
-            nodePartition);
+        partitionHeadroom =
+            getHeadroom(user, cachedResourceLimitsForHeadroom.getLimit(), clusterResource,
+                getResourceLimitForActiveUsers(userName, clusterResource, nodePartition,
+                    SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodePartition);
       }
-      usageTracker.getMetrics().setAvailableResourcesToUser(nodePartition, userName,
-          partitionHeadroom);
+      usageTracker.getMetrics()
+          .setAvailableResourcesToUser(nodePartition, userName, partitionHeadroom);
 
       if (LOG.isDebugEnabled()) {
         LOG.debug(getQueuePath() + " user=" + userName + " used="
@@ -1878,20 +1789,18 @@ public class LeafQueue extends AbstractCSQueue {
     }
   }
 
-  void releaseResource(Resource clusterResource,
-      FiCaSchedulerApp application, Resource resource, String nodePartition,
-      RMContainer rmContainer) {
+  void releaseResource(Resource clusterResource, FiCaSchedulerApp application, Resource resource,
+      String nodePartition, RMContainer rmContainer) {
     writeLock.lock();
     try {
       super.releaseResource(clusterResource, resource, nodePartition);
 
       // handle ignore exclusivity container
-      if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
-          RMNodeLabelsManager.NO_LABEL) && !nodePartition.equals(
+      if (null != rmContainer && rmContainer.getNodeLabelExpression()
+          .equals(RMNodeLabelsManager.NO_LABEL) && !nodePartition.equals(
           RMNodeLabelsManager.NO_LABEL)) {
         if (ignorePartitionExclusivityRMContainers.containsKey(nodePartition)) {
-          Set<RMContainer> rmContainers =
-              ignorePartitionExclusivityRMContainers.get(nodePartition);
+          Set<RMContainer> rmContainers = ignorePartitionExclusivityRMContainers.get(nodePartition);
           rmContainers.remove(rmContainer);
           if (rmContainers.isEmpty()) {
             ignorePartitionExclusivityRMContainers.remove(nodePartition);
@@ -1906,40 +1815,38 @@ public class LeafQueue extends AbstractCSQueue {
 
       Resource partitionHeadroom = Resources.createResource(0, 0);
       if (usageTracker.getMetrics().getUserMetrics(userName) != null) {
-        partitionHeadroom = getHeadroom(user,
-            cachedResourceLimitsForHeadroom.getLimit(), clusterResource,
-            getResourceLimitForActiveUsers(userName, clusterResource,
-                nodePartition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
-            nodePartition);
+        partitionHeadroom =
+            getHeadroom(user, cachedResourceLimitsForHeadroom.getLimit(), clusterResource,
+                getResourceLimitForActiveUsers(userName, clusterResource, nodePartition,
+                    SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodePartition);
       }
-      usageTracker.getMetrics().setAvailableResourcesToUser(nodePartition, userName,
-          partitionHeadroom);
+      usageTracker.getMetrics()
+          .setAvailableResourcesToUser(nodePartition, userName, partitionHeadroom);
 
       if (LOG.isDebugEnabled()) {
         LOG.debug(
             getQueuePath() + " used=" + usageTracker.getQueueUsage().getUsed() + " numContainers="
-            + usageTracker.getNumContainers() + " user=" + userName + " user-resources="
-            + user.getUsed());
+                + usageTracker.getNumContainers() + " user=" + userName + " user-resources="
+                + user.getUsed());
       }
     } finally {
       writeLock.unlock();
     }
   }
-  
-  private void updateCurrentResourceLimits(
-      ResourceLimits currentResourceLimits, Resource clusterResource) {
+
+  private void updateCurrentResourceLimits(ResourceLimits currentResourceLimits,
+      Resource clusterResource) {
     // TODO: need consider non-empty node labels when resource limits supports
     // node labels
     // Even if ParentQueue will set limits respect child's max queue capacity,
     // but when allocating reserved container, CapacityScheduler doesn't do
     // this. So need cap limits by queue's max capacity here.
-    this.cachedResourceLimitsForHeadroom =
-        new ResourceLimits(currentResourceLimits.getLimit());
-    Resource queueMaxResource = getEffectiveMaxCapacityDown(
-        RMNodeLabelsManager.NO_LABEL, queueAllocationSettings.getMinimumAllocation());
-    this.cachedResourceLimitsForHeadroom.setLimit(Resources.min(
-        resourceCalculator, clusterResource, queueMaxResource,
-        currentResourceLimits.getLimit()));
+    this.cachedResourceLimitsForHeadroom = new ResourceLimits(currentResourceLimits.getLimit());
+    Resource queueMaxResource = getEffectiveMaxCapacityDown(RMNodeLabelsManager.NO_LABEL,
+        queueAllocationSettings.getMinimumAllocation());
+    this.cachedResourceLimitsForHeadroom.setLimit(
+        Resources.min(resourceCalculator, clusterResource, queueMaxResource,
+            currentResourceLimits.getLimit()));
   }
 
   @Override
@@ -1967,12 +1874,12 @@ public class LeafQueue extends AbstractCSQueue {
       recalculateQueueUsageRatio(clusterResource, null);
 
       // Update metrics
-      CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
-          this, labelManager, null);
+      CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, this, labelManager,
+          null);
       // Update configured capacity/max-capacity for default partition only
       CSQueueUtils.updateConfiguredCapacityMetrics(resourceCalculator,
-          labelManager.getResourceByLabel(null, clusterResource),
-          RMNodeLabelsManager.NO_LABEL, this);
+          labelManager.getResourceByLabel(null, clusterResource), RMNodeLabelsManager.NO_LABEL,
+          this);
 
       // queue metrics are updated, more resource may be available
       // activate the pending applications if possible
@@ -1983,10 +1890,8 @@ public class LeafQueue extends AbstractCSQueue {
       usersManager.userLimitNeedsRecompute();
 
       // Update application properties
-      for (FiCaSchedulerApp application : orderingPolicy
-          .getSchedulableEntities()) {
-        computeUserLimitAndSetHeadroom(application, clusterResource,
-            RMNodeLabelsManager.NO_LABEL,
+      for (FiCaSchedulerApp application : orderingPolicy.getSchedulableEntities()) {
+        computeUserLimitAndSetHeadroom(application, clusterResource, RMNodeLabelsManager.NO_LABEL,
             SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null);
       }
     } finally {
@@ -1997,16 +1902,14 @@ public class LeafQueue extends AbstractCSQueue {
   @Override
   public void incUsedResource(String nodeLabel, Resource resourceToInc,
       SchedulerApplicationAttempt application) {
-    usersManager.updateUserResourceUsage(application.getUser(), resourceToInc,
-        nodeLabel, true);
+    usersManager.updateUserResourceUsage(application.getUser(), resourceToInc, nodeLabel, true);
     super.incUsedResource(nodeLabel, resourceToInc, application);
   }
 
   @Override
   public void decUsedResource(String nodeLabel, Resource resourceToDec,
       SchedulerApplicationAttempt application) {
-    usersManager.updateUserResourceUsage(application.getUser(), resourceToDec,
-        nodeLabel, false);
+    usersManager.updateUserResourceUsage(application.getUser(), resourceToDec, nodeLabel, false);
     super.decUsedResource(nodeLabel, resourceToDec, application);
   }
 
@@ -2017,8 +1920,7 @@ public class LeafQueue extends AbstractCSQueue {
       return;
     }
 
-    user.getResourceUsage().incAMUsed(nodeLabel,
-        resourceToInc);
+    user.getResourceUsage().incAMUsed(nodeLabel, resourceToInc);
     // ResourceUsage has its own lock, no addition lock needs here.
     usageTracker.getQueueUsage().incAMUsed(nodeLabel, resourceToInc);
   }
@@ -2030,8 +1932,7 @@ public class LeafQueue extends AbstractCSQueue {
       return;
     }
 
-    user.getResourceUsage().decAMUsed(nodeLabel,
-        resourceToDec);
+    user.getResourceUsage().decAMUsed(nodeLabel, resourceToDec);
     // ResourceUsage has its own lock, no addition lock needs here.
     usageTracker.getQueueUsage().decAMUsed(nodeLabel, resourceToDec);
   }
@@ -2048,11 +1949,9 @@ public class LeafQueue extends AbstractCSQueue {
     // Careful! Locking order is important!
     writeLock.lock();
     try {
-      FiCaSchedulerNode node = csContext.getNode(
-          rmContainer.getContainer().getNodeId());
-      allocateResource(clusterResource, attempt,
-          rmContainer.getContainer().getResource(), node.getPartition(),
-          rmContainer);
+      FiCaSchedulerNode node = csContext.getNode(rmContainer.getContainer().getNodeId());
+      allocateResource(clusterResource, attempt, rmContainer.getContainer().getResource(),
+          node.getPartition(), rmContainer);
     } finally {
       writeLock.unlock();
     }
@@ -2064,24 +1963,22 @@ public class LeafQueue extends AbstractCSQueue {
    * Obtain (read-only) collection of pending applications.
    */
   public Collection<FiCaSchedulerApp> getPendingApplications() {
-    return Collections.unmodifiableCollection(pendingOrderingPolicy
-        .getSchedulableEntities());
+    return Collections.unmodifiableCollection(pendingOrderingPolicy.getSchedulableEntities());
   }
 
   /**
    * Obtain (read-only) collection of active applications.
    */
   public Collection<FiCaSchedulerApp> getApplications() {
-    return Collections.unmodifiableCollection(orderingPolicy
-        .getSchedulableEntities());
+    return Collections.unmodifiableCollection(orderingPolicy.getSchedulableEntities());
   }
 
   /**
    * Obtain (read-only) collection of all applications.
    */
   public Collection<FiCaSchedulerApp> getAllApplications() {
-    Collection<FiCaSchedulerApp> apps = new HashSet<FiCaSchedulerApp>(
-        pendingOrderingPolicy.getSchedulableEntities());
+    Collection<FiCaSchedulerApp> apps =
+        new HashSet<FiCaSchedulerApp>(pendingOrderingPolicy.getSchedulableEntities());
     apps.addAll(orderingPolicy.getSchedulableEntities());
 
     return Collections.unmodifiableCollection(apps);
@@ -2095,7 +1992,6 @@ public class LeafQueue extends AbstractCSQueue {
    * Total pending for the queue =
    * sum(for each user(min((user's headroom), sum(user's pending requests))))
    * NOTE:
-
    * @param clusterResources clusterResource
    * @param partition node partition
    * @param deductReservedFromPending When a container is reserved in CS,
@@ -2208,7 +2104,7 @@ public class LeafQueue extends AbstractCSQueue {
       parent.detachContainer(clusterResource, application, rmContainer);
     }
   }
-  
+
   /**
    * @return all ignored partition exclusivity RMContainers in the LeafQueue,
    *         this will be used by preemption policy.
@@ -2264,7 +2160,7 @@ public class LeafQueue extends AbstractCSQueue {
       getOrderingPolicy() {
     return orderingPolicy;
   }
-  
+
   void setOrderingPolicy(
       OrderingPolicy<FiCaSchedulerApp> orderingPolicy) {
     writeLock.lock();
@@ -2318,19 +2214,19 @@ public class LeafQueue extends AbstractCSQueue {
   static class QueueResourceLimitsInfo {
     private Resource queueCurrentLimit;
     private Resource clusterResource;
-    
+
     public void setQueueCurrentLimit(Resource currentLimit) {
       this.queueCurrentLimit = currentLimit;
     }
-    
+
     public Resource getQueueCurrentLimit() {
       return queueCurrentLimit;
     }
-    
+
     public void setClusterResource(Resource clusterResource) {
       this.clusterResource = clusterResource;
     }
-    
+
     public Resource getClusterResource() {
       return clusterResource;
     }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java
index 7311be7..57050b1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java
@@ -36,15 +36,19 @@ import java.util.Set;
  * ManagedParentQueue for auto created dynamic queues
  */
 public class AutoCreatedLeafQueue extends AbstractAutoCreatedLeafQueue {
-
   private static final Logger LOG = LoggerFactory
       .getLogger(AutoCreatedLeafQueue.class);
 
   public AutoCreatedLeafQueue(CapacitySchedulerContext cs, String queueName,
       ManagedParentQueue parent) throws IOException {
+    // TODO: once YARN-10907 is merged, fetching the leafQueueConfigs
+    //  twice (here and in setupQueueConfigs below) won't be necessary
     super(cs, parent.getLeafQueueConfigs(queueName),
         queueName,
         parent, null);
+    super.setupQueueConfigs(cs.getClusterResource(), parent.getLeafQueueConfigs(queueName));
+
+    LOG.debug("Initialized AutoCreatedLeafQueue: name={}, fullname={}", queueName, getQueuePath());
     updateCapacitiesToZero();
   }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSMaxRunningAppsEnforcer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSMaxRunningAppsEnforcer.java
index 93d0017..fedde05 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSMaxRunningAppsEnforcer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSMaxRunningAppsEnforcer.java
@@ -182,7 +182,7 @@ public class CSMaxRunningAppsEnforcer {
     // the queue was already at its max before the removal.
     // Thus we find the ancestor queue highest in the tree for which the app
     // that was at its maxRunningApps before the removal.
-    LeafQueue queue = app.getCSLeafQueue();
+    AbstractLeafQueue queue = app.getCSLeafQueue();
     AbstractCSQueue highestQueueWithAppsNowRunnable =
         (queue.getNumRunnableApps() == queue.getMaxParallelApps() - 1)
         ? queue : null;
@@ -243,7 +243,7 @@ public class CSMaxRunningAppsEnforcer {
       }
 
       if (checkRunnabilityWithUpdate(next)) {
-        LeafQueue nextQueue = next.getCSLeafQueue();
+        AbstractLeafQueue nextQueue = next.getCSLeafQueue();
         LOG.info("{} is now runnable in {}",
             next.getApplicationAttemptId(), nextQueue);
         trackRunnableApp(next);
@@ -322,9 +322,9 @@ public class CSMaxRunningAppsEnforcer {
   private void gatherPossiblyRunnableAppLists(AbstractCSQueue queue,
       List<List<FiCaSchedulerApp>> appLists) {
     if (queue.getNumRunnableApps() < queue.getMaxParallelApps()) {
-      if (queue instanceof LeafQueue) {
+      if (queue instanceof AbstractLeafQueue) {
         appLists.add(
-            ((LeafQueue)queue).getCopyOfNonRunnableAppSchedulables());
+            ((AbstractLeafQueue)queue).getCopyOfNonRunnableAppSchedulables());
       } else {
         for (CSQueue child : queue.getChildQueues()) {
           gatherPossiblyRunnableAppLists((AbstractCSQueue) child, appLists);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
index 140a2ac..de31cd13 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
@@ -27,13 +27,13 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 public class CapacityHeadroomProvider {
   
   UsersManager.User user;
-  LeafQueue queue;
+  AbstractLeafQueue queue;
   FiCaSchedulerApp application;
-  LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo;
+  AbstractLeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo;
   
-  public CapacityHeadroomProvider(UsersManager.User user, LeafQueue queue,
+  public CapacityHeadroomProvider(UsersManager.User user, AbstractLeafQueue queue,
       FiCaSchedulerApp application,
-      LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo) {
+      AbstractLeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo) {
 
     this.user = user;
     this.queue = queue;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index bd1089b..befb82a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -917,7 +917,7 @@ public class CapacityScheduler extends
           throw new QueueInvalidException(queueErrorMsg);
         }
       }
-      if (!(queue instanceof LeafQueue)) {
+      if (!(queue instanceof AbstractLeafQueue)) {
         // During RM restart, this means leaf queue was converted to a parent
         // queue, which is not supported for running apps.
         if (!appShouldFailFast) {
@@ -942,7 +942,7 @@ public class CapacityScheduler extends
       // that means its previous state was DRAINING. So we auto transit
       // the state to DRAINING for recovery.
       if (queue.getState() == QueueState.STOPPED) {
-        ((LeafQueue) queue).recoverDrainingState();
+        ((AbstractLeafQueue) queue).recoverDrainingState();
       }
       // Submit to the queue
       try {
@@ -1090,7 +1090,7 @@ public class CapacityScheduler extends
         return;
       }
 
-      if (!(queue instanceof LeafQueue)) {
+      if (!(queue instanceof AbstractLeafQueue)) {
         String message =
             "Application " + applicationId + " submitted by user : " + user
                 + " to non-leaf queue : " + queueName;
@@ -1242,7 +1242,7 @@ public class CapacityScheduler extends
         return;
       }
       CSQueue queue = (CSQueue) application.getQueue();
-      if (!(queue instanceof LeafQueue)) {
+      if (!(queue instanceof AbstractLeafQueue)) {
         LOG.error("Cannot finish application " + "from non-leaf queue: " + queue
             .getQueuePath());
       } else{
@@ -1301,7 +1301,7 @@ public class CapacityScheduler extends
       // Inform the queue
       Queue  queue = attempt.getQueue();
       CSQueue csQueue = (CSQueue) queue;
-      if (!(csQueue instanceof LeafQueue)) {
+      if (!(csQueue instanceof AbstractLeafQueue)) {
         LOG.error(
             "Cannot finish application " + "from non-leaf queue: "
             + csQueue.getQueuePath());
@@ -1367,7 +1367,7 @@ public class CapacityScheduler extends
     // Release containers
     releaseContainers(release, application);
 
-    LeafQueue updateDemandForQueue = null;
+    AbstractLeafQueue updateDemandForQueue = null;
 
     // Sanity check for new allocation requests
     normalizeResourceRequests(ask);
@@ -1398,7 +1398,7 @@ public class CapacityScheduler extends
         // Update application requests
         if (application.updateResourceRequests(ask) || application
             .updateSchedulingRequests(schedulingRequests)) {
-          updateDemandForQueue = (LeafQueue) application.getQueue();
+          updateDemandForQueue = (AbstractLeafQueue) application.getQueue();
         }
 
         if (LOG.isDebugEnabled()) {
@@ -1783,7 +1783,7 @@ public class CapacityScheduler extends
     LOG.debug("Trying to fulfill reservation for application {} on node: {}",
         reservedApplication.getApplicationId(), node.getNodeID());
 
-    LeafQueue queue = ((LeafQueue) reservedApplication.getQueue());
+    AbstractLeafQueue queue = ((AbstractLeafQueue) reservedApplication.getQueue());
     CSAssignment assignment = queue.assignContainers(getClusterResource(),
         new SimpleCandidateNodeSet<>(node),
         // TODO, now we only consider limits for parent for non-labeled
@@ -2386,7 +2386,7 @@ public class CapacityScheduler extends
     }
 
     // Inform the queue
-    LeafQueue queue = (LeafQueue) application.getQueue();
+    AbstractLeafQueue queue = (AbstractLeafQueue) application.getQueue();
     queue.completedContainer(getClusterResource(), application, node,
         rmContainer, containerStatus, event, null, true);
   }
@@ -2673,7 +2673,7 @@ public class CapacityScheduler extends
       throws YarnException {
     writeLock.lock();
     try {
-      LeafQueue queue = this.queueManager.getAndCheckLeafQueue(inQueue);
+      AbstractLeafQueue queue = this.queueManager.getAndCheckLeafQueue(inQueue);
       AbstractManagedParentQueue parent =
           (AbstractManagedParentQueue) queue.getParent();
 
@@ -2716,10 +2716,10 @@ public class CapacityScheduler extends
         throw new YarnException("App to be moved " + appId + " not found.");
       }
       String sourceQueueName = application.getQueue().getQueueName();
-      LeafQueue source =
+      AbstractLeafQueue source =
           this.queueManager.getAndCheckLeafQueue(sourceQueueName);
       String destQueueName = handleMoveToPlanQueue(targetQueueName);
-      LeafQueue dest = this.queueManager.getAndCheckLeafQueue(destQueueName);
+      AbstractLeafQueue dest = this.queueManager.getAndCheckLeafQueue(destQueueName);
 
       String user = application.getUser();
       try {
@@ -2777,7 +2777,7 @@ public class CapacityScheduler extends
           ((CSQueue) queue).getQueuePath() : queue.getQueueName();
       this.queueManager.getAndCheckLeafQueue(sourceQueueName);
       String destQueueName = handleMoveToPlanQueue(newQueue);
-      LeafQueue dest = this.queueManager.getAndCheckLeafQueue(destQueueName);
+      AbstractLeafQueue dest = this.queueManager.getAndCheckLeafQueue(destQueueName);
       // Validation check - ACLs, submission limits for user & queue
       String user = application.getUser();
       // Check active partition only when attempt is available
@@ -2804,7 +2804,7 @@ public class CapacityScheduler extends
    * @param dest
    * @throws YarnException
    */
-  private void checkQueuePartition(FiCaSchedulerApp app, LeafQueue dest)
+  private void checkQueuePartition(FiCaSchedulerApp app, AbstractLeafQueue dest)
       throws YarnException {
     if (!YarnConfiguration.areNodeLabelsEnabled(conf)) {
       return;
@@ -2854,7 +2854,7 @@ public class CapacityScheduler extends
       }
       return getMaximumResourceCapability();
     }
-    if (!(queue instanceof LeafQueue)) {
+    if (!(queue instanceof AbstractLeafQueue)) {
       LOG.error("queue " + queueName + " is not an leaf queue");
       return getMaximumResourceCapability();
     }
@@ -2863,7 +2863,7 @@ public class CapacityScheduler extends
     // getMaximumResourceCapability() returns maximum allocation considers
     // per-node maximum resources. So return (component-wise) min of the two.
 
-    Resource queueMaxAllocation = ((LeafQueue)queue).getMaximumAllocation();
+    Resource queueMaxAllocation = queue.getMaximumAllocation();
     Resource clusterMaxAllocationConsiderNodeMax =
         getMaximumResourceCapability();
 
@@ -2989,7 +2989,7 @@ public class CapacityScheduler extends
 
       // As we use iterator over a TreeSet for OrderingPolicy, once we change
       // priority then reinsert back to make order correct.
-      LeafQueue queue = (LeafQueue) getQueue(rmApp.getQueue());
+      AbstractLeafQueue queue = (AbstractLeafQueue) getQueue(rmApp.getQueue());
       queue.updateApplicationPriority(application, appPriority);
 
       LOG.info("Priority '" + appPriority + "' is updated in queue :"
@@ -3404,14 +3404,14 @@ public class CapacityScheduler extends
     readLock.lock();
     try {
       CSQueue queue = getQueue(queueName);
-      if (queue == null || !(queue instanceof LeafQueue)) {
+      if (!(queue instanceof AbstractLeafQueue)) {
         return lifetimeRequestedByApp;
       }
 
       long defaultApplicationLifetime =
-          ((LeafQueue) queue).getDefaultApplicationLifetime();
+          queue.getDefaultApplicationLifetime();
       long maximumApplicationLifetime =
-          ((LeafQueue) queue).getMaximumApplicationLifetime();
+          queue.getMaximumApplicationLifetime();
 
       // check only for maximum, that's enough because default can't
       // exceed maximum
@@ -3434,7 +3434,7 @@ public class CapacityScheduler extends
   @Override
   public long getMaximumApplicationLifetime(String queueName) {
     CSQueue queue = getQueue(queueName);
-    if (queue == null || !(queue instanceof LeafQueue)) {
+    if (!(queue instanceof AbstractLeafQueue)) {
       if (isAmbiguous(queueName)) {
         LOG.error("Ambiguous queue reference: " + queueName
             + " please use full queue path instead.");
@@ -3444,7 +3444,7 @@ public class CapacityScheduler extends
       return -1;
     }
     // In seconds
-    return ((LeafQueue) queue).getMaximumApplicationLifetime();
+    return queue.getMaximumApplicationLifetime();
   }
 
   @Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java
index fd601ac..147f392 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java
@@ -186,7 +186,7 @@ public final class CapacitySchedulerConfigValidator {
                 + " is set to true");
       }
 
-      if (newQueue instanceof LeafQueue) {
+      if (newQueue instanceof AbstractLeafQueue) {
         LOG.info("Converting the parent queue: {} to leaf queue.", oldQueue.getQueuePath());
       }
     }
@@ -194,7 +194,7 @@ public final class CapacitySchedulerConfigValidator {
 
   private static void validateLeafQueueConversion(CSQueue oldQueue,
                                                   CSQueue newQueue) throws IOException {
-    if (oldQueue instanceof LeafQueue && newQueue instanceof ParentQueue) {
+    if (oldQueue instanceof AbstractLeafQueue && newQueue instanceof ParentQueue) {
       if (isEitherQueueStopped(oldQueue.getState(), newQueue.getState())) {
         LOG.info("Converting the leaf queue: {} to parent queue.", oldQueue.getQueuePath());
       } else {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
index f0c8a27..407383d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
@@ -375,8 +375,8 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
       permissions.add(
           new Permission(csQueue.getPrivilegedEntity(), csQueue.getACLs()));
 
-      if (queue instanceof LeafQueue) {
-        LeafQueue lQueue = (LeafQueue) queue;
+      if (queue instanceof AbstractLeafQueue) {
+        AbstractLeafQueue lQueue = (AbstractLeafQueue) queue;
 
         // Clear Priority ACLs first since reinitialize also call same.
         appPriorityACLManager.clearPriorityACLs(lQueue.getQueuePath());
@@ -397,17 +397,17 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
    * @throws YarnException if the queue does not exist or the queue
    *           is not the type of LeafQueue.
    */
-  public LeafQueue getAndCheckLeafQueue(String queue) throws YarnException {
+  public AbstractLeafQueue getAndCheckLeafQueue(String queue) throws YarnException {
     CSQueue ret = this.getQueue(queue);
     if (ret == null) {
       throw new YarnException("The specified Queue: " + queue
           + " doesn't exist");
     }
-    if (!(ret instanceof LeafQueue)) {
+    if (!(ret instanceof AbstractLeafQueue)) {
       throw new YarnException("The specified Queue: " + queue
           + " is not a Leaf Queue.");
     }
-    return (LeafQueue) ret;
+    return (AbstractLeafQueue) ret;
   }
 
   /**
@@ -527,7 +527,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
    * @throws YarnException if the given path is not eligible to be auto created
    * @throws IOException if the given path can not be added to the parent
    */
-  public LeafQueue createQueue(QueuePath queue)
+  public AbstractLeafQueue createQueue(QueuePath queue)
       throws YarnException, IOException {
     String leafQueueName = queue.getLeafName();
     String parentQueueName = queue.getParent();
@@ -668,7 +668,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
     return leafQueue;
   }
 
-  private LeafQueue createLegacyAutoQueue(QueuePath queue)
+  private AbstractLeafQueue createLegacyAutoQueue(QueuePath queue)
       throws IOException, SchedulerDynamicEditException {
     CSQueue parentQueue = getQueue(queue.getParent());
     // Case 1: Handle ManagedParentQueue
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 4592f2a..b9fa932 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -19,128 +19,18 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
 
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.time.DateUtils;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authorize.AccessControlList;
-import org.apache.hadoop.util.Sets;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.QueueACL;
-import org.apache.hadoop.yarn.api.records.QueueInfo;
-import org.apache.hadoop.yarn.api.records.QueueState;
-import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
-import org.apache.hadoop.yarn.security.AccessType;
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.IteratorSelector;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
-import org.apache.hadoop.yarn.server.utils.Lock;
-import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
-import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.hadoop.yarn.util.resource.Resources;
-
-import org.apache.hadoop.classification.VisibleForTesting;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 @Private
 @Unstable
-public class LeafQueue extends AbstractCSQueue {
+public class LeafQueue extends AbstractLeafQueue {
   private static final Logger LOG =
       LoggerFactory.getLogger(LeafQueue.class);
 
-  private float absoluteUsedCapacity = 0.0f;
-
-  // TODO the max applications should consider label
-  protected int maxApplications;
-  protected volatile int maxApplicationsPerUser;
-  
-  private float maxAMResourcePerQueuePercent;
-
-  private volatile int nodeLocalityDelay;
-  private volatile int rackLocalityAdditionalDelay;
-  private volatile boolean rackLocalityFullReset;
-
-  Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap =
-      new ConcurrentHashMap<>();
-
-  private Priority defaultAppPriorityPerQueue;
-
-  private final OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicy;
-
-  private volatile float minimumAllocationFactor;
-
-  private final RecordFactory recordFactory = 
-    RecordFactoryProvider.getRecordFactory(null);
-
-  private final UsersManager usersManager;
-
-  // cache last cluster resource to compute actual capacity
-  private Resource lastClusterResource = Resources.none();
-
-  private final QueueResourceLimitsInfo queueResourceLimitsInfo =
-      new QueueResourceLimitsInfo();
-
-  private volatile ResourceLimits cachedResourceLimitsForHeadroom = null;
-
-  private volatile OrderingPolicy<FiCaSchedulerApp> orderingPolicy = null;
-
-  // Map<Partition, Map<SchedulingMode, Map<User, CachedUserLimit>>>
-  // Not thread safe: only the last level is a ConcurrentMap
-  @VisibleForTesting
-  Map<String, Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>>>
-      userLimitsCache = new HashMap<>();
-
-  // Not thread safe
-  @VisibleForTesting
-  long currentUserLimitCacheVersion = 0;
-
-  // record all ignore partition exclusivityRMContainer, this will be used to do
-  // preemption, key is the partition of the RMContainer allocated on
-  private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers =
-      new ConcurrentHashMap<>();
-
-  List<AppPriorityACLGroup> priorityAcls =
-      new ArrayList<AppPriorityACLGroup>();
-
-  private final List<FiCaSchedulerApp> runnableApps = new ArrayList<>();
-  private final List<FiCaSchedulerApp> nonRunnableApps = new ArrayList<>();
-
   @SuppressWarnings({ "unchecked", "rawtypes" })
   public LeafQueue(CapacitySchedulerContext cs,
       String queueName, CSQueue parent, CSQueue old) throws IOException {
@@ -157,2330 +47,10 @@ public class LeafQueue extends AbstractCSQueue {
       CapacitySchedulerConfiguration configuration,
       String queueName, CSQueue parent, CSQueue old, boolean isDynamic) throws
       IOException {
-    super(cs, configuration, queueName, parent, old);
-    setDynamicQueue(isDynamic);
-
-    this.usersManager = new UsersManager(usageTracker.getMetrics(), this, labelManager, csContext,
-        resourceCalculator);
-
-    // One time initialization is enough since it is static ordering policy
-    this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps();
-
-    LOG.debug("LeafQueue: name={}, fullname={}", queueName, getQueuePath());
+    super(cs, configuration, queueName, parent, old, isDynamic);
 
     setupQueueConfigs(cs.getClusterResource(), configuration);
 
-  }
-
-  @SuppressWarnings("checkstyle:nowhitespaceafter")
-  protected void setupQueueConfigs(Resource clusterResource,
-      CapacitySchedulerConfiguration conf) throws
-      IOException {
-    writeLock.lock();
-    try {
-      CapacitySchedulerConfiguration schedConf = csContext.getConfiguration();
-      super.setupQueueConfigs(clusterResource, conf);
-
-      this.lastClusterResource = clusterResource;
-
-      this.cachedResourceLimitsForHeadroom = new ResourceLimits(
-          clusterResource);
-
-      // Initialize headroom info, also used for calculating application
-      // master resource limits.  Since this happens during queue initialization
-      // and all queues may not be realized yet, we'll use (optimistic)
-      // absoluteMaxCapacity (it will be replaced with the more accurate
-      // absoluteMaxAvailCapacity during headroom/userlimit/allocation events)
-      setQueueResourceLimitsInfo(clusterResource);
-
-      setOrderingPolicy(
-          conf.<FiCaSchedulerApp>getAppOrderingPolicy(getQueuePath()));
-
-      usersManager.setUserLimit(conf.getUserLimit(getQueuePath()));
-      usersManager.setUserLimitFactor(conf.getUserLimitFactor(getQueuePath()));
-
-      maxAMResourcePerQueuePercent =
-          conf.getMaximumApplicationMasterResourcePerQueuePercent(
-              getQueuePath());
-
-      maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath());
-      if (maxApplications < 0) {
-        int maxGlobalPerQueueApps =
-            csContext.getConfiguration().getGlobalMaximumApplicationsPerQueue();
-        if (maxGlobalPerQueueApps > 0) {
-          maxApplications = maxGlobalPerQueueApps;
-        }
-      }
-
-      priorityAcls = conf.getPriorityAcls(getQueuePath(),
-          csContext.getMaxClusterLevelAppPriority());
-
-      Set<String> accessibleNodeLabels = this.queueNodeLabelsSettings.getAccessibleNodeLabels();
-      if (!SchedulerUtils.checkQueueLabelExpression(accessibleNodeLabels,
-          this.queueNodeLabelsSettings.getDefaultLabelExpression(), null)) {
-        throw new IOException(
-            "Invalid default label expression of " + " queue=" + getQueuePath()
-                + " doesn't have permission to access all labels "
-                + "in default label expression. labelExpression of resource request="
-                + getDefaultNodeLabelExpressionStr() + ". Queue labels=" + (
-                getAccessibleNodeLabels() == null ?
-                    "" :
-                    StringUtils
-                        .join(getAccessibleNodeLabels().iterator(), ',')));
-      }
-
-      nodeLocalityDelay = schedConf.getNodeLocalityDelay();
-      rackLocalityAdditionalDelay = schedConf
-          .getRackLocalityAdditionalDelay();
-      rackLocalityFullReset = schedConf
-          .getRackLocalityFullReset();
-
-      // re-init this since max allocation could have changed
-      this.minimumAllocationFactor = Resources.ratio(resourceCalculator,
-          Resources.subtract(
-              queueAllocationSettings.getMaximumAllocation(),
-              queueAllocationSettings.getMinimumAllocation()),
-          queueAllocationSettings.getMaximumAllocation());
-
-      StringBuilder aclsString = new StringBuilder();
-      for (Map.Entry<AccessType, AccessControlList> e : acls.entrySet()) {
-        aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
-      }
-
-      StringBuilder labelStrBuilder = new StringBuilder();
-      if (accessibleNodeLabels != null) {
-        for (String nodeLabel : accessibleNodeLabels) {
-          labelStrBuilder.append(nodeLabel).append(",");
-        }
-      }
-
-      defaultAppPriorityPerQueue = Priority.newInstance(
-          conf.getDefaultApplicationPriorityConfPerQueue(getQueuePath()));
-
-      // Validate leaf queue's user's weights.
-      float queueUserLimit = Math.min(100.0f, conf.getUserLimit(getQueuePath()));
-      getUserWeights().validateForLeafQueue(queueUserLimit, getQueuePath());
-      usersManager.updateUserWeights();
-
-      LOG.info(
-          "Initializing " + getQueuePath() + "\n" +
-              getExtendedCapacityOrWeightString() + "\n"
-              + "absoluteCapacity = " + queueCapacities.getAbsoluteCapacity()
-              + " [= parentAbsoluteCapacity * capacity ]" + "\n"
-              + "maxCapacity = " + queueCapacities.getMaximumCapacity()
-              + " [= configuredMaxCapacity ]" + "\n" + "absoluteMaxCapacity = "
-              + queueCapacities.getAbsoluteMaximumCapacity()
-              + " [= 1.0 maximumCapacity undefined, "
-              + "(parentAbsoluteMaxCapacity * maximumCapacity) / 100 otherwise ]"
-              + "\n" + "effectiveMinResource=" +
-              getEffectiveCapacity(CommonNodeLabelsManager.NO_LABEL) + "\n"
-              + " , effectiveMaxResource=" +
-              getEffectiveMaxCapacity(CommonNodeLabelsManager.NO_LABEL)
-              + "\n" + "userLimit = " + usersManager.getUserLimit()
-              + " [= configuredUserLimit ]" + "\n" + "userLimitFactor = "
-              + usersManager.getUserLimitFactor()
-              + " [= configuredUserLimitFactor ]" + "\n" + "maxApplications = "
-              + maxApplications
-              + " [= configuredMaximumSystemApplicationsPerQueue or"
-              + " (int)(configuredMaximumSystemApplications * absoluteCapacity)]"
-              + "\n" + "maxApplicationsPerUser = " + maxApplicationsPerUser
-              + " [= (int)(maxApplications * (userLimit / 100.0f) * "
-              + "userLimitFactor) ]" + "\n"
-              + "maxParallelApps = " + getMaxParallelApps() + "\n"
-              + "usedCapacity = " +
-              + queueCapacities.getUsedCapacity() + " [= usedResourcesMemory / "
-              + "(clusterResourceMemory * absoluteCapacity)]" + "\n"
-              + "absoluteUsedCapacity = " + absoluteUsedCapacity
-              + " [= usedResourcesMemory / clusterResourceMemory]" + "\n"
-              + "maxAMResourcePerQueuePercent = " + maxAMResourcePerQueuePercent
-              + " [= configuredMaximumAMResourcePercent ]" + "\n"
-              + "minimumAllocationFactor = " + minimumAllocationFactor
-              + " [= (float)(maximumAllocationMemory - minimumAllocationMemory) / "
-              + "maximumAllocationMemory ]" + "\n" + "maximumAllocation = "
-              + queueAllocationSettings.getMaximumAllocation() +
-              " [= configuredMaxAllocation ]" + "\n"
-              + "numContainers = " + usageTracker.getNumContainers()
-              + " [= currentNumContainers ]" + "\n" + "state = " + getState()
-              + " [= configuredState ]" + "\n" + "acls = " + aclsString
-              + " [= configuredAcls ]" + "\n"
-              + "nodeLocalityDelay = " + nodeLocalityDelay + "\n"
-              + "rackLocalityAdditionalDelay = "
-              + rackLocalityAdditionalDelay + "\n"
-              + "labels=" + labelStrBuilder.toString() + "\n"
-              + "reservationsContinueLooking = "
-              + reservationsContinueLooking + "\n" + "preemptionDisabled = "
-              + getPreemptionDisabled() + "\n" + "defaultAppPriorityPerQueue = "
-              + defaultAppPriorityPerQueue + "\npriority = " + priority
-              + "\nmaxLifetime = " + getMaximumApplicationLifetime()
-              + " seconds" + "\ndefaultLifetime = "
-              + getDefaultApplicationLifetime() + " seconds");
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  private String getDefaultNodeLabelExpressionStr() {
-    String defaultLabelExpression = queueNodeLabelsSettings.getDefaultLabelExpression();
-    return defaultLabelExpression == null ? "" : defaultLabelExpression;
-  }
-
-  /**
-   * Used only by tests.
-   */
-  @Private
-  public float getMinimumAllocationFactor() {
-    return minimumAllocationFactor;
-  }
-  
-  /**
-   * Used only by tests.
-   */
-  @Private
-  public float getMaxAMResourcePerQueuePercent() {
-    return maxAMResourcePerQueuePercent;
-  }
-
-  public int getMaxApplications() {
-    return maxApplications;
-  }
-
-  public int getMaxApplicationsPerUser() {
-    return maxApplicationsPerUser;
-  }
-
-  /**
-   *
-   * @return UsersManager instance.
-   */
-  public UsersManager getUsersManager() {
-    return usersManager;
-  }
-
-  @Override
-  public AbstractUsersManager getAbstractUsersManager() {
-    return usersManager;
-  }
-
-  @Override
-  public List<CSQueue> getChildQueues() {
-    return null;
-  }
-  
-  /**
-   * Set user limit.
-   * @param userLimit new user limit
-   */
-  @VisibleForTesting
-  void setUserLimit(float userLimit) {
-    usersManager.setUserLimit(userLimit);
-    usersManager.userLimitNeedsRecompute();
-  }
-
-  /**
-   * Set user limit factor.
-   * @param userLimitFactor new user limit factor
-   */
-  @VisibleForTesting
-  void setUserLimitFactor(float userLimitFactor) {
-    usersManager.setUserLimitFactor(userLimitFactor);
-    usersManager.userLimitNeedsRecompute();
-  }
-
-  @Override
-  public int getNumApplications() {
-    readLock.lock();
-    try {
-      return getNumPendingApplications() + getNumActiveApplications() +
-          getNumNonRunnableApps();
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  public int getNumPendingApplications() {
-    readLock.lock();
-    try {
-      return pendingOrderingPolicy.getNumSchedulableEntities();
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  public int getNumActiveApplications() {
-    readLock.lock();
-    try {
-      return orderingPolicy.getNumSchedulableEntities();
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  @Private
-  public int getNumPendingApplications(String user) {
-    readLock.lock();
-    try {
-      User u = getUser(user);
-      if (null == u) {
-        return 0;
-      }
-      return u.getPendingApplications();
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  @Private
-  public int getNumActiveApplications(String user) {
-    readLock.lock();
-    try {
-      User u = getUser(user);
-      if (null == u) {
-        return 0;
-      }
-      return u.getActiveApplications();
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  @Private
-  public float getUserLimit() {
-    return usersManager.getUserLimit();
-  }
-
-  @Private
-  public float getUserLimitFactor() {
-    return usersManager.getUserLimitFactor();
-  }
-
-  @Override
-  public QueueInfo getQueueInfo(
-      boolean includeChildQueues, boolean recursive) {
-    QueueInfo queueInfo = getQueueInfo();
-    return queueInfo;
-  }
-
-  @Override
-  public List<QueueUserACLInfo>
-  getQueueUserAclInfo(UserGroupInformation user) {
-    readLock.lock();
-    try {
-      QueueUserACLInfo userAclInfo = recordFactory.newRecordInstance(
-          QueueUserACLInfo.class);
-      List<QueueACL> operations = new ArrayList<>();
-      for (QueueACL operation : QueueACL.values()) {
-        if (hasAccess(operation, user)) {
-          operations.add(operation);
-        }
-      }
-
-      userAclInfo.setQueueName(getQueuePath());
-      userAclInfo.setUserAcls(operations);
-      return Collections.singletonList(userAclInfo);
-    } finally {
-      readLock.unlock();
-    }
-
-  }
-
-  public String toString() {
-    readLock.lock();
-    try {
-      return getQueuePath() + ": " + getCapacityOrWeightString()
-          + ", " + "absoluteCapacity=" + queueCapacities.getAbsoluteCapacity()
-          + ", " + "usedResources=" + usageTracker.getQueueUsage().getUsed() + ", "
-          + "usedCapacity=" + getUsedCapacity() + ", " + "absoluteUsedCapacity="
-          + getAbsoluteUsedCapacity() + ", " + "numApps=" + getNumApplications()
-          + ", " + "numContainers=" + getNumContainers() + ", "
-          + "effectiveMinResource=" +
-          getEffectiveCapacity(CommonNodeLabelsManager.NO_LABEL) +
-          " , effectiveMaxResource=" +
-          getEffectiveMaxCapacity(CommonNodeLabelsManager.NO_LABEL);
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  protected String getExtendedCapacityOrWeightString() {
-    if (queueCapacities.getWeight() != -1) {
-      return "weight = " + queueCapacities.getWeight()
-          + " [= (float) configuredCapacity (with w suffix)] " + "\n"
-          + "normalizedWeight = " + queueCapacities.getNormalizedWeight()
-          + " [= (float) configuredCapacity / sum(configuredCapacity of " +
-          "all queues under the parent)]";
-    } else {
-      return "capacity = " + queueCapacities.getCapacity()
-          + " [= (float) configuredCapacity / 100 ]";
-    }
-  }
-
-  @VisibleForTesting
-  public User getUser(String userName) {
-    return usersManager.getUser(userName);
-  }
-
-  @VisibleForTesting
-  public User getOrCreateUser(String userName) {
-    return usersManager.getUserAndAddIfAbsent(userName);
-  }
-
-  @Private
-  public List<AppPriorityACLGroup> getPriorityACLs() {
-    readLock.lock();
-    try {
-      return new ArrayList<>(priorityAcls);
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  protected void reinitialize(
-      CSQueue newlyParsedQueue, Resource clusterResource,
-      CapacitySchedulerConfiguration configuration) throws
-      IOException {
-
-    writeLock.lock();
-    try {
-      // We skip reinitialize for dynamic queues, when this is called, and
-      // new queue is different from this queue, we will make this queue to be
-      // static queue.
-      if (newlyParsedQueue != this) {
-        this.setDynamicQueue(false);
-      }
-
-      // Sanity check
-      if (!(newlyParsedQueue instanceof LeafQueue) || !newlyParsedQueue
-          .getQueuePath().equals(getQueuePath())) {
-        throw new IOException(
-            "Trying to reinitialize " + getQueuePath() + " from "
-                + newlyParsedQueue.getQueuePath());
-      }
-
-      LeafQueue newlyParsedLeafQueue = (LeafQueue) newlyParsedQueue;
-
-      // don't allow the maximum allocation to be decreased in size
-      // since we have already told running AM's the size
-      Resource oldMax = getMaximumAllocation();
-      Resource newMax = newlyParsedLeafQueue.getMaximumAllocation();
-
-      if (!Resources.fitsIn(oldMax, newMax)) {
-        throw new IOException("Trying to reinitialize " + getQueuePath()
-            + " the maximum allocation size can not be decreased!"
-            + " Current setting: " + oldMax + ", trying to set it to: "
-            + newMax);
-      }
-
-      setupQueueConfigs(clusterResource, configuration);
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  @Override
-  public void reinitialize(
-      CSQueue newlyParsedQueue, Resource clusterResource)
-  throws IOException {
-    reinitialize(newlyParsedQueue, clusterResource,
-        csContext.getConfiguration());
-  }
-
-  @Override
-  public void submitApplicationAttempt(FiCaSchedulerApp application,
-      String userName) {
-    submitApplicationAttempt(application, userName, false);
-  }
-
-  @Override
-  public void submitApplicationAttempt(FiCaSchedulerApp application,
-      String userName, boolean isMoveApp) {
-    // Careful! Locking order is important!
-    writeLock.lock();
-    try {
-      // TODO, should use getUser, use this method just to avoid UT failure
-      // which is caused by wrong invoking order, will fix UT separately
-      User user = usersManager.getUserAndAddIfAbsent(userName);
-
-      // Add the attempt to our data-structures
-      addApplicationAttempt(application, user);
-    } finally {
-      writeLock.unlock();
-    }
-
-    // We don't want to update metrics for move app
-    if (!isMoveApp) {
-      boolean unmanagedAM = application.getAppSchedulingInfo() != null &&
-          application.getAppSchedulingInfo().isUnmanagedAM();
-      usageTracker.getMetrics().submitAppAttempt(userName, unmanagedAM);
-    }
-
-    parent.submitApplicationAttempt(application, userName);
-  }
-
-  @Override
-  public void submitApplication(ApplicationId applicationId, String userName,
-      String queue)  throws AccessControlException {
-    // Careful! Locking order is important!
-    validateSubmitApplication(applicationId, userName, queue);
-
-    // Signal for expired auto deletion.
-    updateLastSubmittedTimeStamp();
-
-    // Inform the parent queue
-    try {
-      parent.submitApplication(applicationId, userName, queue);
-    } catch (AccessControlException ace) {
-      LOG.info("Failed to submit application to parent-queue: " +
-          parent.getQueuePath(), ace);
-      throw ace;
-    }
-
-  }
-
-  public void validateSubmitApplication(ApplicationId applicationId,
-      String userName, String queue) throws AccessControlException {
-    writeLock.lock();
-    try {
-      // Check if the queue is accepting jobs
-      if (getState() != QueueState.RUNNING) {
-        String msg = "Queue " + getQueuePath()
-            + " is STOPPED. Cannot accept submission of application: "
-            + applicationId;
-        LOG.info(msg);
-        throw new AccessControlException(msg);
-      }
-
-      // Check submission limits for queues
-      //TODO recalculate max applications because they can depend on capacity
-      if (getNumApplications() >= getMaxApplications() && !(this instanceof AutoCreatedLeafQueue)) {
-        String msg =
-            "Queue " + getQueuePath() + " already has " + getNumApplications()
-                + " applications,"
-                + " cannot accept submission of application: " + applicationId;
-        LOG.info(msg);
-        throw new AccessControlException(msg);
-      }
-
-      // Check submission limits for the user on this queue
-      User user = usersManager.getUserAndAddIfAbsent(userName);
-      //TODO recalculate max applications because they can depend on capacity
-      if (user.getTotalApplications() >= getMaxApplicationsPerUser() &&  !(this instanceof AutoCreatedLeafQueue)) {
-        String msg = "Queue " + getQueuePath() + " already has " + user
-            .getTotalApplications() + " applications from user " + userName
-            + " cannot accept submission of application: " + applicationId;
-        LOG.info(msg);
-        throw new AccessControlException(msg);
-      }
-    } finally {
-      writeLock.unlock();
-    }
-
-    try {
-      parent.validateSubmitApplication(applicationId, userName, queue);
-    } catch (AccessControlException ace) {
-      LOG.info("Failed to submit application to parent-queue: " + 
-          parent.getQueuePath(), ace);
-      throw ace;
-    }
-  }
-  
-  public Resource getAMResourceLimit() {
-    return usageTracker.getQueueUsage().getAMLimit();
-  }
-
-  public Resource getAMResourceLimitPerPartition(String nodePartition) {
-    return usageTracker.getQueueUsage().getAMLimit(nodePartition);
-  }
-
-  @VisibleForTesting
-  public Resource calculateAndGetAMResourceLimit() {
-    return calculateAndGetAMResourceLimitPerPartition(
-        RMNodeLabelsManager.NO_LABEL);
-  }
-
-  @VisibleForTesting
-  public Resource getUserAMResourceLimit() {
-    return getUserAMResourceLimitPerPartition(RMNodeLabelsManager.NO_LABEL,
-         null);
-  }
-
-  public Resource getUserAMResourceLimitPerPartition(
-      String nodePartition, String userName) {
-    float userWeight = 1.0f;
-    if (userName != null && getUser(userName) != null) {
-      userWeight = getUser(userName).getWeight();
-    }
-
-    readLock.lock();
-    try {
-      /*
-       * The user am resource limit is based on the same approach as the user
-       * limit (as it should represent a subset of that). This means that it uses
-       * the absolute queue capacity (per partition) instead of the max and is
-       * modified by the userlimit and the userlimit factor as is the userlimit
-       */
-      float effectiveUserLimit = Math.max(usersManager.getUserLimit() / 100.0f,
-          1.0f / Math.max(getAbstractUsersManager().getNumActiveUsers(), 1));
-      float preWeightedUserLimit = effectiveUserLimit;
-      effectiveUserLimit = Math.min(effectiveUserLimit * userWeight, 1.0f);
-
-      Resource queuePartitionResource = getEffectiveCapacity(nodePartition);
-
-      Resource minimumAllocation = queueAllocationSettings.getMinimumAllocation();
-
-      Resource userAMLimit = Resources.multiplyAndNormalizeUp(
-          resourceCalculator, queuePartitionResource,
-          queueCapacities.getMaxAMResourcePercentage(nodePartition)
-              * effectiveUserLimit * usersManager.getUserLimitFactor(),
-          minimumAllocation);
-
-      if (getUserLimitFactor() == -1) {
-        userAMLimit = Resources.multiplyAndNormalizeUp(
-            resourceCalculator, queuePartitionResource,
-            queueCapacities.getMaxAMResourcePercentage(nodePartition),
-            minimumAllocation);
-      }
-
-      userAMLimit =
-          Resources.min(resourceCalculator, lastClusterResource,
-              userAMLimit,
-              Resources.clone(getAMResourceLimitPerPartition(nodePartition)));
-
-      Resource preWeighteduserAMLimit =
-          Resources.multiplyAndNormalizeUp(
-          resourceCalculator, queuePartitionResource,
-          queueCapacities.getMaxAMResourcePercentage(nodePartition)
-              * preWeightedUserLimit * usersManager.getUserLimitFactor(),
-          minimumAllocation);
-
-      if (getUserLimitFactor() == -1) {
-        preWeighteduserAMLimit = Resources.multiplyAndNormalizeUp(
-            resourceCalculator, queuePartitionResource,
-            queueCapacities.getMaxAMResourcePercentage(nodePartition),
-            minimumAllocation);
-      }
-
-      preWeighteduserAMLimit =
-          Resources.min(resourceCalculator, lastClusterResource,
-              preWeighteduserAMLimit,
-              Resources.clone(getAMResourceLimitPerPartition(nodePartition)));
-      usageTracker.getQueueUsage().setUserAMLimit(nodePartition, preWeighteduserAMLimit);
-
-      LOG.debug("Effective user AM limit for \"{}\":{}. Effective weighted"
-          + " user AM limit: {}. User weight: {}", userName,
-          preWeighteduserAMLimit, userAMLimit, userWeight);
-      return userAMLimit;
-    } finally {
-      readLock.unlock();
-    }
-
-  }
-
-  public Resource calculateAndGetAMResourceLimitPerPartition(
-      String nodePartition) {
-    writeLock.lock();
-    try {
-      /*
-       * For non-labeled partition, get the max value from resources currently
-       * available to the queue and the absolute resources guaranteed for the
-       * partition in the queue. For labeled partition, consider only the absolute
-       * resources guaranteed. Multiply this value (based on labeled/
-       * non-labeled), * with per-partition am-resource-percent to get the max am
-       * resource limit for this queue and partition.
-       */
-      Resource queuePartitionResource = getEffectiveCapacity(nodePartition);
-
-      Resource queueCurrentLimit = Resources.none();
-      // For non-labeled partition, we need to consider the current queue
-      // usage limit.
-      if (nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
-        synchronized (queueResourceLimitsInfo){
-          queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit();
-        }
-      }
-
-      float amResourcePercent = queueCapacities.getMaxAMResourcePercentage(
-          nodePartition);
-
-      // Current usable resource for this queue and partition is the max of
-      // queueCurrentLimit and queuePartitionResource.
-      // If any of the resources available to this queue are less than queue's
-      // guarantee, use the guarantee as the queuePartitionUsableResource
-      // because nothing less than the queue's guarantee should be used when
-      // calculating the AM limit.
-      Resource queuePartitionUsableResource = (Resources.fitsIn(
-          resourceCalculator, queuePartitionResource, queueCurrentLimit)) ?
-              queueCurrentLimit : queuePartitionResource;
-
-      Resource amResouceLimit = Resources.multiplyAndNormalizeUp(
-          resourceCalculator, queuePartitionUsableResource, amResourcePercent,
-          queueAllocationSettings.getMinimumAllocation());
-
-      usageTracker.getMetrics().setAMResouceLimit(nodePartition, amResouceLimit);
-      usageTracker.getQueueUsage().setAMLimit(nodePartition, amResouceLimit);
-      LOG.debug("Queue: {}, node label : {}, queue partition resource : {},"
-          + " queue current limit : {}, queue partition usable resource : {},"
-          + " amResourceLimit : {}", getQueuePath(), nodePartition,
-          queuePartitionResource, queueCurrentLimit,
-          queuePartitionUsableResource, amResouceLimit);
-      return amResouceLimit;
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  protected void activateApplications() {
-    writeLock.lock();
-    try {
-      // limit of allowed resource usage for application masters
-      Map<String, Resource> userAmPartitionLimit =
-          new HashMap<String, Resource>();
-
-      // AM Resource Limit for accessible labels can be pre-calculated.
-      // This will help in updating AMResourceLimit for all labels when queue
-      // is initialized for the first time (when no applications are present).
-      for (String nodePartition : getNodeLabelsForQueue()) {
-        calculateAndGetAMResourceLimitPerPartition(nodePartition);
-      }
-
-      for (Iterator<FiCaSchedulerApp> fsApp =
-           getPendingAppsOrderingPolicy()
-               .getAssignmentIterator(IteratorSelector.EMPTY_ITERATOR_SELECTOR);
-           fsApp.hasNext(); ) {
-        FiCaSchedulerApp application = fsApp.next();
-        ApplicationId applicationId = application.getApplicationId();
-
-        // Get the am-node-partition associated with each application
-        // and calculate max-am resource limit for this partition.
-        String partitionName = application.getAppAMNodePartitionName();
-
-        Resource amLimit = getAMResourceLimitPerPartition(partitionName);
-        // Verify whether we already calculated am-limit for this label.
-        if (amLimit == null) {
-          amLimit = calculateAndGetAMResourceLimitPerPartition(partitionName);
-        }
-        // Check am resource limit.
-        Resource amIfStarted = Resources.add(
-            application.getAMResource(partitionName),
-            usageTracker.getQueueUsage().getAMUsed(partitionName));
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("application " + application.getId() + " AMResource "
-              + application.getAMResource(partitionName)
-              + " maxAMResourcePerQueuePercent " + maxAMResourcePerQueuePercent
-              + " amLimit " + amLimit + " lastClusterResource "
-              + lastClusterResource + " amIfStarted " + amIfStarted
-              + " AM node-partition name " + partitionName);
-        }
-
-        if (!resourceCalculator.fitsIn(amIfStarted, amLimit)) {
-          if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual(
-              resourceCalculator, lastClusterResource,
-              usageTracker.getQueueUsage().getAMUsed(partitionName), Resources.none()))) {
-            LOG.warn("maximum-am-resource-percent is insufficient to start a"
-                + " single application in queue, it is likely set too low."
-                + " skipping enforcement to allow at least one application"
-                + " to start");
-          } else{
-            application.updateAMContainerDiagnostics(AMState.INACTIVATED,
-                CSAMContainerLaunchDiagnosticsConstants.QUEUE_AM_RESOURCE_LIMIT_EXCEED);
-            LOG.debug("Not activating application {} as  amIfStarted: {}"
-                + " exceeds amLimit: {}", applicationId, amIfStarted, amLimit);
-            continue;
-          }
-        }
-
-        // Check user am resource limit
-        User user = usersManager.getUserAndAddIfAbsent(application.getUser());
-        Resource userAMLimit = userAmPartitionLimit.get(partitionName);
-
-        // Verify whether we already calculated user-am-limit for this label.
-        if (userAMLimit == null) {
-          userAMLimit = getUserAMResourceLimitPerPartition(partitionName,
-              application.getUser());
-          userAmPartitionLimit.put(partitionName, userAMLimit);
-        }
-
-        Resource userAmIfStarted = Resources.add(
-            application.getAMResource(partitionName),
-            user.getConsumedAMResources(partitionName));
-
-        if (!resourceCalculator.fitsIn(userAmIfStarted, userAMLimit)) {
-          if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual(
-              resourceCalculator, lastClusterResource,
-              usageTracker.getQueueUsage().getAMUsed(partitionName), Resources.none()))) {
-            LOG.warn("maximum-am-resource-percent is insufficient to start a"
-                + " single application in queue for user, it is likely set too"
-                + " low. skipping enforcement to allow at least one application"
-                + " to start");
-          } else{
-            application.updateAMContainerDiagnostics(AMState.INACTIVATED,
-                CSAMContainerLaunchDiagnosticsConstants.USER_AM_RESOURCE_LIMIT_EXCEED);
-            LOG.debug("Not activating application {} for user: {} as"
-                + " userAmIfStarted: {} exceeds userAmLimit: {}",
-                applicationId, user, userAmIfStarted, userAMLimit);
-            continue;
-          }
-        }
-        user.activateApplication();
-        orderingPolicy.addSchedulableEntity(application);
-        application.updateAMContainerDiagnostics(AMState.ACTIVATED, null);
-
-        usageTracker.getQueueUsage().incAMUsed(partitionName,
-            application.getAMResource(partitionName));
-        user.getResourceUsage().incAMUsed(partitionName,
-            application.getAMResource(partitionName));
-        user.getResourceUsage().setAMLimit(partitionName, userAMLimit);
-        usageTracker.getMetrics().incAMUsed(partitionName, application.getUser(),
-            application.getAMResource(partitionName));
-        usageTracker.getMetrics().setAMResouceLimitForUser(partitionName,
-            application.getUser(), userAMLimit);
-        fsApp.remove();
-        LOG.info("Application " + applicationId + " from user: " + application
-            .getUser() + " activated in queue: " + getQueuePath());
-      }
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  private void addApplicationAttempt(FiCaSchedulerApp application,
-      User user) {
-    writeLock.lock();
-    try {
-      applicationAttemptMap.put(application.getApplicationAttemptId(),
-          application);
-
-      if (application.isRunnable()) {
-        runnableApps.add(application);
-        LOG.debug("Adding runnable application: {}",
-            application.getApplicationAttemptId());
-      } else {
-        nonRunnableApps.add(application);
-        LOG.info("Application attempt {} is not runnable,"
-            + " parallel limit reached", application.getApplicationAttemptId());
-        return;
-      }
-
-      // Accept
-      user.submitApplication();
-      getPendingAppsOrderingPolicy().addSchedulableEntity(application);
-
-      // Activate applications
-      if (Resources.greaterThan(resourceCalculator, lastClusterResource,
-          lastClusterResource, Resources.none())) {
-        activateApplications();
-      } else {
-        application.updateAMContainerDiagnostics(AMState.INACTIVATED,
-            CSAMContainerLaunchDiagnosticsConstants.CLUSTER_RESOURCE_EMPTY);
-        LOG.info("Skipping activateApplications for "
-            + application.getApplicationAttemptId()
-            + " since cluster resource is " + Resources.none());
-      }
-
-      LOG.info(
-          "Application added -" + " appId: " + application.getApplicationId()
-              + " user: " + application.getUser() + "," + " leaf-queue: "
-              + getQueuePath() + " #user-pending-applications: " + user
-              .getPendingApplications() + " #user-active-applications: " + user
-              .getActiveApplications() + " #queue-pending-applications: "
-              + getNumPendingApplications() + " #queue-active-applications: "
-              + getNumActiveApplications()
-              + " #queue-nonrunnable-applications: "
-              + getNumNonRunnableApps());
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  @Override
-  public void finishApplication(ApplicationId application, String user) {
-    // Inform the activeUsersManager
-    usersManager.deactivateApplication(user, application);
-
-    appFinished();
-
-    // Inform the parent queue
-    parent.finishApplication(application, user);
-  }
-
-  @Override
-  public void finishApplicationAttempt(FiCaSchedulerApp application, String queue) {
-    // Careful! Locking order is important!
-    removeApplicationAttempt(application, application.getUser());
-    parent.finishApplicationAttempt(application, queue);
-  }
-
-  private void removeApplicationAttempt(
-      FiCaSchedulerApp application, String userName) {
-
-    writeLock.lock();
-    try {
-      // TODO, should use getUser, use this method just to avoid UT failure
-      // which is caused by wrong invoking order, will fix UT separately
-      User user = usersManager.getUserAndAddIfAbsent(userName);
-
-      boolean runnable = runnableApps.remove(application);
-      if (!runnable) {
-        // removeNonRunnableApp acquires the write lock again, which is fine
-        if (!removeNonRunnableApp(application)) {
-          LOG.error("Given app to remove " + application +
-              " does not exist in queue " + getQueuePath());
-        }
-      }
-
-      String partitionName = application.getAppAMNodePartitionName();
-      boolean wasActive = orderingPolicy.removeSchedulableEntity(application);
-      if (!wasActive) {
-        pendingOrderingPolicy.removeSchedulableEntity(application);
-      } else{
-        usageTracker.getQueueUsage().decAMUsed(partitionName,
-            application.getAMResource(partitionName));
-        user.getResourceUsage().decAMUsed(partitionName,
-            application.getAMResource(partitionName));
-        usageTracker.getMetrics().decAMUsed(partitionName, application.getUser(),
-            application.getAMResource(partitionName));
-      }
-      applicationAttemptMap.remove(application.getApplicationAttemptId());
-
-      user.finishApplication(wasActive);
-      if (user.getTotalApplications() == 0) {
-        usersManager.removeUser(application.getUser());
-      }
-
-      // Check if we can activate more applications
-      activateApplications();
-
-      LOG.info(
-          "Application removed -" + " appId: " + application.getApplicationId()
-              + " user: " + application.getUser() + " queue: " + getQueuePath()
-              + " #user-pending-applications: " + user.getPendingApplications()
-              + " #user-active-applications: " + user.getActiveApplications()
-              + " #queue-pending-applications: " + getNumPendingApplications()
-              + " #queue-active-applications: " + getNumActiveApplications());
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  private FiCaSchedulerApp getApplication(
-      ApplicationAttemptId applicationAttemptId) {
-    return applicationAttemptMap.get(applicationAttemptId);
-  }
-
-  private void setPreemptionAllowed(ResourceLimits limits, String nodePartition) {
-    // Set preemption-allowed:
-    // For leaf queue, only under-utilized queue is allowed to preempt resources from other queues
-    if (!usageTracker.getQueueResourceQuotas().getEffectiveMinResource(nodePartition)
-        .equals(Resources.none())) {
-      limits.setIsAllowPreemption(Resources.lessThan(resourceCalculator,
-          csContext.getClusterResource(), usageTracker.getQueueUsage().getUsed(nodePartition),
-          usageTracker.getQueueResourceQuotas().getEffectiveMinResource(nodePartition)));
-      return;
-    }
-
-    float usedCapacity = queueCapacities.getAbsoluteUsedCapacity(nodePartition);
-    float guaranteedCapacity = queueCapacities.getAbsoluteCapacity(nodePartition);
-    limits.setIsAllowPreemption(usedCapacity < guaranteedCapacity);
-  }
-
-  private CSAssignment allocateFromReservedContainer(Resource clusterResource,
-      CandidateNodeSet<FiCaSchedulerNode> candidates,
-      ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
-
-    // Irrespective of Single / Multi Node Placement, the allocate from
-    // Reserved Container has to happen only for the single node which
-    // CapacityScheduler#allocateFromReservedContainer invokes with.
-    // Else In Multi Node Placement, there won't be any Allocation or
-    // Reserve of new containers when there is a RESERVED container on
-    // a node which is full.
-    FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
-    if (node != null) {
-      RMContainer reservedContainer = node.getReservedContainer();
-      if (reservedContainer != null) {
-        FiCaSchedulerApp application = getApplication(
-            reservedContainer.getApplicationAttemptId());
-
-        if (null != application) {
-          ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
-              node, SystemClock.getInstance().getTime(), application);
-          CSAssignment assignment = application.assignContainers(
-              clusterResource, candidates, currentResourceLimits,
-              schedulingMode, reservedContainer);
-          return assignment;
-        }
-      }
-    }
-
-    return null;
-  }
-
-  /**
-   * Returns the user-limit cache kept for one partition / scheduling-mode
-   * pair, creating the missing levels on demand.
-   *
-   * <p>The whole cache is cleared first whenever the users-state version
-   * reported by {@code usersManager} differs from the version remembered
-   * from the previous call.
-   *
-   * @param partition node partition used as the first-level key
-   * @param schedulingMode scheduling mode used as the second-level key
-   * @return the per-user limit map for that key pair, never {@code null}
-   */
-  private ConcurrentMap<String, CachedUserLimit> getUserLimitCache(
-      String partition,
-      SchedulingMode schedulingMode) {
-    synchronized (userLimitsCache) {
-      long usersStateVersion = usersManager.getLatestVersionOfUsersState();
-      if (usersStateVersion != this.currentUserLimitCacheVersion) {
-        // The cached limits are stale: remember the new version and start over.
-        this.currentUserLimitCacheVersion = usersStateVersion;
-        userLimitsCache.clear();
-      }
-
-      // Resolve both nesting levels, filling in whichever one is missing.
-      Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>> byMode =
-          userLimitsCache.computeIfAbsent(partition, key -> new HashMap<>());
-      return byMode.computeIfAbsent(schedulingMode,
-          key -> new ConcurrentHashMap<>());
-    }
-  }
-
-  /**
-   * Tries to allocate or reserve a container from this leaf queue on the
-   * given candidate nodes.
-   *
-   * <p>A container already reserved on the candidate node is tried first.
-   * Otherwise the queue bails out when it cannot access the partition (in
-   * RESPECT_PARTITION_EXCLUSIVITY mode) or has no pending request, and then
-   * walks the applications in the order supplied by the ordering policy,
-   * checking the queue limit and the (cached) user limit before asking each
-   * application to assign containers.
-   *
-   * @param clusterResource total cluster resource
-   * @param candidates candidate nodes to allocate on
-   * @param currentResourceLimits resource limits handed to this queue
-   * @param schedulingMode scheduling mode
-   *          RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
-   * @return the assignment produced by the reserved container or by an
-   *         application, or {@code CSAssignment.NULL_ASSIGNMENT} when the
-   *         queue is rejected or skipped
-   */
-  @Override
-  public CSAssignment assignContainers(Resource clusterResource,
-      CandidateNodeSet<FiCaSchedulerNode> candidates,
-      ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
-    updateCurrentResourceLimits(currentResourceLimits, clusterResource);
-    FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("assignContainers: partition=" + candidates.getPartition()
-          + " #applications=" + orderingPolicy.getNumSchedulableEntities());
-    }
-
-    setPreemptionAllowed(currentResourceLimits, candidates.getPartition());
-
-    // Check for reserved resources, try to allocate reserved container first.
-    CSAssignment assignment = allocateFromReservedContainer(clusterResource,
-        candidates, currentResourceLimits, schedulingMode);
-    if (null != assignment) {
-      return assignment;
-    }
-
-    // if our queue cannot access this node, just return
-    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
-        && !queueNodeLabelsSettings.isAccessibleToPartition(candidates.getPartition())) {
-      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-          parent.getQueuePath(), getQueuePath(), ActivityState.REJECTED,
-          ActivityDiagnosticConstant.QUEUE_NOT_ABLE_TO_ACCESS_PARTITION);
-      return CSAssignment.NULL_ASSIGNMENT;
-    }
-
-    // Check if this queue need more resource, simply skip allocation if this
-    // queue doesn't need more resources.
-    if (!hasPendingResourceRequest(candidates.getPartition(), clusterResource,
-        schedulingMode)) {
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Skip this queue=" + getQueuePath()
-            + ", because it doesn't need more resource, schedulingMode="
-            + schedulingMode.name() + " node-partition=" + candidates
-            .getPartition());
-      }
-      ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-          parent.getQueuePath(), getQueuePath(), ActivityState.SKIPPED,
-          ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE);
-      return CSAssignment.NULL_ASSIGNMENT;
-    }
-
-    // Per-user limits cached for this partition / scheduling mode, so the
-    // limit is not recomputed for every application of the same user.
-    ConcurrentMap<String, CachedUserLimit> userLimits =
-        this.getUserLimitCache(candidates.getPartition(), schedulingMode);
-    // Stays true until a queue-limit check passes for an application that
-    // makes further checks unnecessary (see below).
-    boolean needAssignToQueueCheck = true;
-    IteratorSelector sel = new IteratorSelector();
-    sel.setPartition(candidates.getPartition());
-    for (Iterator<FiCaSchedulerApp> assignmentIterator =
-         orderingPolicy.getAssignmentIterator(sel);
-         assignmentIterator.hasNext(); ) {
-      FiCaSchedulerApp application = assignmentIterator.next();
-
-      ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
-          node, SystemClock.getInstance().getTime(), application);
-
-      // Check queue max-capacity limit
-      Resource appReserved = application.getCurrentReservation();
-      if (needAssignToQueueCheck) {
-        if (!super.canAssignToThisQueue(clusterResource,
-            candidates.getPartition(), currentResourceLimits, appReserved,
-            schedulingMode)) {
-          ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
-              activitiesManager, node, application, application.getPriority(),
-              ActivityDiagnosticConstant.QUEUE_HIT_MAX_CAPACITY_LIMIT);
-          ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-              parent.getQueuePath(), getQueuePath(),
-              ActivityState.REJECTED,
-              ActivityDiagnosticConstant.QUEUE_HIT_MAX_CAPACITY_LIMIT);
-          return CSAssignment.NULL_ASSIGNMENT;
-        }
-        // If there was no reservation and canAssignToThisQueue returned
-        // true, there is no reason to check further.
-        if (!this.reservationsContinueLooking
-            || appReserved.equals(Resources.none())) {
-          needAssignToQueueCheck = false;
-        }
-      }
-
-      // Reuse the cached user limit when one exists; passing null makes
-      // computeUserLimitAndSetHeadroom compute it.
-      CachedUserLimit cul = userLimits.get(application.getUser());
-      Resource cachedUserLimit = null;
-      if (cul != null) {
-        cachedUserLimit = cul.userLimit;
-      }
-      Resource userLimit = computeUserLimitAndSetHeadroom(application,
-          clusterResource, candidates.getPartition(), schedulingMode,
-          cachedUserLimit);
-      if (cul == null) {
-        cul = new CachedUserLimit(userLimit);
-        CachedUserLimit retVal =
-            userLimits.putIfAbsent(application.getUser(), cul);
-        if (retVal != null) {
-          // another thread updated the user limit cache before us
-          cul = retVal;
-          userLimit = cul.userLimit;
-        }
-      }
-      // Check user limit
-      boolean userAssignable = true;
-      // A cached negative result is reused when this application's
-      // reservation fits in the reservation recorded with that result.
-      if (!cul.canAssign && Resources.fitsIn(appReserved, cul.reservation)) {
-        userAssignable = false;
-      } else {
-        userAssignable = canAssignToUser(clusterResource, application.getUser(),
-            userLimit, application, candidates.getPartition(),
-            currentResourceLimits);
-        if (!userAssignable && Resources.fitsIn(cul.reservation, appReserved)) {
-          cul.canAssign = false;
-          cul.reservation = appReserved;
-        }
-      }
-      if (!userAssignable) {
-        application.updateAMContainerDiagnostics(AMState.ACTIVATED,
-            "User capacity has reached its maximum limit.");
-        ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
-            activitiesManager, node, application, application.getPriority(),
-            ActivityDiagnosticConstant.QUEUE_HIT_USER_MAX_CAPACITY_LIMIT);
-        continue;
-      }
-
-      // Try to schedule
-      assignment = application.assignContainers(clusterResource,
-          candidates, currentResourceLimits, schedulingMode, null);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("post-assignContainers for application " + application
-            .getApplicationId());
-        application.showRequests();
-      }
-
-      // Did we schedule or reserve a container?
-      Resource assigned = assignment.getResource();
-
-      if (Resources.greaterThan(resourceCalculator, clusterResource, assigned,
-          Resources.none())) {
-        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-            parent.getQueuePath(), getQueuePath(),
-            ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
-        return assignment;
-      } else if (assignment.getSkippedType()
-          == CSAssignment.SkippedType.OTHER) {
-        // Skipped for an application-level reason: move on to the next
-        // application.
-        ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
-            activitiesManager, application.getApplicationId(),
-            ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
-        application.updateNodeInfoForAMDiagnostics(node);
-      } else if (assignment.getSkippedType()
-          == CSAssignment.SkippedType.QUEUE_LIMIT) {
-        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-            parent.getQueuePath(), getQueuePath(), ActivityState.REJECTED,
-            () -> ActivityDiagnosticConstant.QUEUE_DO_NOT_HAVE_ENOUGH_HEADROOM
-                + " from " + application.getApplicationId());
-        return assignment;
-      } else{
-        // If we don't allocate anything, and it is not skipped by application,
-        // we will return to respect FIFO of applications
-        ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-            parent.getQueuePath(), getQueuePath(), ActivityState.SKIPPED,
-            ActivityDiagnosticConstant.QUEUE_SKIPPED_TO_RESPECT_FIFO);
-        ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
-            activitiesManager, application.getApplicationId(),
-            ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
-        return CSAssignment.NULL_ASSIGNMENT;
-      }
-    }
-    // Every application was visited without producing an assignment.
-    ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
-        parent.getQueuePath(), getQueuePath(), ActivityState.SKIPPED,
-        ActivityDiagnosticConstant.EMPTY);
-
-    return CSAssignment.NULL_ASSIGNMENT;
-  }
-
-  /**
-   * Decides whether a commit request may be applied to this queue.
-   *
-   * <p>Unless the allocation comes from a reserved container, the user's
-   * usage in the container's partition (minus the resources the request
-   * releases) is compared with the user limit under the read lock. The
-   * request is refused when the user is gone or the usage exceeds the limit;
-   * otherwise the decision is delegated to the superclass.
-   *
-   * @param cluster total cluster resource
-   * @param request commit request to validate
-   * @return {@code true} when the request is acceptable
-   */
-  @Override
-  public boolean accept(Resource cluster,
-      ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
-    ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> proposal =
-        request.getFirstAllocatedOrReservedContainer();
-    SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> container =
-        proposal.getAllocatedOrReservedContainer();
-
-    // Limits are only enforced for allocations that do not originate from a
-    // reserved container.
-    boolean fromReservedContainer =
-        proposal.getAllocateFromReservedContainer() != null;
-    if (!fromReservedContainer) {
-      readLock.lock();
-      try {
-        FiCaSchedulerApp attempt = container.getSchedulerApplicationAttempt();
-        String userName = attempt.getUser();
-        String nodePartition = container.getNodePartition();
-
-        // User limit for this attempt (also refreshes its headroom).
-        Resource limit = computeUserLimitAndSetHeadroom(attempt, cluster,
-            nodePartition, proposal.getSchedulingMode(), null);
-
-        User queueUser = getUser(userName);
-        if (null == queueUser) {
-          LOG.debug("User {} has been removed!", userName);
-          return false;
-        }
-
-        // Usage once the resources released by this request are gone.
-        Resource usageAfterRelease =
-            Resources.clone(queueUser.getUsed(nodePartition));
-        Resources.subtractFrom(usageAfterRelease,
-            request.getTotalReleasedResource());
-
-        boolean overLimit = Resources.greaterThan(resourceCalculator, cluster,
-            usageAfterRelease, limit);
-        if (overLimit) {
-          LOG.debug("Used resource={} exceeded user-limit={}",
-              usageAfterRelease, limit);
-          return false;
-        }
-      } finally {
-        readLock.unlock();
-      }
-    }
-
-    return super.accept(cluster, request);
-  }
-
-  /**
-   * Releases one container named by a commit request.
-   *
-   * <p>A container owned by another leaf queue is completed there as a
-   * preempted container (lazy preemption). A container of this queue is only
-   * completed when it is still in RESERVED state (reservation exchange).
-   *
-   * @param clusterResource total cluster resource
-   * @param schedulerContainer container to release
-   */
-  private void internalReleaseContainer(Resource clusterResource,
-      SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer) {
-    RMContainer rmContainer = schedulerContainer.getRmContainer();
-    LeafQueue owningQueue =
-        schedulerContainer.getSchedulerApplicationAttempt().getCSLeafQueue();
-
-    if (owningQueue != this) {
-      // Container lives in a different queue: this is the lazy preemption
-      // path, where the preemption candidate is killed during the
-      // scheduling cycle.
-      owningQueue.completedContainer(clusterResource,
-          schedulerContainer.getSchedulerApplicationAttempt(),
-          schedulerContainer.getSchedulerNode(),
-          schedulerContainer.getRmContainer(),
-          SchedulerUtils.createPreemptedContainerStatus(
-              rmContainer.getContainerId(),
-              SchedulerUtils.PREEMPTED_CONTAINER),
-          RMContainerEventType.KILL, null, false);
-      return;
-    }
-
-    // Same queue: only reserved containers are handled here.
-    if (rmContainer.getState() != RMContainerState.RESERVED) {
-      return;
-    }
-
-    // Reservation exchange: complete the previously reserved container.
-    completedContainer(clusterResource,
-        schedulerContainer.getSchedulerApplicationAttempt(),
-        schedulerContainer.getSchedulerNode(), rmContainer,
-        SchedulerUtils.createAbnormalContainerStatus(
-            rmContainer.getContainerId(),
-            SchedulerUtils.UNRESERVED_CONTAINER),
-        RMContainerEventType.RELEASED, null, false);
-  }
-
-  /**
-   * Releases every container a commit request asks to give up: first the
-   * request-level release list, then the per-proposal release lists attached
-   * to the containers to allocate.
-   *
-   * @param clusterResource total cluster resource
-   * @param request commit request being applied
-   */
-  private void releaseContainers(Resource clusterResource,
-      ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
-    for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> released :
-        request.getContainersToRelease()) {
-      internalReleaseContainer(clusterResource, released);
-    }
-
-    // Reservation continue-looking or lazy preemption: each proposal may
-    // carry its own containers to release.
-    if (null == request.getContainersToAllocate()) {
-      return;
-    }
-    for (ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> proposal :
-        request.getContainersToAllocate()) {
-      if (null == proposal.getToRelease()) {
-        continue;
-      }
-      for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> released :
-          proposal.getToRelease()) {
-        internalReleaseContainer(clusterResource, released);
-      }
-    }
-  }
-
-  /**
-   * Applies an accepted commit request to this queue.
-   *
-   * <p>Containers to release are handled first, outside the write lock. The
-   * queue bookkeeping for a new allocation or reservation is then done under
-   * the write lock, and finally the parent queue's {@code apply} is invoked
-   * (outside the lock) when a new allocation or reservation happened.
-   *
-   * @param cluster total cluster resource
-   * @param request commit request to apply
-   */
-  public void apply(Resource cluster,
-      ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
-    // Do we need to call parent queue's apply?
-    boolean applyToParentQueue = false;
-
-    // Done before taking the write lock.
-    releaseContainers(cluster, request);
-
-    writeLock.lock();
-    try {
-      if (request.anythingAllocatedOrReserved()) {
-        ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode>
-            allocation = request.getFirstAllocatedOrReservedContainer();
-        SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
-            schedulerContainer = allocation.getAllocatedOrReservedContainer();
-
-        // Do not modify queue when allocation from reserved container
-        if (allocation.getAllocateFromReservedContainer() == null) {
-          // Only invoke apply() of ParentQueue when new allocation /
-          // reservation happen.
-          applyToParentQueue = true;
-          // Book-keeping
-          // Note: Update headroom to account for current allocation too...
-          allocateResource(cluster,
-              schedulerContainer.getSchedulerApplicationAttempt(),
-              allocation.getAllocatedOrReservedResource(),
-              schedulerContainer.getNodePartition(),
-              schedulerContainer.getRmContainer());
-          orderingPolicy.containerAllocated(
-              schedulerContainer.getSchedulerApplicationAttempt(),
-              schedulerContainer.getRmContainer());
-        }
-
-        // Update reserved resource
-        if (Resources.greaterThan(resourceCalculator, cluster,
-            request.getTotalReservedResource(), Resources.none())) {
-          incReservedResource(schedulerContainer.getNodePartition(),
-              request.getTotalReservedResource());
-        }
-      }
-    } finally {
-      writeLock.unlock();
-    }
-
-    // The parent is notified after this queue's write lock is released.
-    if (parent != null && applyToParentQueue) {
-      parent.apply(cluster, request);
-    }
-  }
-
-
-  protected Resource getHeadroom(User user, Resource queueCurrentLimit,
-      Resource clusterResource, FiCaSchedulerApp application) {
-    return getHeadroom(user, queueCurrentLimit, clusterResource, application,
-        RMNodeLabelsManager.NO_LABEL);
-  }
-
-  /**
-   * Computes the headroom of a user in the given partition, using the
-   * active-users resource limit computed in RESPECT_PARTITION_EXCLUSIVITY
-   * mode as the user limit.
-   *
-   * @param user user whose headroom is computed
-   * @param queueCurrentLimit current resource limit of the queue
-   * @param clusterResource total cluster resource
-   * @param application application whose user name selects the limit
-   * @param partition partition name
-   * @return headroom of the user in {@code partition}
-   */
-  protected Resource getHeadroom(User user, Resource queueCurrentLimit,
-      Resource clusterResource, FiCaSchedulerApp application,
-      String partition) {
-    Resource activeUsersLimit = getResourceLimitForActiveUsers(
-        application.getUser(), clusterResource, partition,
-        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    return getHeadroom(user, queueCurrentLimit, clusterResource,
-        activeUsersLimit, partition);
-  }
-
-  /**
-   * Computes the headroom of a user in a partition from an explicit user
-   * limit.
-   *
-   * @param user user whose headroom is computed
-   * @param currentPartitionResourceLimit queue limit, used as-is for the
-   *          default partition only
-   * @param clusterResource total cluster resource
-   * @param userLimitResource user limit to measure against
-   * @param partition partition name
-   * @return the headroom, rounded down to the minimum allocation and capped
-   *         by the unused resource of the partition
-   */
-  private Resource getHeadroom(User user,
-      Resource currentPartitionResourceLimit, Resource clusterResource,
-      Resource userLimitResource, String partition) {
-    /*
-     * Headroom is:
-     *    min(
-     *        min(userLimit, queueMaxCap) - userConsumed,
-     *        queueMaxCap - queueUsedResources
-     *       )
-     *
-     * which expands to
-     *    min(userLimit - userConsumed, queueMaxCap - userConsumed,
-     *        queueMaxCap - queueUsedResources)
-     *
-     * and, because queueUsedResources >= userConsumed, reduces to
-     *    min(userLimit - userConsumed, queueMaxCap - queueUsedResources)
-     *
-     * The max capacities of several queues can add up to more than the real
-     * capacity of a partition, so the result is additionally capped by the
-     * resource still unused in that partition:
-     *    headroom = min(unused resource of the label, calculated headroom)
-     */
-    Resource partitionLimit;
-    if (partition.equals(RMNodeLabelsManager.NO_LABEL)) {
-      partitionLimit = currentPartitionResourceLimit;
-    } else {
-      partitionLimit = getQueueMaxResource(partition);
-    }
-
-    Resource userRemaining = Resources.subtractNonNegative(userLimitResource,
-        user.getUsed(partition));
-    Resource queueRemaining = Resources.subtractNonNegative(partitionLimit,
-        usageTracker.getQueueUsage().getUsed(partition));
-
-    // Normalize to a multiple of the minimum allocation.
-    Resource normalized = Resources.roundDown(resourceCalculator,
-        Resources.componentwiseMin(userRemaining, queueRemaining),
-        queueAllocationSettings.getMinimumAllocation());
-
-    // Cap by what is still free in the partition cluster-wide.
-    Resource partitionTotal =
-        labelManager.getResourceByLabel(partition, clusterResource);
-    Resource partitionFree = Resources.subtract(partitionTotal,
-        csContext.getClusterResourceUsage().getUsed(partition));
-    return Resources.min(resourceCalculator, partitionTotal, partitionFree,
-        normalized);
-  }
-  
-  /**
-   * Publishes the cached queue limit and the given cluster resource into
-   * {@code queueResourceLimitsInfo}, holding that object's monitor so both
-   * values are written together.
-   *
-   * @param clusterResource total cluster resource
-   */
-  private void setQueueResourceLimitsInfo(
-      Resource clusterResource) {
-    synchronized (queueResourceLimitsInfo) {
-      Resource currentLimit = cachedResourceLimitsForHeadroom.getLimit();
-      queueResourceLimitsInfo.setQueueCurrentLimit(currentLimit);
-      queueResourceLimitsInfo.setClusterResource(clusterResource);
-    }
-  }
-
-  /**
-   * Resolves the user limit for an application and refreshes the headroom
-   * information derived from it.
-   *
-   * <p>Holding the application's lock is not required here.
-   *
-   * @param application application whose user is evaluated
-   * @param clusterResource total cluster resource
-   * @param nodePartition partition name
-   * @param schedulingMode scheduling mode
-   *          RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
-   * @param userLimit already known user limit, or {@code null} to have the
-   *          active-users limit computed
-   * @return the user limit in effect, or {@code Resources.none()} when the
-   *         user is no longer known to the queue
-   */
-  @Lock({LeafQueue.class})
-  Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
-      Resource clusterResource, String nodePartition,
-      SchedulingMode schedulingMode, Resource userLimit) {
-    String userName = application.getUser();
-    User queueUser = getUser(userName);
-    if (null == queueUser) {
-      LOG.debug("User {} has been removed!", userName);
-      return Resources.none();
-    }
-
-    // Compute the user limit with respect to the requested labels.
-    // TODO, need consider headroom respect labels also
-    Resource effectiveLimit = userLimit;
-    if (null == effectiveLimit) {
-      effectiveLimit = getResourceLimitForActiveUsers(application.getUser(),
-          clusterResource, nodePartition, schedulingMode);
-    }
-    setQueueResourceLimitsInfo(clusterResource);
-
-    // Without per-user metrics there is no headroom to report.
-    Resource headroom;
-    if (usageTracker.getMetrics().getUserMetrics(userName) == null) {
-      headroom = Resources.none();
-    } else {
-      headroom = getHeadroom(queueUser,
-          cachedResourceLimitsForHeadroom.getLimit(), clusterResource,
-          effectiveLimit, nodePartition);
-    }
-
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Headroom calculation for user " + userName + ": "
-          + " userLimit=" + effectiveLimit
-          + " queueMaxAvailRes=" + cachedResourceLimitsForHeadroom.getLimit()
-          + " consumed=" + queueUser.getUsed()
-          + " partition=" + nodePartition);
-    }
-
-    application.setHeadroomProvider(new CapacityHeadroomProvider(
-        queueUser, this, application, queueResourceLimitsInfo));
-
-    usageTracker.getMetrics().setAvailableResourcesToUser(nodePartition,
-        userName, headroom);
-
-    return effectiveLimit;
-  }
-  
-  /**
-   * @return the node locality delay held by this queue
-   */
-  @Lock(NoLock.class)
-  public int getNodeLocalityDelay() {
-    return this.nodeLocalityDelay;
-  }
-
-  /**
-   * @return the additional rack locality delay held by this queue
-   */
-  @Lock(NoLock.class)
-  public int getRackLocalityAdditionalDelay() {
-    return this.rackLocalityAdditionalDelay;
-  }
-
-  /**
-   * @return the rack-locality-full-reset flag held by this queue
-   */
-  @Lock(NoLock.class)
-  public boolean getRackLocalityFullReset() {
-    return this.rackLocalityFullReset;
-  }
-
-  /**
-   *
-   * @param userName
-   *          Name of user who has submitted one/more app to given queue.
-   * @param clusterResource
-   *          total cluster resource
-   * @param nodePartition
-   *          partition name
-   * @param schedulingMode
-   *          scheduling mode
-   *          RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
-   * @return Computed User Limit
-   */
-  public Resource getResourceLimitForActiveUsers(String userName,
-      Resource clusterResource, String nodePartition,
-      SchedulingMode schedulingMode) {
-    return usersManager.getComputedResourceLimitForActiveUsers(userName,
-        clusterResource, nodePartition, schedulingMode);
-  }
-
-  /**
-   *
-   * @param userName
-   *          Name of user who has submitted one/more app to given queue.
-   * @param clusterResource
-   *          total cluster resource
-   * @param nodePartition
-   *          partition name
-   * @param schedulingMode
-   *          scheduling mode
-   *          RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
-   * @return Computed User Limit
-   */
-  public Resource getResourceLimitForAllUsers(String userName,
-      Resource clusterResource, String nodePartition,
-      SchedulingMode schedulingMode) {
-    return usersManager.getComputedResourceLimitForAllUsers(userName,
-        clusterResource, nodePartition, schedulingMode);
-  }
-
-  /**
-   * Checks, under the read lock, whether the user's usage in the given
-   * partition still permits an assignment under the supplied limit.
-   *
-   * <p>The amount-needed-unreserve of {@code currentResourceLimits} is reset
-   * to {@code Resources.none()} on every call. When the usage exceeds the
-   * limit but reservationsContinueLooking is enabled and the usage minus the
-   * application's current reservation is within the limit, that amount is
-   * set to (usage - limit) and the method still returns {@code true}.
-   *
-   * @param clusterResource total cluster resource
-   * @param userName name of the user to check
-   * @param limit user limit to compare against
-   * @param application application asking for resources
-   * @param nodePartition partition name
-   * @param currentResourceLimits limits object that receives the amount to
-   *          unreserve
-   * @return {@code false} when the user is unknown to the queue or over the
-   *         limit, {@code true} otherwise
-   */
-  @Private
-  protected boolean canAssignToUser(Resource clusterResource,
-      String userName, Resource limit, FiCaSchedulerApp application,
-      String nodePartition, ResourceLimits currentResourceLimits) {
-
-    readLock.lock();
-    try {
-      User user = getUser(userName);
-      if (user == null) {
-        LOG.debug("User {} has been removed!", userName);
-        return false;
-      }
-
-      currentResourceLimits.setAmountNeededUnreserve(Resources.none());
-
-      // Note: We aren't considering the current request since there is a fixed
-      // overhead of the AM, but it's a > check, not a >= check, so...
-      if (Resources.greaterThan(resourceCalculator, clusterResource,
-          user.getUsed(nodePartition), limit)) {
-        // if enabled, check to see if could we potentially use this node instead
-        // of a reserved node if the application has reserved containers
-        if (this.reservationsContinueLooking) {
-          // NOTE(review): this check reads user.getUsed() without a partition
-          // while the surrounding checks use user.getUsed(nodePartition) --
-          // confirm this is intended for non-default partitions.
-          if (Resources.lessThanOrEqual(resourceCalculator, clusterResource,
-              Resources.subtract(user.getUsed(),
-                  application.getCurrentReservation()), limit)) {
-
-            if (LOG.isDebugEnabled()) {
-              LOG.debug("User " + userName + " in queue " + getQueuePath()
-                  + " will exceed limit based on reservations - "
-                  + " consumed: " + user.getUsed() + " reserved: " + application
-                  .getCurrentReservation() + " limit: " + limit);
-            }
-            Resource amountNeededToUnreserve = Resources.subtract(
-                user.getUsed(nodePartition), limit);
-            // we can only acquire a new container if we unreserve first to
-            // respect user-limit
-            currentResourceLimits.setAmountNeededUnreserve(
-                amountNeededToUnreserve);
-            return true;
-          }
-        }
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("User " + userName + " in queue " + getQueuePath()
-              + " will exceed limit - " + " consumed: " + user
-              .getUsed(nodePartition) + " limit: " + limit);
-        }
-        return false;
-      }
-      return true;
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  @Override
-  protected void setDynamicQueueProperties(
-      CapacitySchedulerConfiguration configuration) {
-    // set to -1, to disable it
-    configuration.setUserLimitFactor(getQueuePath(), -1);
-    // Set Max AM percentage to a higher value
-    configuration.setMaximumApplicationMasterResourcePerQueuePercent(
-        getQueuePath(), 1f);
-    super.setDynamicQueueProperties(configuration);
-  }
-
-  /**
-   * Records a completed container in the scheduler health statistics.
-   *
-   * <p>A container whose exit status is
-   * {@code ContainerExitStatus.PREEMPTED} is recorded as a preemption and
-   * bumps the preemption count; any other container, including one that
-   * completed without a status, is recorded as a release. Nothing is
-   * recorded when the scheduler context has no scheduler health object.
-   *
-   * @param rmContainer container that completed
-   * @param containerStatus final status of the container; may be
-   *          {@code null}
-   */
-  private void updateSchedulerHealthForCompletedContainer(
-      RMContainer rmContainer, ContainerStatus containerStatus) {
-    // Update SchedulerHealth for released / preempted container
-    SchedulerHealth schedulerHealth = csContext.getSchedulerHealth();
-    if (null == schedulerHealth) {
-      // Only do update if we have schedulerHealth
-      return;
-    }
-
-    // completedContainer() tolerates a null status when it updates the
-    // preemption metrics, so it must not cause a NullPointerException here
-    // before the queue bookkeeping has even started.
-    boolean preempted = containerStatus != null
-        && containerStatus.getExitStatus() == ContainerExitStatus.PREEMPTED;
-
-    if (preempted) {
-      schedulerHealth.updatePreemption(Time.now(), rmContainer.getAllocatedNode(),
-          rmContainer.getContainerId(), getQueuePath());
-      schedulerHealth.updateSchedulerPreemptionCounts(1);
-    } else {
-      schedulerHealth.updateRelease(csContext.getLastNodeUpdateTime(),
-          rmContainer.getAllocatedNode(), rmContainer.getContainerId(),
-          getQueuePath());
-    }
-  }
-
-  /**
-   * Recalculates the queue usage ratio under the write lock.
-   *
-   * @param clusterResource
-   *          total cluster resource
-   * @param nodePartition
-   *          partition to update; {@code null} updates every partition known
-   *          to the queue capacities or to the queue resource usage
-   */
-  public void recalculateQueueUsageRatio(Resource clusterResource,
-      String nodePartition) {
-    writeLock.lock();
-    try {
-      ResourceUsage usage = getQueueResourceUsage();
-
-      if (nodePartition != null) {
-        // Single partition requested.
-        usersManager.updateUsageRatio(nodePartition, clusterResource);
-        return;
-      }
-
-      // No partition given: refresh all of them.
-      for (String knownPartition : Sets.union(
-          getQueueCapacities().getNodePartitionsSet(),
-          usage.getNodePartitionsSet())) {
-        usersManager.updateUsageRatio(knownPartition, clusterResource);
-      }
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  /**
-   * Handles the completion (or un-reservation) of a container that belongs
-   * to this leaf queue.
-   *
-   * <p>Scheduler health is updated first. When an application is given, the
-   * application and node are informed and the queue bookkeeping is done
-   * under the write lock; the parent queue is then notified outside of that
-   * lock. Finally the container is removed from the preemption manager's
-   * killable containers and, for a PREEMPTED exit status, the queue
-   * preemption metrics are updated.
-   *
-   * @param clusterResource total cluster resource
-   * @param application application owning the container; when {@code null}
-   *          the application / queue bookkeeping is skipped
-   * @param node node the container is on
-   * @param rmContainer container that completed
-   * @param containerStatus final status of the container
-   * @param event event passed on to the application and the parent queue
-   * @param childQueue not read by this method
-   * @param sortQueues passed through to the parent queue
-   */
-  @Override
-  public void completedContainer(Resource clusterResource, 
-      FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer, 
-      ContainerStatus containerStatus, RMContainerEventType event, CSQueue childQueue,
-      boolean sortQueues) {
-    // Update SchedulerHealth for released / preempted container
-    updateSchedulerHealthForCompletedContainer(rmContainer, containerStatus);
-
-    if (application != null) {
-      boolean removed = false;
-
-      // Careful! Locking order is important!
-      writeLock.lock();
-      try {
-        Container container = rmContainer.getContainer();
-
-        // Inform the application & the node
-        // Note: It's safe to assume that all state changes to RMContainer
-        // happen under scheduler's lock...
-        // So, this is, in effect, a transaction across application & node
-        if (rmContainer.getState() == RMContainerState.RESERVED) {
-          removed = application.unreserve(rmContainer.getReservedSchedulerKey(),
-              node, rmContainer);
-        } else{
-          removed = application.containerCompleted(rmContainer, containerStatus,
-              event, node.getPartition());
-
-          node.releaseContainer(rmContainer.getContainerId(), false);
-        }
-
-        // Book-keeping
-        if (removed) {
-
-          // Inform the ordering policy
-          orderingPolicy.containerReleased(application, rmContainer);
-
-          releaseResource(clusterResource, application, container.getResource(),
-              node.getPartition(), rmContainer);
-        }
-      } finally {
-        writeLock.unlock();
-      }
-
-
-      if (removed) {
-        // Inform the parent queue _outside_ of the leaf-queue lock
-        // (the container status is passed on as null, this queue as child)
-        parent.completedContainer(clusterResource, application, node,
-          rmContainer, null, event, this, sortQueues);
-      }
-    }
-
-    // Notify PreemptionManager
-    // (done whether or not an application was supplied)
-    csContext.getPreemptionManager().removeKillableContainer(
-        new KillableContainer(
-            rmContainer,
-            node.getPartition(),
-            getQueuePath()));
-
-    // Update preemption metrics if exit status is PREEMPTED
-    if (containerStatus != null
-        && ContainerExitStatus.PREEMPTED == containerStatus.getExitStatus()) {
-      updateQueuePreemptionMetrics(rmContainer);
-    }
-  }
-
-  /**
-   * Books an allocation into the queue under the write lock: queue usage
-   * (via the superclass), the set of containers that ignore partition
-   * exclusivity, the user's resource usage and the user's available-resources
-   * metric.
-   *
-   * @param clusterResource total cluster resource
-   * @param application application receiving the resource
-   * @param resource resource being allocated
-   * @param nodePartition partition the resource is allocated in
-   * @param rmContainer allocated container, may be {@code null}
-   */
-  void allocateResource(Resource clusterResource,
-      SchedulerApplicationAttempt application, Resource resource,
-      String nodePartition, RMContainer rmContainer) {
-    writeLock.lock();
-    try {
-      super.allocateResource(clusterResource, resource, nodePartition);
-
-      // A container that asked for the default partition but was placed on a
-      // labeled one is tracked as an "ignore exclusivity" container.
-      boolean ignoresExclusivity = null != rmContainer
-          && rmContainer.getNodeLabelExpression().equals(
-              RMNodeLabelsManager.NO_LABEL)
-          && !nodePartition.equals(RMNodeLabelsManager.NO_LABEL);
-      if (ignoresExclusivity) {
-        ignorePartitionExclusivityRMContainers
-            .computeIfAbsent(nodePartition, key -> new TreeSet<>())
-            .add(rmContainer);
-      }
-
-      // Increment the user's resource usage.
-      String userName = application.getUser();
-      User user = usersManager.updateUserResourceUsage(userName, resource,
-          nodePartition, true);
-
-      // Refresh the user's available-resources metric; zero when the user
-      // has no metrics.
-      Resource partitionHeadroom;
-      if (usageTracker.getMetrics().getUserMetrics(userName) == null) {
-        partitionHeadroom = Resources.createResource(0, 0);
-      } else {
-        Resource activeUsersLimit = getResourceLimitForActiveUsers(userName,
-            clusterResource, nodePartition,
-            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-        partitionHeadroom = getHeadroom(user,
-            cachedResourceLimitsForHeadroom.getLimit(), clusterResource,
-            activeUsersLimit, nodePartition);
-      }
-      usageTracker.getMetrics().setAvailableResourcesToUser(nodePartition,
-          userName, partitionHeadroom);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(getQueuePath() + " user=" + userName
-            + " used=" + usageTracker.getQueueUsage().getUsed(nodePartition)
-            + " numContainers=" + usageTracker.getNumContainers()
-            + " headroom = " + application.getHeadroom()
-            + " user-resources=" + user.getUsed());
-      }
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  /**
-   * Books a release into the queue under the write lock: queue usage (via
-   * the superclass), the set of containers that ignore partition
-   * exclusivity, the user's resource usage and the user's available-resources
-   * metric.
-   *
-   * @param clusterResource total cluster resource
-   * @param application application giving the resource back
-   * @param resource resource being released
-   * @param nodePartition partition the resource was allocated in
-   * @param rmContainer released container, may be {@code null}
-   */
-  void releaseResource(Resource clusterResource,
-      FiCaSchedulerApp application, Resource resource, String nodePartition,
-      RMContainer rmContainer) {
-    writeLock.lock();
-    try {
-      super.releaseResource(clusterResource, resource, nodePartition);
-
-      // Stop tracking a container that asked for the default partition but
-      // ran on a labeled one; drop the partition entry once it is empty.
-      boolean ignoresExclusivity = null != rmContainer
-          && rmContainer.getNodeLabelExpression().equals(
-              RMNodeLabelsManager.NO_LABEL)
-          && !nodePartition.equals(RMNodeLabelsManager.NO_LABEL);
-      if (ignoresExclusivity
-          && ignorePartitionExclusivityRMContainers.containsKey(
-              nodePartition)) {
-        Set<RMContainer> tracked =
-            ignorePartitionExclusivityRMContainers.get(nodePartition);
-        tracked.remove(rmContainer);
-        if (tracked.isEmpty()) {
-          ignorePartitionExclusivityRMContainers.remove(nodePartition);
-        }
-      }
-
-      // Decrement the user's resource usage.
-      String userName = application.getUser();
-      User user = usersManager.updateUserResourceUsage(userName, resource,
-          nodePartition, false);
-
-      // Refresh the user's available-resources metric; zero when the user
-      // has no metrics.
-      Resource partitionHeadroom;
-      if (usageTracker.getMetrics().getUserMetrics(userName) == null) {
-        partitionHeadroom = Resources.createResource(0, 0);
-      } else {
-        Resource activeUsersLimit = getResourceLimitForActiveUsers(userName,
-            clusterResource, nodePartition,
-            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-        partitionHeadroom = getHeadroom(user,
-            cachedResourceLimitsForHeadroom.getLimit(), clusterResource,
-            activeUsersLimit, nodePartition);
-      }
-      usageTracker.getMetrics().setAvailableResourcesToUser(nodePartition,
-          userName, partitionHeadroom);
-
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(getQueuePath()
-            + " used=" + usageTracker.getQueueUsage().getUsed()
-            + " numContainers=" + usageTracker.getNumContainers()
-            + " user=" + userName
-            + " user-resources=" + user.getUsed());
-      }
-    } finally {
-      writeLock.unlock();
-    }
-  }
-  
-  private void updateCurrentResourceLimits(
-      ResourceLimits currentResourceLimits, Resource clusterResource) {
-    // TODO: need consider non-empty node labels when resource limits supports
-    // node labels
-    // Even if ParentQueue will set limits respect child's max queue capacity,
-    // but when allocating reserved container, CapacityScheduler doesn't do
-    // this. So need cap limits by queue's max capacity here.
-    this.cachedResourceLimitsForHeadroom =
-        new ResourceLimits(currentResourceLimits.getLimit());
-    Resource queueMaxResource = getEffectiveMaxCapacityDown(
-        RMNodeLabelsManager.NO_LABEL, queueAllocationSettings.getMinimumAllocation());
-    this.cachedResourceLimitsForHeadroom.setLimit(Resources.min(
-        resourceCalculator, clusterResource, queueMaxResource,
-        currentResourceLimits.getLimit()));
-  }
-
-  @Override
-  public void updateClusterResource(Resource clusterResource,
-      ResourceLimits currentResourceLimits) {
-    writeLock.lock();
-    try {
-      lastClusterResource = clusterResource;
-
-      updateAbsoluteCapacities();
-
-      super.updateEffectiveResources(clusterResource);
-
-      // Update maximum applications for the queue and for users
-      updateMaximumApplications(csContext.getConfiguration());
-
-      updateCurrentResourceLimits(currentResourceLimits, clusterResource);
-
-      // Update headroom info based on new cluster resource value
-      // absoluteMaxCapacity now,  will be replaced with absoluteMaxAvailCapacity
-      // during allocation
-      setQueueResourceLimitsInfo(clusterResource);
-
-      // Update user consumedRatios
-      recalculateQueueUsageRatio(clusterResource, null);
-
-      // Update metrics
-      CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
-          this, labelManager, null);
-      // Update configured capacity/max-capacity for default partition only
-      CSQueueUtils.updateConfiguredCapacityMetrics(resourceCalculator,
-          labelManager.getResourceByLabel(null, clusterResource),
-          RMNodeLabelsManager.NO_LABEL, this);
-
-      // queue metrics are updated, more resource may be available
-      // activate the pending applications if possible
-      activateApplications();
-
-      // In case of any resource change, invalidate recalculateULCount to clear
-      // the computed user-limit.
-      usersManager.userLimitNeedsRecompute();
-
-      // Update application properties
-      for (FiCaSchedulerApp application : orderingPolicy
-          .getSchedulableEntities()) {
-        computeUserLimitAndSetHeadroom(application, clusterResource,
-            RMNodeLabelsManager.NO_LABEL,
-            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null);
-      }
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  @Override
-  public void incUsedResource(String nodeLabel, Resource resourceToInc,
-      SchedulerApplicationAttempt application) {
-    usersManager.updateUserResourceUsage(application.getUser(), resourceToInc,
-        nodeLabel, true);
-    super.incUsedResource(nodeLabel, resourceToInc, application);
-  }
-
-  @Override
-  public void decUsedResource(String nodeLabel, Resource resourceToDec,
-      SchedulerApplicationAttempt application) {
-    usersManager.updateUserResourceUsage(application.getUser(), resourceToDec,
-        nodeLabel, false);
-    super.decUsedResource(nodeLabel, resourceToDec, application);
-  }
-
-  public void incAMUsedResource(String nodeLabel, Resource resourceToInc,
-      SchedulerApplicationAttempt application) {
-    User user = getUser(application.getUser());
-    if (user == null) {
-      return;
-    }
-
-    user.getResourceUsage().incAMUsed(nodeLabel,
-        resourceToInc);
-    // ResourceUsage has its own lock, no addition lock needs here.
-    usageTracker.getQueueUsage().incAMUsed(nodeLabel, resourceToInc);
-  }
-
-  public void decAMUsedResource(String nodeLabel, Resource resourceToDec,
-      SchedulerApplicationAttempt application) {
-    User user = getUser(application.getUser());
-    if (user == null) {
-      return;
-    }
-
-    user.getResourceUsage().decAMUsed(nodeLabel,
-        resourceToDec);
-    // ResourceUsage has its own lock, no addition lock needs here.
-    usageTracker.getQueueUsage().decAMUsed(nodeLabel, resourceToDec);
-  }
-
-  @Override
-  public void recoverContainer(Resource clusterResource,
-      SchedulerApplicationAttempt attempt, RMContainer rmContainer) {
-    if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
-      return;
-    }
-    if (rmContainer.getExecutionType() != ExecutionType.GUARANTEED) {
-      return;
-    }
-    // Careful! Locking order is important!
-    writeLock.lock();
-    try {
-      FiCaSchedulerNode node = csContext.getNode(
-          rmContainer.getContainer().getNodeId());
-      allocateResource(clusterResource, attempt,
-          rmContainer.getContainer().getResource(), node.getPartition(),
-          rmContainer);
-    } finally {
-      writeLock.unlock();
-    }
-
-    parent.recoverContainer(clusterResource, attempt, rmContainer);
-  }
-
-  /**
-   * Obtain (read-only) collection of pending applications.
-   */
-  public Collection<FiCaSchedulerApp> getPendingApplications() {
-    return Collections.unmodifiableCollection(pendingOrderingPolicy
-        .getSchedulableEntities());
-  }
-
-  /**
-   * Obtain (read-only) collection of active applications.
-   */
-  public Collection<FiCaSchedulerApp> getApplications() {
-    return Collections.unmodifiableCollection(orderingPolicy
-        .getSchedulableEntities());
-  }
-
-  /**
-   * Obtain (read-only) collection of all applications.
-   */
-  public Collection<FiCaSchedulerApp> getAllApplications() {
-    Collection<FiCaSchedulerApp> apps = new HashSet<FiCaSchedulerApp>(
-        pendingOrderingPolicy.getSchedulableEntities());
-    apps.addAll(orderingPolicy.getSchedulableEntities());
-
-    return Collections.unmodifiableCollection(apps);
-  }
-
-  /**
-   * Get total pending resource considering user limit for the leaf queue. This
-   * will be used for calculating pending resources in the preemption monitor.
-   *
-   * Consider the headroom for each user in the queue.
-   * Total pending for the queue =
-   * sum(for each user(min((user's headroom), sum(user's pending requests))))
-   * NOTE:
-
-   * @param clusterResources clusterResource
-   * @param partition node partition
-   * @param deductReservedFromPending When a container is reserved in CS,
-   *                                  pending resource will not be deducted.
-   *                                  This could lead to double accounting when
-   *                                  doing preemption:
-   *                                  In normal cases, we should deduct reserved
-   *                                  resource from pending to avoid
-   *                                  excessive preemption.
-   * @return Total pending resource considering user limit
-   */
-  public Resource getTotalPendingResourcesConsideringUserLimit(
-      Resource clusterResources, String partition,
-      boolean deductReservedFromPending) {
-    readLock.lock();
-    try {
-      Map<String, Resource> userNameToHeadroom =
-          new HashMap<>();
-      Resource totalPendingConsideringUserLimit = Resource.newInstance(0, 0);
-      for (FiCaSchedulerApp app : getApplications()) {
-        String userName = app.getUser();
-        if (!userNameToHeadroom.containsKey(userName)) {
-          User user = getUsersManager().getUserAndAddIfAbsent(userName);
-          Resource headroom = Resources.subtract(
-              getResourceLimitForActiveUsers(app.getUser(), clusterResources,
-                  partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
-              user.getUsed(partition));
-          // Make sure headroom is not negative.
-          headroom = Resources.componentwiseMax(headroom, Resources.none());
-          userNameToHeadroom.put(userName, headroom);
-        }
-
-        // Check if we need to deduct reserved from pending
-        Resource pending = app.getAppAttemptResourceUsage().getPending(
-            partition);
-        if (deductReservedFromPending) {
-          pending = Resources.subtract(pending,
-              app.getAppAttemptResourceUsage().getReserved(partition));
-        }
-        pending = Resources.componentwiseMax(pending, Resources.none());
-
-        Resource minpendingConsideringUserLimit = Resources.componentwiseMin(
-            userNameToHeadroom.get(userName), pending);
-        Resources.addTo(totalPendingConsideringUserLimit,
-            minpendingConsideringUserLimit);
-        Resources.subtractFrom(userNameToHeadroom.get(userName),
-            minpendingConsideringUserLimit);
-      }
-      return totalPendingConsideringUserLimit;
-    } finally {
-      readLock.unlock();
-    }
-
-  }
-
-  @Override
-  public void collectSchedulerApplications(
-      Collection<ApplicationAttemptId> apps) {
-    readLock.lock();
-    try {
-      for (FiCaSchedulerApp pendingApp : pendingOrderingPolicy
-          .getSchedulableEntities()) {
-        apps.add(pendingApp.getApplicationAttemptId());
-      }
-      for (FiCaSchedulerApp app : orderingPolicy.getSchedulableEntities()) {
-        apps.add(app.getApplicationAttemptId());
-      }
-    } finally {
-      readLock.unlock();
-    }
-
-  }
-
-  @Override
-  public void attachContainer(Resource clusterResource,
-      FiCaSchedulerApp application, RMContainer rmContainer) {
-    if (application != null && rmContainer != null
-        && rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
-      FiCaSchedulerNode node =
-          csContext.getNode(rmContainer.getContainer().getNodeId());
-      allocateResource(clusterResource, application, rmContainer.getContainer()
-          .getResource(), node.getPartition(), rmContainer);
-      LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
-          + " containerState="+ rmContainer.getState()
-          + " resource=" + rmContainer.getContainer().getResource()
-          + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity()
-          + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
-          + usageTracker.getQueueUsage().getUsed() + " cluster=" + clusterResource);
-      // Inform the parent queue
-      parent.attachContainer(clusterResource, application, rmContainer);
-    }
-  }
-
-  @Override
-  public void detachContainer(Resource clusterResource,
-      FiCaSchedulerApp application, RMContainer rmContainer) {
-    if (application != null && rmContainer != null
-          && rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
-      FiCaSchedulerNode node =
-          csContext.getNode(rmContainer.getContainer().getNodeId());
-      releaseResource(clusterResource, application, rmContainer.getContainer()
-          .getResource(), node.getPartition(), rmContainer);
-      LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
-          + " containerState="+ rmContainer.getState()
-          + " resource=" + rmContainer.getContainer().getResource()
-          + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()
-          + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
-          + usageTracker.getQueueUsage().getUsed() + " cluster=" + clusterResource);
-      // Inform the parent queue
-      parent.detachContainer(clusterResource, application, rmContainer);
-    }
-  }
-  
-  /**
-   * @return all ignored partition exclusivity RMContainers in the LeafQueue,
-   *         this will be used by preemption policy.
-   */
-  public Map<String, TreeSet<RMContainer>>
-      getIgnoreExclusivityRMContainers() {
-    Map<String, TreeSet<RMContainer>> clonedMap = new HashMap<>();
-
-    readLock.lock();
-    try {
-      for (Map.Entry<String, TreeSet<RMContainer>> entry : ignorePartitionExclusivityRMContainers
-          .entrySet()) {
-        clonedMap.put(entry.getKey(), new TreeSet<>(entry.getValue()));
-      }
-
-      return clonedMap;
-
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  public void setCapacity(float capacity) {
-    queueCapacities.setCapacity(capacity);
-  }
-
-  public void setCapacity(String nodeLabel, float capacity) {
-    queueCapacities.setCapacity(nodeLabel, capacity);
-  }
-
-  public void setAbsoluteCapacity(float absoluteCapacity) {
-    queueCapacities.setAbsoluteCapacity(absoluteCapacity);
-  }
-
-  public void setAbsoluteCapacity(String nodeLabel, float absoluteCapacity) {
-    queueCapacities.setAbsoluteCapacity(nodeLabel, absoluteCapacity);
-  }
-
-  public void setMaxApplicationsPerUser(int maxApplicationsPerUser) {
-    this.maxApplicationsPerUser = maxApplicationsPerUser;
-  }
-
-  public void setMaxApplications(int maxApplications) {
-    this.maxApplications = maxApplications;
-  }
-
-  public void setMaxAMResourcePerQueuePercent(
-      float maxAMResourcePerQueuePercent) {
-    this.maxAMResourcePerQueuePercent = maxAMResourcePerQueuePercent;
-  }
-
-  public OrderingPolicy<FiCaSchedulerApp>
-      getOrderingPolicy() {
-    return orderingPolicy;
-  }
-  
-  void setOrderingPolicy(
-      OrderingPolicy<FiCaSchedulerApp> orderingPolicy) {
-    writeLock.lock();
-    try {
-      if (null != this.orderingPolicy) {
-        orderingPolicy.addAllSchedulableEntities(
-            this.orderingPolicy.getSchedulableEntities());
-      }
-      this.orderingPolicy = orderingPolicy;
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  @Override
-  public Priority getDefaultApplicationPriority() {
-    return defaultAppPriorityPerQueue;
-  }
-
-  public void updateApplicationPriority(SchedulerApplication<FiCaSchedulerApp> app,
-      Priority newAppPriority) {
-    writeLock.lock();
-    try {
-      FiCaSchedulerApp attempt = app.getCurrentAppAttempt();
-      boolean isActive = orderingPolicy.removeSchedulableEntity(attempt);
-      if (!isActive) {
-        pendingOrderingPolicy.removeSchedulableEntity(attempt);
-      }
-      // Update new priority in SchedulerApplication
-      attempt.setPriority(newAppPriority);
-
-      if (isActive) {
-        orderingPolicy.addSchedulableEntity(attempt);
-      } else {
-        pendingOrderingPolicy.addSchedulableEntity(attempt);
-      }
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  public OrderingPolicy<FiCaSchedulerApp>
-      getPendingAppsOrderingPolicy() {
-    return pendingOrderingPolicy;
-  }
-
-  /*
-   * Holds shared values used by all applications in
-   * the queue to calculate headroom on demand
-   */
-  static class QueueResourceLimitsInfo {
-    private Resource queueCurrentLimit;
-    private Resource clusterResource;
-    
-    public void setQueueCurrentLimit(Resource currentLimit) {
-      this.queueCurrentLimit = currentLimit;
-    }
-    
-    public Resource getQueueCurrentLimit() {
-      return queueCurrentLimit;
-    }
-    
-    public void setClusterResource(Resource clusterResource) {
-      this.clusterResource = clusterResource;
-    }
-    
-    public Resource getClusterResource() {
-      return clusterResource;
-    }
-  }
-
-  @Override
-  public void stopQueue() {
-    writeLock.lock();
-    try {
-      if (getNumApplications() > 0) {
-        updateQueueState(QueueState.DRAINING);
-      } else {
-        updateQueueState(QueueState.STOPPED);
-      }
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  void updateMaximumApplications(CapacitySchedulerConfiguration conf) {
-    int maxAppsForQueue = conf.getMaximumApplicationsPerQueue(getQueuePath());
-
-    int maxDefaultPerQueueApps = conf.getGlobalMaximumApplicationsPerQueue();
-    int maxSystemApps = conf.getMaximumSystemApplications();
-    int baseMaxApplications = maxDefaultPerQueueApps > 0 ?
-        Math.min(maxDefaultPerQueueApps, maxSystemApps)
-        : maxSystemApps;
-
-    String maxLabel = RMNodeLabelsManager.NO_LABEL;
-    if (maxAppsForQueue < 0) {
-      if (maxDefaultPerQueueApps > 0 && this.capacityConfigType
-          != CapacityConfigType.ABSOLUTE_RESOURCE) {
-        maxAppsForQueue = baseMaxApplications;
-      } else {
-        for (String label : queueNodeLabelsSettings.getConfiguredNodeLabels()) {
-          int maxApplicationsByLabel = (int) (baseMaxApplications
-              * queueCapacities.getAbsoluteCapacity(label));
-          if (maxApplicationsByLabel > maxAppsForQueue) {
-            maxAppsForQueue = maxApplicationsByLabel;
-            maxLabel = label;
-          }
-        }
-      }
-    }
-
-    setMaxApplications(maxAppsForQueue);
-
-    updateMaxAppsPerUser();
-
-    LOG.info("LeafQueue:" + getQueuePath() +
-        "update max app related, maxApplications="
-        + maxAppsForQueue + ", maxApplicationsPerUser="
-        + maxApplicationsPerUser + ", Abs Cap:" + queueCapacities
-        .getAbsoluteCapacity(maxLabel) + ", Cap: " + queueCapacities
-        .getCapacity(maxLabel) + ", MaxCap : " + queueCapacities
-        .getMaximumCapacity(maxLabel));
-  }
-
-  private void updateMaxAppsPerUser() {
-    int maxAppsPerUser = maxApplications;
-    if (getUsersManager().getUserLimitFactor() != -1) {
-      int maxApplicationsWithUserLimits = (int) (maxApplications
-          * (getUsersManager().getUserLimit() / 100.0f)
-          * getUsersManager().getUserLimitFactor());
-      maxAppsPerUser = Math.min(maxApplications,
-          maxApplicationsWithUserLimits);
-    }
-
-    setMaxApplicationsPerUser(maxAppsPerUser);
-  }
-
-  /**
-   * Get all valid users in this queue.
-   * @return user list
-   */
-  public Set<String> getAllUsers() {
-    return this.getUsersManager().getUsers().keySet();
-  }
-
-  static class CachedUserLimit {
-    final Resource userLimit;
-    volatile boolean canAssign = true;
-    volatile Resource reservation = Resources.none();
-
-    CachedUserLimit(Resource userLimit) {
-      this.userLimit = userLimit;
-    }
-  }
-
-  private void updateQueuePreemptionMetrics(RMContainer rmc) {
-    final long usedMillis = rmc.getFinishTime() - rmc.getCreationTime();
-    final long usedSeconds = usedMillis / DateUtils.MILLIS_PER_SECOND;
-    CSQueueMetrics metrics = usageTracker.getMetrics();
-    Resource containerResource = rmc.getAllocatedResource();
-    metrics.preemptContainer();
-    long mbSeconds = (containerResource.getMemorySize() * usedMillis)
-        / DateUtils.MILLIS_PER_SECOND;
-    long vcSeconds = (containerResource.getVirtualCores() * usedMillis)
-        / DateUtils.MILLIS_PER_SECOND;
-    metrics.updatePreemptedMemoryMBSeconds(mbSeconds);
-    metrics.updatePreemptedVcoreSeconds(vcSeconds);
-    metrics.updatePreemptedResources(containerResource);
-    metrics.updatePreemptedSecondsForCustomResources(containerResource,
-        usedSeconds);
-    metrics.updatePreemptedForCustomResources(containerResource);
-  }
-
-  @Override
-  int getNumRunnableApps() {
-    readLock.lock();
-    try {
-      return runnableApps.size();
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  int getNumNonRunnableApps() {
-    readLock.lock();
-    try {
-      return nonRunnableApps.size();
-    } finally {
-      readLock.unlock();
-    }
-  }
-
-  boolean removeNonRunnableApp(FiCaSchedulerApp app) {
-    writeLock.lock();
-    try {
-      return nonRunnableApps.remove(app);
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  List<FiCaSchedulerApp> getCopyOfNonRunnableAppSchedulables() {
-    List<FiCaSchedulerApp> appsToReturn = new ArrayList<>();
-    readLock.lock();
-    try {
-      appsToReturn.addAll(nonRunnableApps);
-    } finally {
-      readLock.unlock();
-    }
-    return appsToReturn;
-  }
-
-  @Override
-  public boolean isEligibleForAutoDeletion() {
-    return isDynamicQueue() && getNumApplications() == 0
-        && csContext.getConfiguration().
-        isAutoExpiredDeletionEnabled(this.getQueuePath());
+    LOG.debug("LeafQueue: name={}, fullname={}", queueName, getQueuePath());
   }
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java
index 6e7325c..ddfb24b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java
@@ -334,7 +334,7 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
     try {
       List<FiCaSchedulerApp> apps = new ArrayList<>();
       for (CSQueue childQueue : getChildQueues()) {
-        apps.addAll(((LeafQueue) childQueue).getApplications());
+        apps.addAll(((AbstractLeafQueue) childQueue).getApplications());
       }
       return Collections.unmodifiableList(apps);
     } finally {
@@ -347,7 +347,7 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
     try {
       List<FiCaSchedulerApp> apps = new ArrayList<>();
       for (CSQueue childQueue : getChildQueues()) {
-        apps.addAll(((LeafQueue) childQueue).getPendingApplications());
+        apps.addAll(((AbstractLeafQueue) childQueue).getPendingApplications());
       }
       return Collections.unmodifiableList(apps);
     } finally {
@@ -360,7 +360,7 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
     try {
       List<FiCaSchedulerApp> apps = new ArrayList<>();
       for (CSQueue childQueue : getChildQueues()) {
-        apps.addAll(((LeafQueue) childQueue).getAllApplications());
+        apps.addAll(((AbstractLeafQueue) childQueue).getAllApplications());
       }
       return Collections.unmodifiableList(apps);
     } finally {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index aec2bd8..4339189 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -675,10 +675,10 @@ public class ParentQueue extends AbstractCSQueue {
           // parent Queue has been converted to child queue. The CS has already
           // checked to ensure that this child-queue is in STOPPED state if
           // Child queue has been converted to ParentQueue.
-          if ((childQueue instanceof LeafQueue
+          if ((childQueue instanceof AbstractLeafQueue
               && newChildQueue instanceof ParentQueue)
               || (childQueue instanceof ParentQueue
-                  && newChildQueue instanceof LeafQueue)) {
+                  && newChildQueue instanceof AbstractLeafQueue)) {
             // We would convert this LeafQueue to ParentQueue, or vice versa.
             // consider this as the combination of DELETE then ADD.
             newChildQueue.setParent(this);
@@ -1134,7 +1134,7 @@ public class ParentQueue extends AbstractCSQueue {
           assignment = childAssignment;
         }
         Resource blockedHeadroom = null;
-        if (childQueue instanceof LeafQueue) {
+        if (childQueue instanceof AbstractLeafQueue) {
           blockedHeadroom = childLimits.getHeadroom();
         } else {
           blockedHeadroom = childLimits.getBlockedHeadroom();
@@ -1548,7 +1548,7 @@ public class ParentQueue extends AbstractCSQueue {
       FiCaSchedulerNode node = csContext.getNode(
           toKillContainer.getAllocatedNode());
       if (null != attempt && null != node) {
-        LeafQueue lq = attempt.getCSLeafQueue();
+        AbstractLeafQueue lq = attempt.getCSLeafQueue();
         lq.completedContainer(clusterResource, attempt, node, toKillContainer,
             SchedulerUtils.createPreemptedContainerStatus(
                 toKillContainer.getContainerId(),
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
index 5a8ce9a..4208bf0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
@@ -40,6 +40,11 @@ public class ReservationQueue extends AbstractAutoCreatedLeafQueue {
   public ReservationQueue(CapacitySchedulerContext cs, String queueName,
       PlanQueue parent) throws IOException {
     super(cs, queueName, parent, null);
+    super.setupQueueConfigs(cs.getClusterResource(),
+        cs.getConfiguration());
+
+    LOG.debug("Initialized ReservationQueue: name={}, fullname={}",
+        queueName, getQueuePath());
     // the following parameters are common to all reservation in the plan
     updateQuotas(parent.getUserLimitForReservation(),
         parent.getUserLimitFactor(),
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
index 94df9ab..73aad3c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
@@ -55,7 +55,7 @@ public class UsersManager implements AbstractUsersManager {
   /*
    * Member declaration for UsersManager class.
    */
-  private final LeafQueue lQueue;
+  private final AbstractLeafQueue lQueue;
   private final RMNodeLabelsManager labelManager;
   private final ResourceCalculator resourceCalculator;
   private final CapacitySchedulerContext scheduler;
@@ -301,7 +301,7 @@ public class UsersManager implements AbstractUsersManager {
    * @param resourceCalculator
    *          rc
    */
-  public UsersManager(QueueMetrics metrics, LeafQueue lQueue,
+  public UsersManager(QueueMetrics metrics, AbstractLeafQueue lQueue,
       RMNodeLabelsManager labelManager, CapacitySchedulerContext scheduler,
       ResourceCalculator resourceCalculator) {
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java
index 76ab7ca..7458df9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java
@@ -20,35 +20,23 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
 
 import org.apache.hadoop.classification.VisibleForTesting;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler
-    .SchedulerDynamicEditException;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
-    .AbstractAutoCreatedLeafQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
-    .AutoCreatedLeafQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
-    .AutoCreatedLeafQueueConfig;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
-    .AutoCreatedQueueManagementPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractAutoCreatedLeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedLeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedLeafQueueConfig;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedQueueManagementPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
-    .CapacitySchedulerContext;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
-    .LeafQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
-    .ManagedParentQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
-    .ParentQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
-    .QueueCapacities;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
-    .QueueManagementChange;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica
-    .FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ManagedParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueManagementChange;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.util.Clock;
 import org.apache.hadoop.yarn.util.MonotonicClock;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -136,7 +124,7 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
       return false;
     }
 
-    public boolean createLeafQueueStateIfNotExists(LeafQueue leafQueue,
+    public boolean createLeafQueueStateIfNotExists(AbstractLeafQueue leafQueue,
         String partition) {
       return addLeafQueueStateIfNotExists(leafQueue.getQueuePath(), partition,
           new LeafQueueStatePerPartition());
@@ -482,9 +470,9 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
       Set<String> newQueues = new HashSet<>();
 
       for (CSQueue newQueue : managedParentQueue.getChildQueues()) {
-        if (newQueue instanceof LeafQueue) {
+        if (newQueue instanceof AbstractLeafQueue) {
           for (String nodeLabel : leafQueueTemplateNodeLabels) {
-            leafQueueState.createLeafQueueStateIfNotExists((LeafQueue) newQueue,
+            leafQueueState.createLeafQueueStateIfNotExists((AbstractLeafQueue) newQueue,
                 nodeLabel);
             newPartitions.add(nodeLabel);
           }
@@ -590,7 +578,7 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
       if (leafQueue != null) {
         if (isActive(leafQueue, nodeLabel) && !hasPendingApps(leafQueue)) {
           QueueCapacities capacities = leafQueueEntitlements.getCapacityOfQueue(leafQueue);
-          updateToZeroCapacity(capacities, nodeLabel, (LeafQueue)childQueue);
+          updateToZeroCapacity(capacities, nodeLabel, (AbstractLeafQueue) childQueue);
           deactivatedQueues.put(leafQueue.getQueuePath(), leafQueueTemplateCapacities);
         }
       } else {
@@ -780,7 +768,7 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
   }
 
   private void updateToZeroCapacity(QueueCapacities capacities,
-      String nodeLabel, LeafQueue leafQueue) {
+      String nodeLabel, AbstractLeafQueue leafQueue) {
     capacities.setCapacity(nodeLabel, 0.0f);
     capacities.setMaximumCapacity(nodeLabel,
         leafQueueTemplateCapacities.getMaximumCapacity(nodeLabel));
@@ -801,7 +789,7 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
   }
 
   @VisibleForTesting
-  LeafQueueStatePerPartition getLeafQueueState(LeafQueue queue,
+  LeafQueueStatePerPartition getLeafQueueState(AbstractLeafQueue queue,
       String partition) throws SchedulerDynamicEditException {
     readLock.lock();
     try {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 011a254..3a0fd34 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -66,11 +66,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerCha
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AbstractContainerAllocator;
@@ -958,8 +958,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     }
   }
 
-  public LeafQueue getCSLeafQueue() {
-    return (LeafQueue)queue;
+  public AbstractLeafQueue getCSLeafQueue() {
+    return (AbstractLeafQueue)queue;
   }
 
   public CSAssignment assignContainers(Resource clusterResource,
@@ -996,7 +996,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
 
   protected void getPendingAppDiagnosticMessage(
       StringBuilder diagnosticMessage) {
-    LeafQueue queue = getCSLeafQueue();
+    AbstractLeafQueue queue = getCSLeafQueue();
     diagnosticMessage.append(" Details : AM Partition = ")
         .append(appAMNodePartitionName.isEmpty()
         ? NodeLabel.DEFAULT_NODE_LABEL_PARTITION : appAMNodePartitionName)
@@ -1019,7 +1019,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
 
   protected void getActivedAppDiagnosticMessage(
       StringBuilder diagnosticMessage) {
-    LeafQueue queue = getCSLeafQueue();
+    AbstractLeafQueue queue = getCSLeafQueue();
     QueueCapacities queueCapacities = queue.getQueueCapacities();
     QueueResourceQuotas queueResourceQuotas = queue.getQueueResourceQuotas();
     diagnosticMessage.append(" Details : AM Partition = ")
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerInfo.java
index 74c7c20..3d410ec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerInfo.java
@@ -25,10 +25,10 @@ import javax.xml.bind.annotation.XmlTransient;
 import javax.xml.bind.annotation.XmlType;
 
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.helper.CapacitySchedulerInfoHelper;
 
@@ -179,7 +179,7 @@ public class CapacitySchedulerInfo extends SchedulerInfo {
     List<CSQueue> childLeafQueues = new ArrayList<>();
     List<CSQueue> childNonLeafQueues = new ArrayList<>();
     for (CSQueue queue : parent.getChildQueues()) {
-      if (queue instanceof LeafQueue) {
+      if (queue instanceof AbstractLeafQueue) {
         childLeafQueues.add(queue);
       } else {
         childNonLeafQueues.add(queue);
@@ -190,8 +190,8 @@ public class CapacitySchedulerInfo extends SchedulerInfo {
 
     for (CSQueue queue : childQueues) {
       CapacitySchedulerQueueInfo info;
-      if (queue instanceof LeafQueue) {
-        info = new CapacitySchedulerLeafQueueInfo(cs, (LeafQueue) queue);
+      if (queue instanceof AbstractLeafQueue) {
+        info = new CapacitySchedulerLeafQueueInfo(cs, (AbstractLeafQueue) queue);
       } else {
         info = new CapacitySchedulerQueueInfo(cs, queue);
         info.queues = getQueues(cs, queue);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
index 4e9ced8..5b1da19 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
@@ -27,10 +27,10 @@ import javax.xml.bind.annotation.XmlTransient;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
     .AutoCreatedLeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UserInfo;
 
@@ -63,7 +63,7 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
   CapacitySchedulerLeafQueueInfo() {
   };
 
-  CapacitySchedulerLeafQueueInfo(CapacityScheduler cs, LeafQueue q) {
+  CapacitySchedulerLeafQueueInfo(CapacityScheduler cs, AbstractLeafQueue q) {
     super(cs, q);
     numActiveApplications = q.getNumActiveApplications();
     numPendingApplications = q.getNumPendingApplications();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/helper/CapacitySchedulerInfoHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/helper/CapacitySchedulerInfoHelper.java
index 8b3602d..0ba9bbb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/helper/CapacitySchedulerInfoHelper.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/helper/CapacitySchedulerInfoHelper.java
@@ -18,9 +18,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.helper;
 
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedLeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ManagedParentQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AutoQueueTemplatePropertiesInfo;
@@ -82,7 +82,7 @@ public class CapacitySchedulerInfoHelper {
   }
 
   public static String getQueueType(CSQueue queue) {
-    if (queue instanceof LeafQueue) {
+    if (queue instanceof AbstractLeafQueue) {
       return LEAF_QUEUE;
     } else if (queue instanceof ParentQueue) {
       return PARENT_QUEUE;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
index c5f45fd..52a34fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
@@ -445,7 +446,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
     checkCSLeafQueue(rm, app, clusterResource, queueResource, usedResource,
         numContainers);
 
-    LeafQueue queue = (LeafQueue) app.getQueue();
+    AbstractLeafQueue queue = (AbstractLeafQueue) app.getQueue();
     Resource availableResources =
         Resources.subtract(queueResource, usedResource);
     // ************ check app headroom ****************
@@ -470,7 +471,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
       SchedulerApplication<SchedulerApplicationAttempt> app,
       Resource clusterResource, Resource queueResource, Resource usedResource,
       int numContainers) {
-    LeafQueue leafQueue = (LeafQueue) app.getQueue();
+    AbstractLeafQueue leafQueue = (AbstractLeafQueue) app.getQueue();
     // assert queue used resources.
     assertEquals(usedResource, leafQueue.getUsedResources());
     assertEquals(numContainers, leafQueue.getNumContainers());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSMaxRunningAppsEnforcer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSMaxRunningAppsEnforcer.java
index 43347c7..b560d97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSMaxRunningAppsEnforcer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSMaxRunningAppsEnforcer.java
@@ -139,7 +139,7 @@ public class TestCSMaxRunningAppsEnforcer {
   }
 
   private void removeApp(FiCaSchedulerApp attempt) {
-    LeafQueue queue = attempt.getCSLeafQueue();
+    AbstractLeafQueue queue = attempt.getCSLeafQueue();
     queue.finishApplicationAttempt(attempt, queue.getQueuePath());
     maxAppsEnforcer.untrackApp(attempt);
     maxAppsEnforcer.updateRunnabilityOnAppRemoval(attempt);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java
index 3d40ccf..b5eaf3c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java
@@ -685,7 +685,7 @@ public class TestCapacitySchedulerNewQueueAutoCreation
         "root.a.*") + "capacity", "6w");
     cs.reinitialize(csConf, mockRM.getRMContext());
 
-    LeafQueue a2 = createQueue("root.a.a-auto.a2");
+    AbstractLeafQueue a2 = createQueue("root.a.a-auto.a2");
     Assert.assertEquals("weight is not set by template", 6f,
         a2.getQueueCapacities().getWeight(), 1e-6);
     Assert.assertEquals("user limit factor should be disabled with dynamic queues",
@@ -719,7 +719,7 @@ public class TestCapacitySchedulerNewQueueAutoCreation
         "root.a") + CapacitySchedulerConfiguration
         .AUTO_CREATE_CHILD_QUEUE_AUTO_REMOVAL_ENABLE, false);
     cs.reinitialize(csConf, mockRM.getRMContext());
-    LeafQueue a3 = createQueue("root.a.a3");
+    AbstractLeafQueue a3 = createQueue("root.a.a3");
     Assert.assertFalse("auto queue deletion should be turned off on a3",
         a3.isEligibleForAutoDeletion());
 
@@ -729,27 +729,26 @@ public class TestCapacitySchedulerNewQueueAutoCreation
     csConf.setQueues("root", new String[]{"a", "b", "c"});
     csConf.setAutoQueueCreationV2Enabled("root.c", true);
     cs.reinitialize(csConf, mockRM.getRMContext());
-    LeafQueue c1 = createQueue("root.c.c1");
+    AbstractLeafQueue c1 = createQueue("root.c.c1");
     Assert.assertEquals("weight is not set for label TEST", 6f,
         c1.getQueueCapacities().getWeight("TEST"), 1e-6);
     cs.reinitialize(csConf, mockRM.getRMContext());
-    c1 = (LeafQueue) cs.getQueue("root.c.c1");
+    c1 = (AbstractLeafQueue) cs.getQueue("root.c.c1");
     Assert.assertEquals("weight is not set for label TEST", 6f,
         c1.getQueueCapacities().getWeight("TEST"), 1e-6);
-
   }
 
   @Test
   public void testAutoCreatedQueueConfigChange() throws Exception {
     startScheduler();
-    LeafQueue a2 = createQueue("root.a.a-auto.a2");
+    AbstractLeafQueue a2 = createQueue("root.a.a-auto.a2");
     csConf.setNonLabeledQueueWeight("root.a.a-auto.a2", 4f);
     cs.reinitialize(csConf, mockRM.getRMContext());
 
     Assert.assertEquals("weight is not explicitly set", 4f,
         a2.getQueueCapacities().getWeight(), 1e-6);
 
-    a2 = (LeafQueue) cs.getQueue("root.a.a-auto.a2");
+    a2 = (AbstractLeafQueue) cs.getQueue("root.a.a-auto.a2");
     csConf.setState("root.a.a-auto.a2", QueueState.STOPPED);
     cs.reinitialize(csConf, mockRM.getRMContext());
     Assert.assertEquals("root.a.a-auto.a2 has not been stopped",
@@ -1223,7 +1222,7 @@ public class TestCapacitySchedulerNewQueueAutoCreation
     Assert.assertNull("root.e.e1-auto should have been removed", eAuto);
   }
 
-  protected LeafQueue createQueue(String queuePath) throws YarnException,
+  protected AbstractLeafQueue createQueue(String queuePath) throws YarnException,
       IOException {
     return autoQueueHandler.createQueue(new QueuePath(queuePath));
   }

---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org