You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2017/12/08 23:23:41 UTC

[4/9] hadoop git commit: YARN-7473. Implement Framework and policy for capacity management of auto created queues. (Suma Shivaprasad via wangda)

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java
index ff795e4..cbdb21d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java
@@ -17,13 +17,23 @@
  */
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler
     .SchedulerDynamicEditException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica
+    .FiCaSchedulerApp;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
 
 /**
  * Auto Creation enabled Parent queue. This queue initially does not have any
@@ -44,54 +54,125 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
       final String queueName, final CSQueue parent, final CSQueue old)
       throws IOException {
     super(cs, queueName, parent, old);
-    String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix(
-        csContext.getConfiguration());
-    this.leafQueueTemplate = initializeLeafQueueConfigs(
-        leafQueueTemplateConfPrefix).build();
+
+    shouldFailAutoCreationWhenGuaranteedCapacityExceeded =
+        csContext.getConfiguration()
+            .getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(
+                getQueuePath());
+
+    leafQueueTemplate = initializeLeafQueueConfigs().build();
 
     StringBuffer queueInfo = new StringBuffer();
     queueInfo.append("Created Managed Parent Queue: ").append(queueName).append(
         "]\nwith capacity: [").append(super.getCapacity()).append(
         "]\nwith max capacity: [").append(super.getMaximumCapacity()).append(
-        "\nwith max apps: [").append(leafQueueTemplate.getMaxApps()).append(
-        "]\nwith max apps per user: [").append(
-        leafQueueTemplate.getMaxAppsPerUser()).append("]\nwith user limit: [")
-        .append(leafQueueTemplate.getUserLimit()).append(
-        "]\nwith user limit factor: [").append(
-        leafQueueTemplate.getUserLimitFactor()).append("].");
+        "].");
     LOG.info(queueInfo.toString());
+
+    initializeQueueManagementPolicy();
   }
 
   @Override
   public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
       throws IOException {
-    validate(newlyParsedQueue);
-    super.reinitialize(newlyParsedQueue, clusterResource);
-    String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix(
-        csContext.getConfiguration());
-    this.leafQueueTemplate = initializeLeafQueueConfigs(
-        leafQueueTemplateConfPrefix).build();
+
+    try {
+      writeLock.lock();
+      validate(newlyParsedQueue);
+
+      shouldFailAutoCreationWhenGuaranteedCapacityExceeded =
+          csContext.getConfiguration()
+              .getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(
+                  getQueuePath());
+
+      //validate if capacity is exceeded for child queues
+      if (shouldFailAutoCreationWhenGuaranteedCapacityExceeded) {
+        float childCap = sumOfChildCapacities();
+        if (getCapacity() < childCap) {
+          throw new IOException(
+              "Total of Auto Created leaf queues guaranteed capacity : "
+                  + childCap + " exceeds Parent queue's " + getQueuePath()
+                  + " guaranteed capacity " + getCapacity() + ""
+                  + ".Cannot enforce policy to auto"
+                  + " create queues beyond parent queue's capacity");
+        }
+      }
+
+      leafQueueTemplate = initializeLeafQueueConfigs().build();
+
+      super.reinitialize(newlyParsedQueue, clusterResource);
+
+      // run reinitialize on each existing queue, to trigger absolute cap
+      // recomputations
+      for (CSQueue res : this.getChildQueues()) {
+        res.reinitialize(res, clusterResource);
+      }
+
+      //clear state in policy
+      reinitializeQueueManagementPolicy();
+
+      //reassign capacities according to policy
+      final List<QueueManagementChange> queueManagementChanges =
+          queueManagementPolicy.computeQueueManagementChanges();
+
+      validateAndApplyQueueManagementChanges(queueManagementChanges);
+
+      StringBuffer queueInfo = new StringBuffer();
+      queueInfo.append("Reinitialized Managed Parent Queue: ").append(queueName)
+          .append("]\nwith capacity: [").append(super.getCapacity()).append(
+          "]\nwith max capacity: [").append(super.getMaximumCapacity()).append(
+          "].");
+      LOG.info(queueInfo.toString());
+    } catch (YarnException ye) {
+      LOG.error("Exception while computing policy changes for leaf queue : "
+          + getQueueName(), ye);
+      throw new IOException(ye);
+    } finally {
+      writeLock.unlock();
+    }
   }
 
-  @Override
-  protected AutoCreatedLeafQueueTemplate.Builder initializeLeafQueueConfigs(
-      String queuePath) {
+  private void initializeQueueManagementPolicy() {
+    queueManagementPolicy =
+        csContext.getConfiguration().getAutoCreatedQueueManagementPolicyClass(
+            getQueuePath());
+
+    queueManagementPolicy.init(csContext, this);
+  }
+
+  private void reinitializeQueueManagementPolicy() {
+    AutoCreatedQueueManagementPolicy managementPolicy =
+        csContext.getConfiguration().getAutoCreatedQueueManagementPolicyClass(
+            getQueuePath());
+
+    if (!(managementPolicy.getClass().equals(
+        this.queueManagementPolicy.getClass()))) {
+      queueManagementPolicy = managementPolicy;
+      queueManagementPolicy.init(csContext, this);
+    } else{
+      queueManagementPolicy.reinitialize(csContext, this);
+    }
+  }
+
+  protected AutoCreatedLeafQueueConfig.Builder initializeLeafQueueConfigs() {
 
-    AutoCreatedLeafQueueTemplate.Builder leafQueueTemplate =
-        super.initializeLeafQueueConfigs(queuePath);
+    AutoCreatedLeafQueueConfig.Builder builder =
+        new AutoCreatedLeafQueueConfig.Builder();
 
-    CapacitySchedulerConfiguration conf = csContext.getConfiguration();
-    String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix(conf);
+    String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix(
+        csContext.getConfiguration());
+    //Load template configuration
+    builder.configuration(
+        super.initializeLeafQueueConfigs(leafQueueTemplateConfPrefix));
+
+    //Load template capacities
     QueueCapacities queueCapacities = new QueueCapacities(false);
-    CSQueueUtils.loadUpdateAndCheckCapacities(leafQueueTemplateConfPrefix,
+    CSQueueUtils.loadUpdateAndCheckCapacities(csContext.getConfiguration()
+            .getAutoCreatedQueueTemplateConfPrefix(getQueuePath()),
         csContext.getConfiguration(), queueCapacities, getQueueCapacities());
-    leafQueueTemplate.capacities(queueCapacities);
-
-    shouldFailAutoCreationWhenGuaranteedCapacityExceeded =
-        conf.getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(
-            getQueuePath());
+    builder.capacities(queueCapacities);
 
-    return leafQueueTemplate;
+    return builder;
   }
 
   protected void validate(final CSQueue newlyParsedQueue) throws IOException {
@@ -106,7 +187,7 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
 
   @Override
   public void addChildQueue(CSQueue childQueue)
-      throws SchedulerDynamicEditException {
+      throws SchedulerDynamicEditException, IOException {
     try {
       writeLock.lock();
 
@@ -138,21 +219,164 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
           throw new SchedulerDynamicEditException(
               "Cannot auto create leaf queue " + leafQueueName + ". Child "
                   + "queues capacities have reached parent queue : "
-                  + parentQueue.getQueuePath() + " guaranteed capacity");
+                  + parentQueue.getQueuePath() + "'s guaranteed capacity");
         }
       }
 
       AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) childQueue;
       super.addChildQueue(leafQueue);
-      //TODO - refresh policy queue after capacity management is added
+      final AutoCreatedLeafQueueConfig initialLeafQueueTemplate =
+          queueManagementPolicy.getInitialLeafQueueConfiguration(leafQueue);
 
+      leafQueue.reinitializeFromTemplate(initialLeafQueueTemplate);
     } finally {
       writeLock.unlock();
     }
   }
 
-  private String getLeafQueueConfigPrefix(CapacitySchedulerConfiguration conf) {
-    return conf.getAutoCreatedQueueTemplateConfPrefix(getQueuePath());
+  public List<FiCaSchedulerApp> getScheduleableApplications() {
+    try {
+      readLock.lock();
+      List<FiCaSchedulerApp> apps = new ArrayList<>();
+      for (CSQueue childQueue : getChildQueues()) {
+        apps.addAll(((LeafQueue) childQueue).getApplications());
+      }
+      return Collections.unmodifiableList(apps);
+    } finally {
+      readLock.unlock();
+    }
   }
 
+  public List<FiCaSchedulerApp> getPendingApplications() {
+    try {
+      readLock.lock();
+      List<FiCaSchedulerApp> apps = new ArrayList<>();
+      for (CSQueue childQueue : getChildQueues()) {
+        apps.addAll(((LeafQueue) childQueue).getPendingApplications());
+      }
+      return Collections.unmodifiableList(apps);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public List<FiCaSchedulerApp> getAllApplications() {
+    try {
+      readLock.lock();
+      List<FiCaSchedulerApp> apps = new ArrayList<>();
+      for (CSQueue childQueue : getChildQueues()) {
+        apps.addAll(((LeafQueue) childQueue).getAllApplications());
+      }
+      return Collections.unmodifiableList(apps);
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  public String getLeafQueueConfigPrefix(CapacitySchedulerConfiguration conf) {
+    return CapacitySchedulerConfiguration.PREFIX + conf
+        .getAutoCreatedQueueTemplateConfPrefix(getQueuePath());
+  }
+
+  public boolean shouldFailAutoCreationWhenGuaranteedCapacityExceeded() {
+    return shouldFailAutoCreationWhenGuaranteedCapacityExceeded;
+  }
+
+  /**
+   * Asynchronously called from scheduler to apply queue management changes
+   *
+   * @param queueManagementChanges
+   */
+  public void validateAndApplyQueueManagementChanges(
+      List<QueueManagementChange> queueManagementChanges)
+      throws IOException, SchedulerDynamicEditException {
+    try {
+      writeLock.lock();
+
+      validateQueueManagementChanges(queueManagementChanges);
+
+      applyQueueManagementChanges(queueManagementChanges);
+
+      AutoCreatedQueueManagementPolicy policy =
+          getAutoCreatedQueueManagementPolicy();
+
+      //acquires write lock on policy
+      policy.commitQueueManagementChanges(queueManagementChanges);
+
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  public void validateQueueManagementChanges(
+      List<QueueManagementChange> queueManagementChanges)
+      throws SchedulerDynamicEditException {
+
+    for (QueueManagementChange queueManagementChange : queueManagementChanges) {
+
+      CSQueue childQueue = queueManagementChange.getQueue();
+
+      if (!(childQueue instanceof AutoCreatedLeafQueue)) {
+        throw new SchedulerDynamicEditException(
+            "queue should be " + "AutoCreatedLeafQueue. Found " + childQueue
+                .getClass());
+      }
+
+      if (!(AbstractManagedParentQueue.class.
+          isAssignableFrom(childQueue.getParent().getClass()))) {
+        LOG.error("Queue " + getQueueName()
+            + " is not an instance of PlanQueue or ManagedParentQueue." + " "
+            + "Ignoring update " + queueManagementChanges);
+        throw new SchedulerDynamicEditException(
+            "Queue " + getQueueName() + " is not a AutoEnabledParentQueue."
+                + " Ignoring update " + queueManagementChanges);
+      }
+
+      switch (queueManagementChange.getQueueAction()){
+      case UPDATE_QUEUE:
+        AutoCreatedLeafQueueConfig template =
+            queueManagementChange.getUpdatedQueueTemplate();
+        ((AutoCreatedLeafQueue) childQueue).validateConfigurations(template);
+        break;
+      }
+    }
+  }
+
+  private void applyQueueManagementChanges(
+      List<QueueManagementChange> queueManagementChanges)
+      throws SchedulerDynamicEditException, IOException {
+    for (QueueManagementChange queueManagementChange : queueManagementChanges) {
+      switch (queueManagementChange.getQueueAction()){
+      case UPDATE_QUEUE:
+        AutoCreatedLeafQueue childQueueToBeUpdated =
+            (AutoCreatedLeafQueue) queueManagementChange.getQueue();
+        //acquires write lock on leaf queue
+        childQueueToBeUpdated.reinitializeFromTemplate(
+            queueManagementChange.getUpdatedQueueTemplate());
+        break;
+      }
+    }
+  }
+
+  public CapacitySchedulerConfiguration getLeafQueueConfigs(
+      String leafQueueName) {
+    return getLeafQueueConfigs(getLeafQueueTemplate().getLeafQueueConfigs(),
+        leafQueueName);
+  }
+
+  public CapacitySchedulerConfiguration getLeafQueueConfigs(
+      CapacitySchedulerConfiguration templateConfig, String leafQueueName) {
+    CapacitySchedulerConfiguration leafQueueConfigTemplate = new
+        CapacitySchedulerConfiguration(new Configuration(false), false);
+    for (final Iterator<Map.Entry<String, String>> iterator =
+         templateConfig.iterator(); iterator.hasNext(); ) {
+      Map.Entry<String, String> confKeyValuePair = iterator.next();
+      final String name = confKeyValuePair.getKey().replaceFirst(
+          CapacitySchedulerConfiguration
+              .AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX,
+          leafQueueName);
+      leafQueueConfigTemplate.set(name, confKeyValuePair.getValue());
+    }
+    return leafQueueConfigTemplate;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
index b7f8aa6..757002f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -35,61 +36,132 @@ public class PlanQueue extends AbstractManagedParentQueue {
 
   private static final Logger LOG = LoggerFactory.getLogger(PlanQueue.class);
 
+  private int maxAppsForReservation;
+  private int maxAppsPerUserForReservation;
+  private int userLimit;
+  private float userLimitFactor;
+  protected CapacitySchedulerContext schedulerContext;
   private boolean showReservationsAsQueues;
 
   public PlanQueue(CapacitySchedulerContext cs, String queueName,
       CSQueue parent, CSQueue old) throws IOException {
     super(cs, queueName, parent, old);
-    this.leafQueueTemplate = initializeLeafQueueConfigs(getQueuePath()).build();
+
+    this.schedulerContext = cs;
+    // Set the reservation queue attributes for the Plan
+    CapacitySchedulerConfiguration conf = cs.getConfiguration();
+    String queuePath = super.getQueuePath();
+    int maxAppsForReservation = conf.getMaximumApplicationsPerQueue(queuePath);
+    showReservationsAsQueues = conf.getShowReservationAsQueues(queuePath);
+    if (maxAppsForReservation < 0) {
+      maxAppsForReservation =
+          (int) (CapacitySchedulerConfiguration.
+              DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS * super
+              .getAbsoluteCapacity());
+    }
+    int userLimit = conf.getUserLimit(queuePath);
+    float userLimitFactor = conf.getUserLimitFactor(queuePath);
+    int maxAppsPerUserForReservation =
+        (int) (maxAppsForReservation * (userLimit / 100.0f) * userLimitFactor);
+    updateQuotas(userLimit, userLimitFactor, maxAppsForReservation,
+        maxAppsPerUserForReservation);
 
     StringBuffer queueInfo = new StringBuffer();
-    queueInfo.append("Created Plan Queue: ").append(queueName).append(
-        "]\nwith capacity: [").append(super.getCapacity()).append(
-        "]\nwith max capacity: [").append(super.getMaximumCapacity()).append(
-        "\nwith max apps: [").append(leafQueueTemplate.getMaxApps()).append(
-        "]\nwith max apps per user: [").append(
-        leafQueueTemplate.getMaxAppsPerUser()).append("]\nwith user limit: [")
-        .append(leafQueueTemplate.getUserLimit()).append(
-        "]\nwith user limit factor: [").append(
-        leafQueueTemplate.getUserLimitFactor()).append("].");
+    queueInfo.append("Created Plan Queue: ").append(queueName)
+        .append("\nwith capacity: [").append(super.getCapacity())
+        .append("]\nwith max capacity: [").append(super.getMaximumCapacity())
+        .append("\nwith max reservation apps: [").append(maxAppsForReservation)
+        .append("]\nwith max reservation apps per user: [")
+        .append(maxAppsPerUserForReservation).append("]\nwith user limit: [")
+        .append(userLimit).append("]\nwith user limit factor: [")
+        .append(userLimitFactor).append("].");
     LOG.info(queueInfo.toString());
   }
 
   @Override
-  public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
-      throws IOException {
-    validate(newlyParsedQueue);
-    super.reinitialize(newlyParsedQueue, clusterResource);
-    this.leafQueueTemplate = initializeLeafQueueConfigs(getQueuePath()).build();
+  public void reinitialize(CSQueue newlyParsedQueue,
+      Resource clusterResource) throws IOException {
+    try {
+      writeLock.lock();
+      // Sanity check
+      if (!(newlyParsedQueue instanceof PlanQueue) || !newlyParsedQueue
+          .getQueuePath().equals(getQueuePath())) {
+        throw new IOException(
+            "Trying to reinitialize " + getQueuePath() + " from "
+                + newlyParsedQueue.getQueuePath());
+      }
+
+      PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue;
+
+      if (newlyParsedParentQueue.getChildQueues().size() != 1) {
+        throw new IOException(
+            "Reservable Queue should not have sub-queues in the"
+                + "configuration expect the default reservation queue");
+      }
+
+      // Set new configs
+      setupQueueConfigs(clusterResource);
+
+      updateQuotas(newlyParsedParentQueue.userLimit,
+          newlyParsedParentQueue.userLimitFactor,
+          newlyParsedParentQueue.maxAppsForReservation,
+          newlyParsedParentQueue.maxAppsPerUserForReservation);
+
+      // run reinitialize on each existing queue, to trigger absolute cap
+      // recomputations
+      for (CSQueue res : this.getChildQueues()) {
+        res.reinitialize(res, clusterResource);
+      }
+      showReservationsAsQueues =
+          newlyParsedParentQueue.showReservationsAsQueues;
+    } finally {
+      writeLock.unlock();
+    }
   }
 
-  @Override
-  protected AutoCreatedLeafQueueTemplate.Builder initializeLeafQueueConfigs
-      (String queuePath) {
-    AutoCreatedLeafQueueTemplate.Builder leafQueueTemplate = super
-        .initializeLeafQueueConfigs
-        (queuePath);
-    showReservationsAsQueues = csContext.getConfiguration()
-        .getShowReservationAsQueues(queuePath);
-    return leafQueueTemplate;
+  private void updateQuotas(int userLimit, float userLimitFactor,
+      int maxAppsForReservation, int maxAppsPerUserForReservation) {
+    this.userLimit = userLimit;
+    this.userLimitFactor = userLimitFactor;
+    this.maxAppsForReservation = maxAppsForReservation;
+    this.maxAppsPerUserForReservation = maxAppsPerUserForReservation;
   }
 
-  protected void validate(final CSQueue newlyParsedQueue) throws IOException {
-    // Sanity check
-    if (!(newlyParsedQueue instanceof PlanQueue) || !newlyParsedQueue
-        .getQueuePath().equals(getQueuePath())) {
-      throw new IOException(
-          "Trying to reinitialize " + getQueuePath() + " from "
-              + newlyParsedQueue.getQueuePath());
-    }
+  /**
+   * Number of maximum applications for each of the reservations in this Plan.
+   *
+   * @return maxAppsForreservation
+   */
+  public int getMaxApplicationsForReservations() {
+    return maxAppsForReservation;
+  }
 
-    PlanQueue newlyParsedParentQueue = (PlanQueue) newlyParsedQueue;
+  /**
+   * Number of maximum applications per user for each of the reservations in
+   * this Plan.
+   *
+   * @return maxAppsPerUserForreservation
+   */
+  public int getMaxApplicationsPerUserForReservation() {
+    return maxAppsPerUserForReservation;
+  }
 
-    if (newlyParsedParentQueue.getChildQueues().size() != 1) {
-      throw new IOException(
-          "Reservable Queue should not have sub-queues in the"
-              + "configuration expect the default reservation queue");
-    }
+  /**
+   * User limit value for each of the reservations in this Plan.
+   *
+   * @return userLimit
+   */
+  public int getUserLimitForReservation() {
+    return userLimit;
+  }
+
+  /**
+   * User limit factor value for each of the reservations in this Plan.
+   *
+   * @return userLimitFactor
+   */
+  public float getUserLimitFactor() {
+    return userLimitFactor;
   }
 
   /**
@@ -98,4 +170,4 @@ public class PlanQueue extends AbstractManagedParentQueue {
   public boolean showReservationsAsQueues() {
     return showReservationsAsQueues;
   }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementChange.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementChange.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementChange.java
new file mode 100644
index 0000000..74d9b23
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementChange.java
@@ -0,0 +1,148 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.QueueState;
+
+/**
+ * Encapsulates Queue entitlement and state updates needed
+ * for adjusting capacity dynamically
+ *
+ */
+@Private
+@Unstable
+public abstract class QueueManagementChange {
+
+  private final CSQueue queue;
+
+  /**
+   * Updating the queue may involve entitlement updates
+   * and/or QueueState changes
+   *
+   * QueueAction can potentially be enhanced
+   * for adding, removing queues for queue management
+   */
+  public enum QueueAction {
+    UPDATE_QUEUE
+  }
+
+  private AutoCreatedLeafQueueConfig
+      queueTemplateUpdate;
+
+  private final QueueAction queueAction;
+  /**
+   * Updated Queue state with the new entitlement
+   */
+  private QueueState transitionToQueueState;
+
+  public QueueManagementChange(final CSQueue queue,
+      final QueueAction queueAction) {
+    this.queue = queue;
+    this.queueAction = queueAction;
+  }
+
+  public QueueManagementChange(final CSQueue queue,
+      final QueueAction queueAction, QueueState targetQueueState,
+      final AutoCreatedLeafQueueConfig
+          queueTemplateUpdates) {
+    this(queue, queueAction, queueTemplateUpdates);
+    this.transitionToQueueState = targetQueueState;
+  }
+
+  public QueueManagementChange(final CSQueue queue,
+      final QueueAction queueAction,
+      final AutoCreatedLeafQueueConfig
+      queueTemplateUpdates) {
+    this(queue, queueAction);
+    this.queueTemplateUpdate = queueTemplateUpdates;
+  }
+
+  public QueueState getTransitionToQueueState() {
+    return transitionToQueueState;
+  }
+
+  public CSQueue getQueue() {
+    return queue;
+  }
+
+  public AutoCreatedLeafQueueConfig getUpdatedQueueTemplate() {
+    return queueTemplateUpdate;
+  }
+
+  public QueueAction getQueueAction() {
+    return queueAction;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o)
+      return true;
+    if (!(o instanceof QueueManagementChange))
+      return false;
+
+    QueueManagementChange that = (QueueManagementChange) o;
+
+    if (queue != null ? !queue.equals(that.queue) : that.queue != null)
+      return false;
+    if (queueTemplateUpdate != null ? !queueTemplateUpdate.equals(
+        that.queueTemplateUpdate) : that.queueTemplateUpdate != null)
+      return false;
+    if (queueAction != that.queueAction)
+      return false;
+    return transitionToQueueState == that.transitionToQueueState;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = queue != null ? queue.hashCode() : 0;
+    result = 31 * result + (queueTemplateUpdate != null ?
+        queueTemplateUpdate.hashCode() :
+        0);
+    result = 31 * result + (queueAction != null ? queueAction.hashCode() : 0);
+    result = 31 * result + (transitionToQueueState != null ?
+        transitionToQueueState.hashCode() :
+        0);
+    return result;
+  }
+
+  @Override
+  public String toString() {
+    return "QueueManagementChange{" + "queue=" + queue
+        + ", updatedEntitlementsByPartition=" + queueTemplateUpdate
+        + ", queueAction=" + queueAction + ", transitionToQueueState="
+        + transitionToQueueState + '}';
+  }
+
+  public static class UpdateQueue extends QueueManagementChange {
+
+    public UpdateQueue(final CSQueue queue, QueueState targetQueueState,
+        final AutoCreatedLeafQueueConfig
+            queueTemplateUpdate) {
+      super(queue, QueueAction.UPDATE_QUEUE, targetQueueState,
+          queueTemplateUpdate);
+    }
+
+    public UpdateQueue(final CSQueue queue,
+        final AutoCreatedLeafQueueConfig
+            queueTemplateUpdate) {
+      super(queue, QueueAction.UPDATE_QUEUE, queueTemplateUpdate);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementDynamicEditPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementDynamicEditPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementDynamicEditPolicy.java
new file mode 100644
index 0000000..9b0cf7b
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueManagementDynamicEditPolicy.java
@@ -0,0 +1,272 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
+import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingEditPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
+
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
+    .QueueManagementChangeEvent;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Queue Management scheduling policy for managed parent queues which enable
+ * auto child queue creation
+ */
+public class QueueManagementDynamicEditPolicy implements SchedulingEditPolicy {
+
+  private static final Log LOG =
+      LogFactory.getLog(QueueManagementDynamicEditPolicy.class);
+
+  private Clock clock;
+
+  // Pointer to other RM components
+  private RMContext rmContext;
+  private ResourceCalculator rc;
+  private CapacityScheduler scheduler;
+  private RMNodeLabelsManager nlm;
+
+  private long monitoringInterval;
+
+  private Set<String> managedParentQueues = new HashSet<>();
+
+  /**
+   * Instantiated by CapacitySchedulerConfiguration
+   */
+  public QueueManagementDynamicEditPolicy() {
+    clock = SystemClock.getInstance();
+  }
+
+  @SuppressWarnings("unchecked")
+  @VisibleForTesting
+  public QueueManagementDynamicEditPolicy(RMContext context,
+      CapacityScheduler scheduler) {
+    init(context.getYarnConfiguration(), context, scheduler);
+  }
+
+  @SuppressWarnings("unchecked")
+  @VisibleForTesting
+  public QueueManagementDynamicEditPolicy(RMContext context,
+      CapacityScheduler scheduler, Clock clock) {
+    init(context.getYarnConfiguration(), context, scheduler);
+    this.clock = clock;
+  }
+
+  @Override
+  public void init(final Configuration config, final RMContext context,
+      final ResourceScheduler sched) {
+    LOG.info("Queue Management Policy monitor:" + this.
+        getClass().getCanonicalName());
+    assert null == scheduler : "Unexpected duplicate call to init";
+    if (!(sched instanceof CapacityScheduler)) {
+      throw new YarnRuntimeException("Class " +
+          sched.getClass().getCanonicalName() + " not instance of " +
+          CapacityScheduler.class.getCanonicalName());
+    }
+    rmContext = context;
+    scheduler = (CapacityScheduler) sched;
+    clock = scheduler.getClock();
+
+    rc = scheduler.getResourceCalculator();
+    nlm = scheduler.getRMContext().getNodeLabelManager();
+
+    CapacitySchedulerConfiguration csConfig = scheduler.getConfiguration();
+
+    monitoringInterval = csConfig.getLong(
+        CapacitySchedulerConfiguration.QUEUE_MANAGEMENT_MONITORING_INTERVAL,
+        CapacitySchedulerConfiguration.
+            DEFAULT_QUEUE_MANAGEMENT_MONITORING_INTERVAL);
+
+    initQueues();
+  }
+
+  /**
+   * Reinitializes queues(Called on scheduler.reinitialize)
+   * @param config Configuration
+   * @param context The resourceManager's context
+   * @param sched The scheduler
+   */
+  public void reinitialize(final Configuration config, final RMContext context,
+      final ResourceScheduler sched) {
+    //TODO - Wire with scheduler reinitialize and remove initQueues below?
+    initQueues();
+  }
+
+  private void initQueues() {
+    managedParentQueues.clear();
+    for (Map.Entry<String, CSQueue> queues : scheduler
+        .getCapacitySchedulerQueueManager()
+        .getQueues().entrySet()) {
+
+      String queueName = queues.getKey();
+      CSQueue queue = queues.getValue();
+
+      if ( queue instanceof ManagedParentQueue) {
+        managedParentQueues.add(queueName);
+      }
+    }
+  }
+
+  @Override
+  public void editSchedule() {
+    long startTs = clock.getTime();
+
+    initQueues();
+    manageAutoCreatedLeafQueues();
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Total time used=" + (clock.getTime() - startTs) + " ms.");
+    }
+  }
+
+  @VisibleForTesting
+  List<QueueManagementChange> manageAutoCreatedLeafQueues()
+  {
+
+    List<QueueManagementChange> queueManagementChanges = new ArrayList<>();
+    // All partitions to look at
+
+    //Proceed only if there are queues to process
+    if (managedParentQueues.size() > 0) {
+      for (String parentQueueName : managedParentQueues) {
+        ManagedParentQueue parentQueue =
+            (ManagedParentQueue) scheduler.getCapacitySchedulerQueueManager().
+                getQueue(parentQueueName);
+
+        queueManagementChanges.addAll(
+            computeQueueManagementChanges
+            (parentQueue));
+      }
+    }
+    return queueManagementChanges;
+  }
+
+
+  @VisibleForTesting
+  List<QueueManagementChange> computeQueueManagementChanges
+      (ManagedParentQueue parentQueue) {
+
+    List<QueueManagementChange> queueManagementChanges =
+        Collections.emptyList();
+    if (!parentQueue.shouldFailAutoCreationWhenGuaranteedCapacityExceeded()) {
+
+      AutoCreatedQueueManagementPolicy policyClazz =
+          parentQueue.getAutoCreatedQueueManagementPolicy();
+      long startTime = 0;
+      try {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(MessageFormat
+              .format("Trying to use {0} to compute preemption "
+                      + "candidates",
+                  policyClazz.getClass().getName()));
+          startTime = clock.getTime();
+        }
+
+        queueManagementChanges = policyClazz.computeQueueManagementChanges();
+
+        //Scheduler update is asynchronous
+        if (queueManagementChanges.size() > 0) {
+          QueueManagementChangeEvent queueManagementChangeEvent =
+              new QueueManagementChangeEvent(parentQueue,
+                  queueManagementChanges);
+          scheduler.getRMContext().getDispatcher().getEventHandler().handle(
+              queueManagementChangeEvent);
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(MessageFormat.format("{0} uses {1} millisecond"
+                  + " to run",
+              policyClazz.getClass().getName(), clock.getTime()
+                  - startTime));
+          if (queueManagementChanges.size() > 0) {
+            LOG.debug(" Updated queue management updates for parent queue"
+                + " ["
+                + parentQueue.getQueueName() + ": [\n" + queueManagementChanges
+                .toString() + "\n]");
+          }
+        }
+      } catch (YarnException e) {
+        LOG.error(
+            "Could not compute child queue management updates for parent "
+                + "queue "
+                + parentQueue.getQueueName(), e);
+      }
+    } else{
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "Skipping queue management updates for parent queue "
+                + parentQueue
+                .getQueuePath() + " "
+                + "since configuration for  auto creating queue's beyond "
+                + "parent's "
+                + "guaranteed capacity is disabled");
+      }
+    }
+    return queueManagementChanges;
+  }
+
+  @Override
+  public long getMonitoringInterval() {
+    return monitoringInterval;
+  }
+
+  @Override
+  public String getPolicyName() {
+    return "QueueManagementDynamicEditPolicy";
+  }
+
+  public ResourceCalculator getResourceCalculator() {
+    return rc;
+  }
+
+  public RMContext getRmContext() {
+    return rmContext;
+  }
+
+  public ResourceCalculator getRC() {
+    return rc;
+  }
+
+  public CapacityScheduler getScheduler() {
+    return scheduler;
+  }
+
+  public Set<String> getManagedParentQueues() {
+    return managedParentQueues;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
new file mode 100644
index 0000000..34f4aa1
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.io.IOException;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * This represents a dynamic {@link LeafQueue} managed by the
+ * {@link ReservationSystem}
+ *
+ */
+public class ReservationQueue extends AbstractAutoCreatedLeafQueue {
+
+  private static final Logger LOG = LoggerFactory
+      .getLogger(ReservationQueue.class);
+
+  private PlanQueue parent;
+
+  public ReservationQueue(CapacitySchedulerContext cs, String queueName,
+      PlanQueue parent) throws IOException {
+    super(cs, queueName, parent, null);
+    // the following parameters are common to all reservation in the plan
+    updateQuotas(parent.getUserLimitForReservation(),
+        parent.getUserLimitFactor(),
+        parent.getMaxApplicationsForReservations(),
+        parent.getMaxApplicationsPerUserForReservation());
+    this.parent = parent;
+  }
+
+  @Override
+  public void reinitialize(CSQueue newlyParsedQueue,
+      Resource clusterResource) throws IOException {
+    try {
+      writeLock.lock();
+      // Sanity check
+      if (!(newlyParsedQueue instanceof ReservationQueue) || !newlyParsedQueue
+          .getQueuePath().equals(getQueuePath())) {
+        throw new IOException(
+            "Trying to reinitialize " + getQueuePath() + " from "
+                + newlyParsedQueue.getQueuePath());
+      }
+      super.reinitialize(newlyParsedQueue, clusterResource);
+      CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
+          this, labelManager, null);
+
+      updateQuotas(parent.getUserLimitForReservation(),
+          parent.getUserLimitFactor(),
+          parent.getMaxApplicationsForReservations(),
+          parent.getMaxApplicationsPerUserForReservation());
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  private void updateQuotas(int userLimit, float userLimitFactor,
+      int maxAppsForReservation, int maxAppsPerUserForReservation) {
+    setUserLimit(userLimit);
+    setUserLimitFactor(userLimitFactor);
+    setMaxApplications(maxAppsForReservation);
+    maxApplicationsPerUser = maxAppsPerUserForReservation;
+  }
+
+  @Override
+  protected void setupConfigurableCapacities(CapacitySchedulerConfiguration
+      configuration) {
+    super.setupConfigurableCapacities(queueCapacities);
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java
new file mode 100644
index 0000000..aee6405
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java
@@ -0,0 +1,745 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
+    .queuemanagement;
+
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .SchedulerDynamicEditException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
+    .AbstractAutoCreatedLeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
+    .AutoCreatedLeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
+    .AutoCreatedLeafQueueConfig;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
+    .AutoCreatedQueueManagementPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
+    .CapacitySchedulerContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
+    .LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
+    .ManagedParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
+    .ParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
+    .QueueCapacities;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
+    .QueueManagementChange;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica
+    .FiCaSchedulerApp;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.MonotonicClock;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager
+    .NO_LABEL;
+import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
+    .capacity.CSQueueUtils.EPSILON;
+
+/**
+ * Capacity Management policy for auto created leaf queues
+ * <p>
+ * Assigns capacity if available to leaf queues based on application
+ * submission order i.e leaf queues are assigned capacity in FCFS order based
+ * on application submission time.  Updates leaf queue capacities to 0 when
+ * there are no pending or running apps under that queue.
+ */
+public class GuaranteedOrZeroCapacityOverTimePolicy
+    implements AutoCreatedQueueManagementPolicy {
+
+  private CapacitySchedulerContext scheduler;
+  private ManagedParentQueue managedParentQueue;
+
+  private static final Log LOG = LogFactory.getLog(
+      GuaranteedOrZeroCapacityOverTimePolicy.class);
+
+  private AutoCreatedLeafQueueConfig ZERO_CAPACITY_ENTITLEMENT;
+
+  private ReentrantReadWriteLock.WriteLock writeLock;
+
+  private ReentrantReadWriteLock.ReadLock readLock;
+
+  private ParentQueueState parentQueueState = new ParentQueueState();
+
+  private AutoCreatedLeafQueueConfig leafQueueTemplate;
+
+  private QueueCapacities leafQueueTemplateCapacities;
+
+  private Map<String, LeafQueueState> leafQueueStateMap = new HashMap<>();
+
+  private Clock clock = new MonotonicClock();
+
+  private class LeafQueueState {
+
+    private AtomicBoolean isActive = new AtomicBoolean(false);
+
+    private long mostRecentActivationTime;
+
+    private long mostRecentDeactivationTime;
+
+    public long getMostRecentActivationTime() {
+      return mostRecentActivationTime;
+    }
+
+    public long getMostRecentDeactivationTime() {
+      return mostRecentDeactivationTime;
+    }
+
+    /**
+     * Is the queue currently active or deactivated?
+     *
+     * @return true if Active else false
+     */
+    public boolean isActive() {
+      return isActive.get();
+    }
+
+    private boolean activate() {
+      boolean ret = isActive.compareAndSet(false, true);
+      mostRecentActivationTime = clock.getTime();
+      return ret;
+    }
+
+    private boolean deactivate() {
+      boolean ret = isActive.compareAndSet(true, false);
+      mostRecentDeactivationTime = clock.getTime();
+      return ret;
+    }
+  }
+
+  private boolean containsLeafQueue(String leafQueueName) {
+    return leafQueueStateMap.containsKey(leafQueueName);
+  }
+
+  private boolean addLeafQueueStateIfNotExists(String leafQueueName,
+      LeafQueueState leafQueueState) {
+    if (!containsLeafQueue(leafQueueName)) {
+      leafQueueStateMap.put(leafQueueName, leafQueueState);
+      return true;
+    }
+    return false;
+  }
+
+  private boolean addLeafQueueStateIfNotExists(LeafQueue leafQueue) {
+    return addLeafQueueStateIfNotExists(leafQueue.getQueueName(),
+        new LeafQueueState());
+  }
+
+  private void clearLeafQueueState() {
+    leafQueueStateMap.clear();
+  }
+
+  private class ParentQueueState {
+
+    private Map<String, Float> totalAbsoluteActivatedChildQueueCapacityByLabel =
+        new HashMap<String, Float>();
+
+    private float getAbsoluteActivatedChildQueueCapacity() {
+      return getAbsoluteActivatedChildQueueCapacity(NO_LABEL);
+    }
+
+    private float getAbsoluteActivatedChildQueueCapacity(String nodeLabel) {
+      try {
+        readLock.lock();
+        Float totalActivatedCapacity = getByLabel(nodeLabel);
+        if (totalActivatedCapacity != null) {
+          return totalActivatedCapacity;
+        } else{
+          return 0;
+        }
+      } finally {
+        readLock.unlock();
+      }
+    }
+
+    private void incAbsoluteActivatedChildCapacity(String nodeLabel,
+        float childQueueCapacity) {
+      try {
+        writeLock.lock();
+        Float activatedChildCapacity = getByLabel(nodeLabel);
+        if (activatedChildCapacity != null) {
+          setByLabel(nodeLabel, activatedChildCapacity + childQueueCapacity);
+        } else{
+          setByLabel(nodeLabel, childQueueCapacity);
+        }
+      } finally {
+        writeLock.unlock();
+      }
+    }
+
+    private void decAbsoluteActivatedChildCapacity(String nodeLabel,
+        float childQueueCapacity) {
+      try {
+        writeLock.lock();
+        Float activatedChildCapacity = getByLabel(nodeLabel);
+        if (activatedChildCapacity != null) {
+          setByLabel(nodeLabel, activatedChildCapacity - childQueueCapacity);
+        } else{
+          setByLabel(nodeLabel, childQueueCapacity);
+        }
+      } finally {
+        writeLock.unlock();
+      }
+    }
+
+    Float getByLabel(String label) {
+      return totalAbsoluteActivatedChildQueueCapacityByLabel.get(label);
+    }
+
+    Float setByLabel(String label, float val) {
+      return totalAbsoluteActivatedChildQueueCapacityByLabel.put(label, val);
+    }
+
+    void clear() {
+      totalAbsoluteActivatedChildQueueCapacityByLabel.clear();
+    }
+  }
+
+  /**
+   * Comparator that orders applications by their submit time
+   */
+  private class PendingApplicationComparator
+      implements Comparator<FiCaSchedulerApp> {
+
+    @Override
+    public int compare(FiCaSchedulerApp app1, FiCaSchedulerApp app2) {
+      RMApp rmApp1 = scheduler.getRMContext().getRMApps().get(
+          app1.getApplicationId());
+      RMApp rmApp2 = scheduler.getRMContext().getRMApps().get(
+          app2.getApplicationId());
+      if (rmApp1 != null && rmApp2 != null) {
+        return Long.compare(rmApp1.getSubmitTime(), rmApp2.getSubmitTime());
+      } else if (rmApp1 != null) {
+        return -1;
+      } else if (rmApp2 != null) {
+        return 1;
+      } else{
+        return 0;
+      }
+    }
+  }
+
+  private PendingApplicationComparator applicationComparator =
+      new PendingApplicationComparator();
+
+  @Override
+  public void init(final CapacitySchedulerContext schedulerContext,
+      final ParentQueue parentQueue) {
+    this.scheduler = schedulerContext;
+
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    readLock = lock.readLock();
+    writeLock = lock.writeLock();
+
+    if (!(parentQueue instanceof ManagedParentQueue)) {
+      throw new IllegalArgumentException(
+          "Expected instance of type " + ManagedParentQueue.class);
+    }
+
+    this.managedParentQueue = (ManagedParentQueue) parentQueue;
+
+    initializeLeafQueueTemplate(this.managedParentQueue);
+
+    LOG.info(
+        "Initialized queue management policy for parent queue " + parentQueue
+            .getQueueName() + " with leaf queue template capacities : ["
+            + leafQueueTemplate.getQueueCapacities() + "]");
+  }
+
+  private void initializeLeafQueueTemplate(ManagedParentQueue parentQueue) {
+    leafQueueTemplate = parentQueue.getLeafQueueTemplate();
+
+    leafQueueTemplateCapacities = leafQueueTemplate.getQueueCapacities();
+
+    ZERO_CAPACITY_ENTITLEMENT = buildTemplate(0.0f,
+        leafQueueTemplateCapacities.getMaximumCapacity());
+  }
+
+  @Override
+  public List<QueueManagementChange> computeQueueManagementChanges()
+      throws SchedulerDynamicEditException {
+
+    //TODO : Add support for node labels on leaf queue template configurations
+    //synch/add missing leaf queue(s) if any to state
+    updateLeafQueueState();
+
+    try {
+      readLock.lock();
+      List<QueueManagementChange> queueManagementChanges = new ArrayList<>();
+
+      // check if any leaf queues need to be deactivated based on pending
+      // applications and
+      float parentAbsoluteCapacity =
+          managedParentQueue.getQueueCapacities().getAbsoluteCapacity();
+
+      float leafQueueTemplateAbsoluteCapacity =
+          leafQueueTemplateCapacities.getAbsoluteCapacity();
+      Map<String, QueueCapacities> deactivatedLeafQueues =
+          deactivateLeafQueuesIfInActive(managedParentQueue, queueManagementChanges);
+
+      float deactivatedCapacity = getTotalDeactivatedCapacity(
+          deactivatedLeafQueues);
+
+      float sumOfChildQueueActivatedCapacity = parentQueueState.
+          getAbsoluteActivatedChildQueueCapacity();
+
+      //Check if we need to activate anything at all?
+      float availableCapacity = getAvailableCapacity(parentAbsoluteCapacity,
+          deactivatedCapacity, sumOfChildQueueActivatedCapacity);
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug(
+            "Parent queue : " + managedParentQueue.getQueueName() + " absCapacity = "
+                + parentAbsoluteCapacity + ", leafQueueAbsoluteCapacity = "
+                + leafQueueTemplateAbsoluteCapacity + ", deactivatedCapacity = "
+                + deactivatedCapacity + " , absChildActivatedCapacity = "
+                + sumOfChildQueueActivatedCapacity + ", availableCapacity = "
+                + availableCapacity);
+      }
+
+      if (availableCapacity >= leafQueueTemplateAbsoluteCapacity) {
+        //sort applications across leaf queues by submit time
+        List<FiCaSchedulerApp> pendingApps = getSortedPendingApplications();
+
+        if (pendingApps.size() > 0) {
+          int maxLeafQueuesTobeActivated = getMaxLeavesToBeActivated(
+              availableCapacity, leafQueueTemplateAbsoluteCapacity,
+              pendingApps.size());
+
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Found " + maxLeafQueuesTobeActivated
+                + " leaf queues to be activated with " + pendingApps.size()
+                + " apps ");
+          }
+
+          LinkedHashSet<String> leafQueuesToBeActivated = getSortedLeafQueues(
+              pendingApps, maxLeafQueuesTobeActivated,
+              deactivatedLeafQueues.keySet());
+
+          //Compute entitlement changes for the identified leaf queues
+          // which is appended to the List of queueManagementChanges
+          computeQueueManagementChanges(leafQueuesToBeActivated,
+              queueManagementChanges, availableCapacity,
+              leafQueueTemplateAbsoluteCapacity);
+
+          if (LOG.isDebugEnabled()) {
+            if (leafQueuesToBeActivated.size() > 0) {
+              LOG.debug(
+                  "Activated leaf queues : [" + leafQueuesToBeActivated + "]");
+            }
+          }
+        }
+      }
+      return queueManagementChanges;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  private float getTotalDeactivatedCapacity(
+      Map<String, QueueCapacities> deactivatedLeafQueues) {
+    float deactivatedCapacity = 0;
+    for (Iterator<Map.Entry<String, QueueCapacities>> iterator =
+         deactivatedLeafQueues.entrySet().iterator(); iterator.hasNext(); ) {
+      Map.Entry<String, QueueCapacities> deactivatedQueueCapacity =
+          iterator.next();
+      deactivatedCapacity +=
+          deactivatedQueueCapacity.getValue().getAbsoluteCapacity();
+    }
+    return deactivatedCapacity;
+  }
+
+  @VisibleForTesting
+  void updateLeafQueueState() {
+    try {
+      writeLock.lock();
+      Set<String> newQueues = new HashSet<>();
+      for (CSQueue newQueue : managedParentQueue.getChildQueues()) {
+        if (newQueue instanceof LeafQueue) {
+          addLeafQueueStateIfNotExists((LeafQueue) newQueue);
+          newQueues.add(newQueue.getQueueName());
+        }
+      }
+
+      for (Iterator<Map.Entry<String, LeafQueueState>> itr =
+           leafQueueStateMap.entrySet().iterator(); itr.hasNext(); ) {
+        Map.Entry<String, LeafQueueState> e = itr.next();
+        String queueName = e.getKey();
+        if (!newQueues.contains(queueName)) {
+          itr.remove();
+        }
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  private LinkedHashSet<String> getSortedLeafQueues(
+      final List<FiCaSchedulerApp> pendingApps, int leafQueuesNeeded,
+      Set<String> deactivatedQueues) throws SchedulerDynamicEditException {
+
+    LinkedHashSet<String> leafQueues = new LinkedHashSet<>(leafQueuesNeeded);
+    int ctr = 0;
+    for (FiCaSchedulerApp app : pendingApps) {
+
+      AutoCreatedLeafQueue leafQueue =
+          (AutoCreatedLeafQueue) app.getCSLeafQueue();
+      String leafQueueName = leafQueue.getQueueName();
+
+      //Check if leafQueue is not active already and has any pending apps
+      if (ctr < leafQueuesNeeded) {
+
+        if (!isActive(leafQueue)) {
+          if (!deactivatedQueues.contains(leafQueueName)) {
+            if (addLeafQueueIfNotExists(leafQueues, leafQueueName)) {
+              ctr++;
+            }
+          }
+        }
+      } else{
+        break;
+      }
+    }
+    return leafQueues;
+  }
+
+  private boolean addLeafQueueIfNotExists(Set<String> leafQueues,
+      String leafQueueName) {
+    boolean ret = false;
+    if (!leafQueues.contains(leafQueueName)) {
+      ret = leafQueues.add(leafQueueName);
+    }
+    return ret;
+  }
+
+  @VisibleForTesting
+  public boolean isActive(final AutoCreatedLeafQueue leafQueue)
+      throws SchedulerDynamicEditException {
+    try {
+      readLock.lock();
+      LeafQueueState leafQueueStatus = getLeafQueueState(leafQueue);
+      return leafQueueStatus.isActive();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  private Map<String, QueueCapacities> deactivateLeafQueuesIfInActive(
+      ParentQueue parentQueue,
+      List<QueueManagementChange> queueManagementChanges)
+      throws SchedulerDynamicEditException {
+    Map<String, QueueCapacities> deactivatedQueues = new HashMap<>();
+
+    for (CSQueue childQueue : parentQueue.getChildQueues()) {
+      AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) childQueue;
+
+      if (isActive(leafQueue) && !hasPendingApps(leafQueue)) {
+        queueManagementChanges.add(
+            new QueueManagementChange.UpdateQueue(leafQueue,
+                ZERO_CAPACITY_ENTITLEMENT));
+        deactivatedQueues.put(leafQueue.getQueueName(),
+            leafQueueTemplateCapacities);
+      } else{
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(" Leaf queue has pending applications :  " + leafQueue
+              .getNumApplications() + ".Skipping deactivation for "
+              + leafQueue);
+        }
+      }
+    }
+
+    if (LOG.isDebugEnabled()) {
+      if (deactivatedQueues.size() > 0) {
+        LOG.debug("Deactivated leaf queues : " + deactivatedQueues);
+      }
+    }
+    return deactivatedQueues;
+  }
+
+  private void computeQueueManagementChanges(
+      Set<String> leafQueuesToBeActivated,
+      List<QueueManagementChange> queueManagementChanges,
+      final float availableCapacity,
+      final float leafQueueTemplateAbsoluteCapacity) {
+
+    float curAvailableCapacity = availableCapacity;
+
+    for (String curLeafQueue : leafQueuesToBeActivated) {
+      // Activate queues if capacity is available
+      if (curAvailableCapacity >= leafQueueTemplateAbsoluteCapacity) {
+        AutoCreatedLeafQueue leafQueue =
+            (AutoCreatedLeafQueue) scheduler.getCapacitySchedulerQueueManager()
+                .getQueue(curLeafQueue);
+        if (leafQueue != null) {
+          AutoCreatedLeafQueueConfig newTemplate = buildTemplate(
+              leafQueueTemplateCapacities.getCapacity(),
+              leafQueueTemplateCapacities.getMaximumCapacity());
+          queueManagementChanges.add(
+              new QueueManagementChange.UpdateQueue(leafQueue, newTemplate));
+          curAvailableCapacity -= leafQueueTemplateAbsoluteCapacity;
+        } else{
+          LOG.warn(
+              "Could not find queue in scheduler while trying to deactivate "
+                  + curLeafQueue);
+        }
+      }
+    }
+  }
+
+  @VisibleForTesting
+  public int getMaxLeavesToBeActivated(float availableCapacity,
+      float childQueueAbsoluteCapacity, int numPendingApps)
+      throws SchedulerDynamicEditException {
+
+    if (childQueueAbsoluteCapacity > 0) {
+      int numLeafQueuesNeeded = (int) Math.floor(
+          availableCapacity / childQueueAbsoluteCapacity);
+
+      return Math.min(numLeafQueuesNeeded, numPendingApps);
+    } else{
+      throw new SchedulerDynamicEditException("Child queue absolute capacity "
+          + "is initialized to 0. Check parent queue's  " + managedParentQueue
+          .getQueueName() + " leaf queue template configuration");
+    }
+  }
+
+  private float getAvailableCapacity(float parentAbsCapacity,
+      float deactivatedAbsCapacity, float totalChildQueueActivatedCapacity) {
+    return parentAbsCapacity - totalChildQueueActivatedCapacity
+        + deactivatedAbsCapacity + EPSILON;
+  }
+
+  /**
+   * Commit queue management changes - which involves updating required state
+   * on parent/underlying leaf queues
+   *
+   * @param queueManagementChanges Queue Management changes to commit
+   * @throws SchedulerDynamicEditException when validation fails
+   */
+  @Override
+  public void commitQueueManagementChanges(
+      List<QueueManagementChange> queueManagementChanges)
+      throws SchedulerDynamicEditException {
+    try {
+      writeLock.lock();
+      for (QueueManagementChange queueManagementChange :
+          queueManagementChanges) {
+        AutoCreatedLeafQueueConfig updatedQueueTemplate =
+            queueManagementChange.getUpdatedQueueTemplate();
+        CSQueue queue = queueManagementChange.getQueue();
+        if (!(queue instanceof AutoCreatedLeafQueue)) {
+          throw new SchedulerDynamicEditException(
+              "Expected queue management change for AutoCreatedLeafQueue. "
+                  + "Found " + queue.getClass().getName());
+        }
+
+        AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) queue;
+
+        if (updatedQueueTemplate.getQueueCapacities().getCapacity() > 0) {
+          if (isActive(leafQueue)) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(
+                  "Queue is already active. Skipping activation : " + queue
+                      .getQueuePath());
+            }
+          } else{
+            activate(leafQueue);
+          }
+        } else{
+          if (!isActive(leafQueue)) {
+            if (LOG.isDebugEnabled()) {
+              LOG.debug(
+                  "Queue is already de-activated. " + "Skipping de-activation "
+                      + ": " + leafQueue.getQueuePath());
+            }
+          } else{
+            deactivate(leafQueue);
+          }
+        }
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  private void activate(final AutoCreatedLeafQueue leafQueue)
+      throws SchedulerDynamicEditException {
+    try {
+      writeLock.lock();
+      getLeafQueueState(leafQueue).activate();
+
+      parentQueueState.incAbsoluteActivatedChildCapacity(NO_LABEL,
+          leafQueueTemplateCapacities.getAbsoluteCapacity());
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  private void deactivate(final AutoCreatedLeafQueue leafQueue)
+      throws SchedulerDynamicEditException {
+    try {
+      writeLock.lock();
+      getLeafQueueState(leafQueue).deactivate();
+
+      for (String nodeLabel : managedParentQueue.getQueueCapacities()
+          .getExistingNodeLabels()) {
+        parentQueueState.decAbsoluteActivatedChildCapacity(nodeLabel,
+            leafQueueTemplateCapacities.getAbsoluteCapacity());
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  public boolean hasPendingApps(final AutoCreatedLeafQueue leafQueue) {
+    return leafQueue.getNumApplications() > 0;
+  }
+
+  @Override
+  public void reinitialize(CapacitySchedulerContext schedulerContext,
+      final ParentQueue parentQueue) {
+    if (!(parentQueue instanceof ManagedParentQueue)) {
+      throw new IllegalStateException(
+          "Expected instance of type " + ManagedParentQueue.class + " found  "
+              + " : " + parentQueue.getClass());
+    }
+
+    if (this.managedParentQueue != null && !parentQueue.getQueuePath().equals(
+        this.managedParentQueue.getQueuePath())) {
+      throw new IllegalStateException(
+          "Expected parent queue path to match " + this.managedParentQueue
+              .getQueuePath() + " found : " + parentQueue.getQueuePath());
+    }
+
+    this.managedParentQueue = (ManagedParentQueue) parentQueue;
+
+    initializeLeafQueueTemplate(this.managedParentQueue);
+
+    //clear state
+    parentQueueState.clear();
+    clearLeafQueueState();
+
+    LOG.info(
+        "Reinitialized queue management policy for parent queue "
+            + parentQueue.getQueueName() +" with leaf queue template "
+            + "capacities : ["
+            + leafQueueTemplate.getQueueCapacities() + "]");
+  }
+
+  @Override
+  public AutoCreatedLeafQueueConfig getInitialLeafQueueConfiguration(
+      AbstractAutoCreatedLeafQueue leafQueue)
+      throws SchedulerDynamicEditException {
+
+    if ( !(leafQueue instanceof  AutoCreatedLeafQueue)) {
+      throw new SchedulerDynamicEditException("Not an instance of "
+          + "AutoCreatedLeafQueue : " + leafQueue.getClass());
+    }
+
+    AutoCreatedLeafQueue autoCreatedLeafQueue =
+        (AutoCreatedLeafQueue) leafQueue;
+    AutoCreatedLeafQueueConfig template = ZERO_CAPACITY_ENTITLEMENT;
+    try {
+      writeLock.lock();
+      if (!addLeafQueueStateIfNotExists(leafQueue)) {
+        LOG.error("Leaf queue already exists in state : " + getLeafQueueState(
+            leafQueue));
+        throw new SchedulerDynamicEditException(
+            "Leaf queue already exists in state : " + getLeafQueueState(
+                leafQueue));
+      }
+
+      float availableCapacity = getAvailableCapacity(
+          managedParentQueue.getQueueCapacities().getAbsoluteCapacity(), 0,
+          parentQueueState.getAbsoluteActivatedChildQueueCapacity());
+
+      if (availableCapacity >= leafQueueTemplateCapacities
+          .getAbsoluteCapacity()) {
+        activate(autoCreatedLeafQueue);
+        template = buildTemplate(leafQueueTemplateCapacities.getCapacity(),
+            leafQueueTemplateCapacities.getMaximumCapacity());
+      }
+    } finally {
+      writeLock.unlock();
+    }
+    return template;
+  }
+
+  @VisibleForTesting
+  LeafQueueState getLeafQueueState(LeafQueue queue)
+      throws SchedulerDynamicEditException {
+    try {
+      readLock.lock();
+      String queueName = queue.getQueueName();
+      if (!containsLeafQueue(queueName)) {
+        throw new SchedulerDynamicEditException(
+            "Could not find leaf queue in " + "state " + queueName);
+      } else{
+        return leafQueueStateMap.get(queueName);
+      }
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  @VisibleForTesting
+  public float getAbsoluteActivatedChildQueueCapacity() {
+    return parentQueueState.getAbsoluteActivatedChildQueueCapacity();
+  }
+
+  private List<FiCaSchedulerApp> getSortedPendingApplications() {
+    List<FiCaSchedulerApp> apps = new ArrayList<>(
+        managedParentQueue.getAllApplications());
+    Collections.sort(apps, applicationComparator);
+    return apps;
+  }
+
+  private AutoCreatedLeafQueueConfig buildTemplate(float capacity,
+      float maxCapacity) {
+    AutoCreatedLeafQueueConfig.Builder templateBuilder =
+        new AutoCreatedLeafQueueConfig.Builder();
+
+    QueueCapacities capacities = new QueueCapacities(false);
+    templateBuilder.capacities(capacities);
+
+    for (String nodeLabel : managedParentQueue.getQueueCapacities()
+        .getExistingNodeLabels()) {
+      capacities.setCapacity(nodeLabel, capacity);
+      capacities.setMaximumCapacity(nodeLabel, maxCapacity);
+    }
+
+    return new AutoCreatedLeafQueueConfig(templateBuilder);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/QueueEntitlement.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/QueueEntitlement.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/QueueEntitlement.java
index 2a751e3..f4182f3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/QueueEntitlement.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/QueueEntitlement.java
@@ -43,4 +43,26 @@ public class QueueEntitlement {
   public void setCapacity(float capacity) {
     this.capacity = capacity;
   }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o)
+      return true;
+    if (!(o instanceof QueueEntitlement))
+      return false;
+
+    QueueEntitlement that = (QueueEntitlement) o;
+
+    if (Float.compare(that.capacity, capacity) != 0)
+      return false;
+    return Float.compare(that.maxCapacity, maxCapacity) == 0;
+  }
+
+  @Override
+  public int hashCode() {
+    int result = (capacity != +0.0f ? Float.floatToIntBits(capacity) : 0);
+    result = 31 * result + (maxCapacity != +0.0f ? Float.floatToIntBits(
+        maxCapacity) : 0);
+    return result;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/QueueManagementChangeEvent.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/QueueManagementChangeEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/QueueManagementChangeEvent.java
new file mode 100644
index 0000000..926e1be
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/QueueManagementChangeEvent.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
+    .ParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
+    .QueueManagementChange;
+
+import java.util.List;
+
+/**
+ * Event to update scheduler of any queue management changes
+ */
+public class QueueManagementChangeEvent extends SchedulerEvent {
+
+  private ParentQueue parentQueue;
+  private List<QueueManagementChange> queueManagementChanges;
+
+  public QueueManagementChangeEvent(ParentQueue parentQueue,
+      List<QueueManagementChange> queueManagementChanges) {
+    super(SchedulerEventType.MANAGE_QUEUE);
+    this.parentQueue = parentQueue;
+    this.queueManagementChanges = queueManagementChanges;
+  }
+
+  public ParentQueue getParentQueue() {
+    return parentQueue;
+  }
+
+  public List<QueueManagementChange> getQueueManagementChanges() {
+    return queueManagementChanges;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
index 229e0bb..b107cf4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java
@@ -51,5 +51,8 @@ public enum SchedulerEventType {
   MARK_CONTAINER_FOR_KILLABLE,
 
   // Cancel a killable container
-  MARK_CONTAINER_FOR_NONKILLABLE
+  MARK_CONTAINER_FOR_NONKILLABLE,
+
+  //Queue Management Change
+  MANAGE_QUEUE
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAutoCreatedLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAutoCreatedLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAutoCreatedLeafQueue.java
deleted file mode 100644
index b403e72..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAutoCreatedLeafQueue.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
-
-import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-import static org.junit.Assert.fail;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
-import java.io.IOException;
-
-import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
-import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.junit.Before;
-import org.junit.Test;
-
-/**
- * Test class for dynamic auto created leaf queues.
- * @see AutoCreatedLeafQueue
- */
-public class TestAutoCreatedLeafQueue {
-
-  private CapacitySchedulerConfiguration csConf;
-  private CapacitySchedulerContext csContext;
-  final static int DEF_MAX_APPS = 10000;
-  final static int GB = 1024;
-  private final ResourceCalculator resourceCalculator =
-      new DefaultResourceCalculator();
-  private AutoCreatedLeafQueue autoCreatedLeafQueue;
-
-  @Before
-  public void setup() throws IOException {
-    // setup a context / conf
-    csConf = new CapacitySchedulerConfiguration();
-    YarnConfiguration conf = new YarnConfiguration();
-    csContext = mock(CapacitySchedulerContext.class);
-    when(csContext.getConfiguration()).thenReturn(csConf);
-    when(csContext.getConf()).thenReturn(conf);
-    when(csContext.getMinimumResourceCapability()).thenReturn(
-        Resources.createResource(GB, 1));
-    when(csContext.getMaximumResourceCapability()).thenReturn(
-        Resources.createResource(16 * GB, 32));
-    when(csContext.getClusterResource()).thenReturn(
-        Resources.createResource(100 * 16 * GB, 100 * 32));
-    when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
-    RMContext mockRMContext = TestUtils.getMockRMContext();
-    when(csContext.getRMContext()).thenReturn(mockRMContext);
-
-    // create a queue
-    PlanQueue pq = new PlanQueue(csContext, "root", null, null);
-    autoCreatedLeafQueue = new AutoCreatedLeafQueue(csContext, "a", pq);
-  }
-
-  private void validateAutoCreatedLeafQueue(double capacity) {
-    assertTrue(" actual capacity: " + autoCreatedLeafQueue.getCapacity(),
-        autoCreatedLeafQueue.getCapacity() - capacity < CSQueueUtils.EPSILON);
-    assertEquals(autoCreatedLeafQueue.maxApplications, DEF_MAX_APPS);
-    assertEquals(autoCreatedLeafQueue.maxApplicationsPerUser, DEF_MAX_APPS);
-  }
-
-  @Test
-  public void testAddSubtractCapacity() throws Exception {
-
-    // verify that setting, adding, subtracting capacity works
-    autoCreatedLeafQueue.setCapacity(1.0F);
-    validateAutoCreatedLeafQueue(1);
-    autoCreatedLeafQueue.setEntitlement(new QueueEntitlement(0.9f, 1f));
-    validateAutoCreatedLeafQueue(0.9);
-    autoCreatedLeafQueue.setEntitlement(new QueueEntitlement(1f, 1f));
-    validateAutoCreatedLeafQueue(1);
-    autoCreatedLeafQueue.setEntitlement(new QueueEntitlement(0f, 1f));
-    validateAutoCreatedLeafQueue(0);
-
-    try {
-      autoCreatedLeafQueue.setEntitlement(new QueueEntitlement(1.1f, 1f));
-      fail();
-    } catch (SchedulerDynamicEditException iae) {
-      // expected
-      validateAutoCreatedLeafQueue(1);
-    }
-
-    try {
-      autoCreatedLeafQueue.setEntitlement(new QueueEntitlement(-0.1f, 1f));
-      fail();
-    } catch (SchedulerDynamicEditException iae) {
-      // expected
-      validateAutoCreatedLeafQueue(1);
-    }
-
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org