Posted to common-commits@hadoop.apache.org by xy...@apache.org on 2018/07/02 20:32:39 UTC

[22/45] hadoop git commit: YARN-8379. Improve balancing resources in already satisfied queues by using Capacity Scheduler preemption. Contributed by Zian Chen.

YARN-8379. Improve balancing resources in already satisfied queues by using Capacity Scheduler preemption. Contributed by Zian Chen.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/29119430
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/29119430
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/29119430

Branch: refs/heads/HDDS-4
Commit: 291194302cc1a875d6d94ea93cf1184a3f1fc2cc
Parents: 384764c
Author: Sunil G <su...@apache.org>
Authored: Thu Jun 28 10:23:31 2018 -0700
Committer: Sunil G <su...@apache.org>
Committed: Thu Jun 28 10:23:31 2018 -0700

----------------------------------------------------------------------
 .../AbstractPreemptableResourceCalculator.java  |  21 +-
 .../CapacitySchedulerPreemptionContext.java     |   2 +
 .../CapacitySchedulerPreemptionUtils.java       |  23 +-
 .../capacity/FifoCandidatesSelector.java        |  45 ++--
 .../capacity/IntraQueueCandidatesSelector.java  |   9 +-
 .../capacity/PreemptableResourceCalculator.java |   7 +-
 .../capacity/PreemptionCandidatesSelector.java  |  11 +
 .../ProportionalCapacityPreemptionPolicy.java   | 129 +++++++---
 ...QueuePriorityContainerCandidateSelector.java |  16 +-
 .../ReservedContainerCandidatesSelector.java    |  16 +-
 .../monitor/capacity/TempQueuePerPartition.java |   8 +-
 .../CapacitySchedulerConfiguration.java         |  17 ++
 .../TestPreemptionForQueueWithPriorities.java   |  58 +++++
 ...apacityPreemptionPolicyPreemptToBalance.java | 254 +++++++++++++++++++
 ...TestCapacitySchedulerSurgicalPreemption.java | 111 ++++++++
 15 files changed, 637 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/29119430/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
index 64b3615..5b8360a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
@@ -42,6 +42,7 @@ public class AbstractPreemptableResourceCalculator {
   protected final ResourceCalculator rc;
   protected boolean isReservedPreemptionCandidatesSelector;
   private Resource stepFactor;
+  private boolean allowQueuesBalanceAfterAllQueuesSatisfied;
 
   static class TQComparator implements Comparator<TempQueuePerPartition> {
     private ResourceCalculator rc;
@@ -83,15 +84,28 @@ public class AbstractPreemptableResourceCalculator {
    *          this will be set by different implementations of candidate
    *          selectors; please refer to TempQueuePerPartition#offer for
    *          details.
+   * @param allowQueuesBalanceAfterAllQueuesSatisfied
+   *          Should resources be preempted from an over-served queue when the
+   *          requesting queues are all at or over their guarantees?
+   *          For example, suppose there are 10 queues under root, each with
+   *          a guaranteed capacity of 10%, and only two of them are using
+   *          resources: queueA uses 10% and queueB uses 90%. Every queue is
+   *          at or above its guarantee, but the resulting split is unfair
+   *          to queueA.
+   *          This behavior is therefore configurable; by default balancing
+   *          is not allowed.
+   *
    */
   public AbstractPreemptableResourceCalculator(
       CapacitySchedulerPreemptionContext preemptionContext,
-      boolean isReservedPreemptionCandidatesSelector) {
+      boolean isReservedPreemptionCandidatesSelector,
+      boolean allowQueuesBalanceAfterAllQueuesSatisfied) {
     context = preemptionContext;
     rc = preemptionContext.getResourceCalculator();
     this.isReservedPreemptionCandidatesSelector =
         isReservedPreemptionCandidatesSelector;
-
+    this.allowQueuesBalanceAfterAllQueuesSatisfied =
+        allowQueuesBalanceAfterAllQueuesSatisfied;
     stepFactor = Resource.newInstance(0, 0);
     for (ResourceInformation ri : stepFactor.getResources()) {
       ri.setValue(1);
@@ -193,7 +207,8 @@ public class AbstractPreemptableResourceCalculator {
         wQavail = Resources.componentwiseMin(wQavail, unassigned);
 
         Resource wQidle = sub.offer(wQavail, rc, totGuarant,
-            isReservedPreemptionCandidatesSelector);
+            isReservedPreemptionCandidatesSelector,
+            allowQueuesBalanceAfterAllQueuesSatisfied);
         Resource wQdone = Resources.subtract(wQavail, wQidle);
 
         if (Resources.greaterThan(rc, totGuarant, wQdone, Resources.none())) {
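
To make the new javadoc scenario concrete, here is a minimal, self-contained
arithmetic sketch of the split the balancing flag is meant to correct. The
class name and numbers are hypothetical; this is not the scheduler's real
computation, just the intuition behind it.

// Hypothetical illustration of the javadoc example: 10 queues under root,
// each guaranteed 10% of a 100-unit cluster; only queueA and queueB are
// active, using 10 and 90 units respectively.
public class BalanceScenarioSketch {
  public static void main(String[] args) {
    int usedA = 10, usedB = 90;  // both queues at or above their 10% guarantee
    boolean allowBalanceAfterAllSatisfied = true;

    // With balancing disabled, no queue is under its guarantee, so the
    // preemption policy leaves the 10/90 split untouched. With balancing
    // enabled (and assuming queueA has enough pending demand), the idle
    // guarantees of the eight empty queues are shared evenly, moving the
    // target split toward 50/50.
    int idealA = allowBalanceAfterAllSatisfied ? 50 : usedA;
    int idealB = allowBalanceAfterAllSatisfied ? 50 : usedB;
    System.out.println("idealA=" + idealA + ", idealB=" + idealB);
  }
}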

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29119430/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
index 098acdd..7985296 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
@@ -70,6 +70,8 @@ public interface CapacitySchedulerPreemptionContext {
 
   float getMaxAllowableLimitForIntraQueuePreemption();
 
+  long getDefaultMaximumKillWaitTimeout();
+
   @Unstable
   IntraQueuePreemptionOrderPolicy getIntraQueuePreemptionOrderPolicy();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29119430/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
index 690eb02..ed50eff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
@@ -151,6 +151,7 @@ public class CapacitySchedulerPreemptionUtils {
       Map<String, Resource> resourceToObtainByPartitions,
       RMContainer rmContainer, Resource clusterResource,
       Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+      Map<ApplicationAttemptId, Set<RMContainer>> curCandidates,
       Resource totalPreemptionAllowed, boolean conservativeDRF) {
     ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId();
 
@@ -218,7 +219,7 @@ public class CapacitySchedulerPreemptionUtils {
       }
 
       // Add to preemptMap
-      addToPreemptMap(preemptMap, attemptId, rmContainer);
+      addToPreemptMap(preemptMap, curCandidates, attemptId, rmContainer);
       return true;
     }
 
@@ -230,15 +231,23 @@ public class CapacitySchedulerPreemptionUtils {
     return context.getScheduler().getSchedulerNode(nodeId).getPartition();
   }
 
-  private static void addToPreemptMap(
+  protected static void addToPreemptMap(
       Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+      Map<ApplicationAttemptId, Set<RMContainer>> curCandidates,
       ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) {
-    Set<RMContainer> set = preemptMap.get(appAttemptId);
-    if (null == set) {
-      set = new HashSet<>();
-      preemptMap.put(appAttemptId, set);
+    Set<RMContainer> setForToPreempt = preemptMap.get(appAttemptId);
+    Set<RMContainer> setForCurCandidates = curCandidates.get(appAttemptId);
+    if (null == setForToPreempt) {
+      setForToPreempt = new HashSet<>();
+      preemptMap.put(appAttemptId, setForToPreempt);
     }
-    set.add(containerToPreempt);
+    setForToPreempt.add(containerToPreempt);
+
+    if (null == setForCurCandidates) {
+      setForCurCandidates = new HashSet<>();
+      curCandidates.put(appAttemptId, setForCurCandidates);
+    }
+    setForCurCandidates.add(containerToPreempt);
   }
 
   private static boolean preemptMapContains(
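
The refactored helper above records each selected container twice: in the
cumulative preemptMap shared across all selectors in a policy run, and in the
per-round curCandidates map for the selector currently running. A functionally
equivalent standalone sketch using computeIfAbsent (an idiomatic alternative,
not the committed code; the type parameters stand in for the YARN types):

import java.util.HashSet;
import java.util.Map;
import java.util.Set;

final class PreemptMapSketch {
  // Record the container in both the cumulative map (all selectors so far)
  // and the per-round map (only the currently running selector's picks).
  static <A, C> void addToBothMaps(Map<A, Set<C>> preemptMap,
      Map<A, Set<C>> curCandidates, A appAttemptId, C container) {
    preemptMap.computeIfAbsent(appAttemptId, k -> new HashSet<>())
        .add(container);
    curCandidates.computeIfAbsent(appAttemptId, k -> new HashSet<>())
        .add(container);
  }
}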

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29119430/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
index 3b2fcbb..c2735f1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -42,19 +43,25 @@ public class FifoCandidatesSelector
   private static final Log LOG =
       LogFactory.getLog(FifoCandidatesSelector.class);
   private PreemptableResourceCalculator preemptableAmountCalculator;
+  private boolean allowQueuesBalanceAfterAllQueuesSatisfied;
 
   FifoCandidatesSelector(CapacitySchedulerPreemptionContext preemptionContext,
-      boolean includeReservedResource) {
+      boolean includeReservedResource,
+      boolean allowQueuesBalanceAfterAllQueuesSatisfied) {
     super(preemptionContext);
 
+    this.allowQueuesBalanceAfterAllQueuesSatisfied =
+        allowQueuesBalanceAfterAllQueuesSatisfied;
     preemptableAmountCalculator = new PreemptableResourceCalculator(
-        preemptionContext, includeReservedResource);
+        preemptionContext, includeReservedResource,
+        allowQueuesBalanceAfterAllQueuesSatisfied);
   }
 
   @Override
   public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
       Resource clusterResource, Resource totalPreemptionAllowed) {
+    Map<ApplicationAttemptId, Set<RMContainer>> curCandidates = new HashMap<>();
     // Calculate how much resources we need to preempt
     preemptableAmountCalculator.computeIdealAllocation(clusterResource,
         totalPreemptionAllowed);
@@ -110,7 +117,7 @@ public class FifoCandidatesSelector
               boolean preempted = CapacitySchedulerPreemptionUtils
                   .tryPreemptContainerAndDeductResToObtain(rc,
                       preemptionContext, resToObtainByPartition, c,
-                      clusterResource, selectedCandidates,
+                      clusterResource, selectedCandidates, curCandidates,
                       totalPreemptionAllowed, false);
               if (!preempted) {
                 continue;
@@ -134,7 +141,7 @@ public class FifoCandidatesSelector
 
           preemptFrom(fc, clusterResource, resToObtainByPartition,
               skippedAMContainerlist, skippedAMSize, selectedCandidates,
-              totalPreemptionAllowed);
+              curCandidates, totalPreemptionAllowed);
         }
 
         // Can try preempting AMContainers (still saving atmost
@@ -145,15 +152,15 @@ public class FifoCandidatesSelector
                 leafQueue.getEffectiveCapacity(RMNodeLabelsManager.NO_LABEL),
                 leafQueue.getMaxAMResourcePerQueuePercent());
 
-        preemptAMContainers(clusterResource, selectedCandidates, skippedAMContainerlist,
-            resToObtainByPartition, skippedAMSize, maxAMCapacityForThisQueue,
-            totalPreemptionAllowed);
+        preemptAMContainers(clusterResource, selectedCandidates, curCandidates,
+            skippedAMContainerlist, resToObtainByPartition, skippedAMSize,
+            maxAMCapacityForThisQueue, totalPreemptionAllowed);
       } finally {
         leafQueue.getReadLock().unlock();
       }
     }
 
-    return selectedCandidates;
+    return curCandidates;
   }
 
   /**
@@ -169,6 +176,7 @@ public class FifoCandidatesSelector
    */
   private void preemptAMContainers(Resource clusterResource,
       Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+      Map<ApplicationAttemptId, Set<RMContainer>> curCandidates,
       List<RMContainer> skippedAMContainerlist,
       Map<String, Resource> resToObtainByPartition, Resource skippedAMSize,
       Resource maxAMCapacityForThisQueue, Resource totalPreemptionAllowed) {
@@ -187,7 +195,7 @@ public class FifoCandidatesSelector
       boolean preempted = CapacitySchedulerPreemptionUtils
           .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext,
               resToObtainByPartition, c, clusterResource, preemptMap,
-              totalPreemptionAllowed, false);
+              curCandidates, totalPreemptionAllowed, false);
       if (preempted) {
         Resources.subtractFrom(skippedAMSize, c.getAllocatedResource());
       }
@@ -203,6 +211,7 @@ public class FifoCandidatesSelector
       Resource clusterResource, Map<String, Resource> resToObtainByPartition,
       List<RMContainer> skippedAMContainerlist, Resource skippedAMSize,
       Map<ApplicationAttemptId, Set<RMContainer>> selectedContainers,
+      Map<ApplicationAttemptId, Set<RMContainer>> curCandidates,
       Resource totalPreemptionAllowed) {
     ApplicationAttemptId appId = app.getApplicationAttemptId();
 
@@ -219,9 +228,10 @@ public class FifoCandidatesSelector
       }
 
       // Try to preempt this container
-      CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
-          rc, preemptionContext, resToObtainByPartition, c, clusterResource,
-          selectedContainers, totalPreemptionAllowed, false);
+      CapacitySchedulerPreemptionUtils
+          .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext,
+              resToObtainByPartition, c, clusterResource, selectedContainers,
+              curCandidates, totalPreemptionAllowed, false);
 
       if (!preemptionContext.isObserveOnly()) {
         preemptionContext.getRMContext().getDispatcher().getEventHandler()
@@ -262,9 +272,14 @@ public class FifoCandidatesSelector
       }
 
       // Try to preempt this container
-      CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
-          rc, preemptionContext, resToObtainByPartition, c, clusterResource,
-          selectedContainers, totalPreemptionAllowed, false);
+      CapacitySchedulerPreemptionUtils
+          .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext,
+              resToObtainByPartition, c, clusterResource, selectedContainers,
+              curCandidates, totalPreemptionAllowed, false);
     }
   }
+
+  public boolean getAllowQueuesBalanceAfterAllQueuesSatisfied() {
+    return allowQueuesBalanceAfterAllQueuesSatisfied;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29119430/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
index 8ab9507..c52fd95 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
@@ -122,7 +122,7 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
   public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
       Resource clusterResource, Resource totalPreemptedResourceAllowed) {
-
+    Map<ApplicationAttemptId, Set<RMContainer>> curCandidates = new HashMap<>();
     // 1. Calculate the abnormality within each queue one by one.
     computeIntraQueuePreemptionDemand(
         clusterResource, totalPreemptedResourceAllowed, selectedCandidates);
@@ -182,7 +182,7 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
           leafQueue.getReadLock().lock();
           for (FiCaSchedulerApp app : apps) {
             preemptFromLeastStarvedApp(leafQueue, app, selectedCandidates,
-                clusterResource, totalPreemptedResourceAllowed,
+                curCandidates, clusterResource, totalPreemptedResourceAllowed,
                 resToObtainByPartition, rollingResourceUsagePerUser);
           }
         } finally {
@@ -191,7 +191,7 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
       }
     }
 
-    return selectedCandidates;
+    return curCandidates;
   }
 
   private void initializeUsageAndUserLimitForCompute(Resource clusterResource,
@@ -211,6 +211,7 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
   private void preemptFromLeastStarvedApp(LeafQueue leafQueue,
       FiCaSchedulerApp app,
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+      Map<ApplicationAttemptId, Set<RMContainer>> curCandidates,
       Resource clusterResource, Resource totalPreemptedResourceAllowed,
       Map<String, Resource> resToObtainByPartition,
       Map<String, Resource> rollingResourceUsagePerUser) {
@@ -270,7 +271,7 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
       boolean ret = CapacitySchedulerPreemptionUtils
           .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext,
               resToObtainByPartition, c, clusterResource, selectedCandidates,
-              totalPreemptedResourceAllowed, true);
+              curCandidates, totalPreemptedResourceAllowed, true);
 
       // Subtract from respective user's resource usage once a container is
       // selected for preemption.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29119430/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
index 08d834e..89a015e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
@@ -48,11 +48,14 @@ public class PreemptableResourceCalculator
    * @param isReservedPreemptionCandidatesSelector this will be set by
    * different implementations of candidate selectors; please refer to
    * TempQueuePerPartition#offer for details.
+   * @param allowQueuesBalanceAfterAllQueuesSatisfied whether to keep balancing queues after all queues are satisfied
    */
   public PreemptableResourceCalculator(
       CapacitySchedulerPreemptionContext preemptionContext,
-      boolean isReservedPreemptionCandidatesSelector) {
-    super(preemptionContext, isReservedPreemptionCandidatesSelector);
+      boolean isReservedPreemptionCandidatesSelector,
+      boolean allowQueuesBalanceAfterAllQueuesSatisfied) {
+    super(preemptionContext, isReservedPreemptionCandidatesSelector,
+        allowQueuesBalanceAfterAllQueuesSatisfied);
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29119430/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java
index 4d8afaf..3c97364 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java
@@ -34,6 +34,7 @@ import java.util.Set;
 public abstract class PreemptionCandidatesSelector {
   protected CapacitySchedulerPreemptionContext preemptionContext;
   protected ResourceCalculator rc;
+  private long maximumKillWaitTime = -1;
 
   PreemptionCandidatesSelector(
       CapacitySchedulerPreemptionContext preemptionContext) {
@@ -77,4 +78,14 @@ public abstract class PreemptionCandidatesSelector {
     });
   }
 
+  public long getMaximumKillWaitTimeMs() {
+    if (maximumKillWaitTime > 0) {
+      return maximumKillWaitTime;
+    }
+    return preemptionContext.getDefaultMaximumKillWaitTimeout();
+  }
+
+  public void setMaximumKillWaitTime(long maximumKillWaitTime) {
+    this.maximumKillWaitTime = maximumKillWaitTime;
+  }
 }
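
The new getter/setter pair is a simple override-with-default pattern: a
selector-specific kill wait time wins when set, otherwise the policy-wide
default from the preemption context applies. A standalone sketch, with the
class name and the 15-second default chosen purely for illustration:

final class KillWaitSketch {
  private long maximumKillWaitTime = -1;      // unset until overridden
  private final long defaultMaximumKillWait;  // the context-wide default

  KillWaitSketch(long defaultMaximumKillWait) {
    this.defaultMaximumKillWait = defaultMaximumKillWait;  // e.g. 15000L
  }

  long getMaximumKillWaitTimeMs() {
    // A positive override wins; otherwise fall back to the default.
    return maximumKillWaitTime > 0 ? maximumKillWaitTime
        : defaultMaximumKillWait;
  }

  void setMaximumKillWaitTime(long maximumKillWaitTime) {
    this.maximumKillWaitTime = maximumKillWaitTime;
  }
}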

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29119430/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index cc69fba..036fd2f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -131,6 +131,8 @@ public class ProportionalCapacityPreemptionPolicy
   private List<PreemptionCandidatesSelector> candidatesSelectionPolicies;
   private Set<String> allPartitions;
   private Set<String> leafQueueNames;
+  Map<PreemptionCandidatesSelector, Map<ApplicationAttemptId,
+      Set<RMContainer>>> pcsMap;
 
   // Preemptable Entities, synced from scheduler at every run
   private Map<String, PreemptableQueue> preemptableQueues;
@@ -249,7 +251,21 @@ public class ProportionalCapacityPreemptionPolicy
 
     // initialize candidates preemption selection policies
     candidatesSelectionPolicies.add(new FifoCandidatesSelector(this,
-        additionalPreemptionBasedOnReservedResource));
+        additionalPreemptionBasedOnReservedResource, false));
+
+    // Do we need preemption to balance queues even after all queues are satisfied?
+    boolean isPreemptionToBalanceRequired = config.getBoolean(
+        CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED,
+        CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED);
+    long maximumKillWaitTimeForPreemptionToQueueBalance = config.getLong(
+        CapacitySchedulerConfiguration.MAX_WAIT_BEFORE_KILL_FOR_QUEUE_BALANCE_PREEMPTION,
+        CapacitySchedulerConfiguration.DEFAULT_MAX_WAIT_BEFORE_KILL_FOR_QUEUE_BALANCE_PREEMPTION);
+    if (isPreemptionToBalanceRequired) {
+      PreemptionCandidatesSelector selector = new FifoCandidatesSelector(this,
+          false, true);
+      selector.setMaximumKillWaitTime(maximumKillWaitTimeForPreemptionToQueueBalance);
+      candidatesSelectionPolicies.add(selector);
+    }
 
     // Do we need to specially consider intra queue
     boolean isIntraQueuePreemptionEnabled = config.getBoolean(
@@ -282,7 +298,8 @@ public class ProportionalCapacityPreemptionPolicy
         "select_based_on_reserved_containers = " +
           selectCandidatesForResevedContainers + "\n" +
         "additional_res_balance_based_on_reserved_containers = " +
-          additionalPreemptionBasedOnReservedResource);
+          additionalPreemptionBasedOnReservedResource + "\n" +
+        "Preemption-to-balance-queue-enabled = " + isPreemptionToBalanceRequired);
 
     csConfig = config;
   }
@@ -308,44 +325,60 @@ public class ProportionalCapacityPreemptionPolicy
   }
 
   private void preemptOrkillSelectedContainerAfterWait(
-      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
-      long currentTime) {
+      Map<PreemptionCandidatesSelector, Map<ApplicationAttemptId,
+          Set<RMContainer>>> toPreemptPerSelector, long currentTime) {
+    int toPreemptCount = 0;
+    for (Map<ApplicationAttemptId, Set<RMContainer>> containers :
+        toPreemptPerSelector.values()) {
+      toPreemptCount += containers.size();
+    }
     if (LOG.isDebugEnabled()) {
       LOG.debug(
           "Starting to preempt containers for selectedCandidates and size:"
-              + selectedCandidates.size());
+              + toPreemptCount);
     }
 
     // preempt (or kill) the selected containers
-    for (Map.Entry<ApplicationAttemptId, Set<RMContainer>> e : selectedCandidates
+    // We need toPreemptPerSelector here to map each list of containers to
+    // its selector, so that the per-selector timeout can be applied when
+    // checking whether the current container should be killed or not.
+    for (Map.Entry<PreemptionCandidatesSelector, Map<ApplicationAttemptId,
+        Set<RMContainer>>> pc : toPreemptPerSelector
         .entrySet()) {
-      ApplicationAttemptId appAttemptId = e.getKey();
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Send to scheduler: in app=" + appAttemptId
-            + " #containers-to-be-preemptionCandidates=" + e.getValue().size());
-      }
-      for (RMContainer container : e.getValue()) {
-        // if we tried to preempt this for more than maxWaitTime
-        if (preemptionCandidates.get(container) != null
-            && preemptionCandidates.get(container)
-                + maxWaitTime <= currentTime) {
-          // kill it
-          rmContext.getDispatcher().getEventHandler().handle(
-              new ContainerPreemptEvent(appAttemptId, container,
-                  SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE));
-          preemptionCandidates.remove(container);
-        } else {
-          if (preemptionCandidates.get(container) != null) {
-            // We already updated the information to scheduler earlier, we need
-            // not have to raise another event.
-            continue;
+      Map<ApplicationAttemptId, Set<RMContainer>> cMap = pc.getValue();
+      if (cMap.size() > 0) {
+        for (Map.Entry<ApplicationAttemptId,
+            Set<RMContainer>> e : cMap.entrySet()) {
+          ApplicationAttemptId appAttemptId = e.getKey();
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Send to scheduler: in app=" + appAttemptId
+                + " #containers-to-be-preemptionCandidates=" + e.getValue().size());
+          }
+          for (RMContainer container : e.getValue()) {
+            // Check whether we have tried to preempt this container for
+            // longer than this selector's kill wait timeout.
+            if (preemptionCandidates.get(container) != null
+                && preemptionCandidates.get(container)
+                + pc.getKey().getMaximumKillWaitTimeMs() <= currentTime) {
+              // kill it
+              rmContext.getDispatcher().getEventHandler().handle(
+                  new ContainerPreemptEvent(appAttemptId, container,
+                      SchedulerEventType.MARK_CONTAINER_FOR_KILLABLE));
+              preemptionCandidates.remove(container);
+            } else {
+              if (preemptionCandidates.get(container) != null) {
+                // We already updated the information to scheduler earlier, we need
+                // not have to raise another event.
+                continue;
+              }
+
+              //otherwise just send preemption events
+              rmContext.getDispatcher().getEventHandler().handle(
+                  new ContainerPreemptEvent(appAttemptId, container,
+                      SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION));
+              preemptionCandidates.put(container, currentTime);
+            }
           }
-
-          //otherwise just send preemption events
-          rmContext.getDispatcher().getEventHandler().handle(
-              new ContainerPreemptEvent(appAttemptId, container,
-                  SchedulerEventType.MARK_CONTAINER_FOR_PREEMPTION));
-          preemptionCandidates.put(container, currentTime);
         }
       }
     }
@@ -438,6 +471,8 @@ public class ProportionalCapacityPreemptionPolicy
     // queue and each application
     Map<ApplicationAttemptId, Set<RMContainer>> toPreempt =
         new HashMap<>();
+    Map<PreemptionCandidatesSelector, Map<ApplicationAttemptId,
+        Set<RMContainer>>> toPreemptPerSelector = new HashMap<>();
     for (PreemptionCandidatesSelector selector :
         candidatesSelectionPolicies) {
       long startTime = 0;
@@ -447,20 +482,27 @@ public class ProportionalCapacityPreemptionPolicy
                 selector.getClass().getName()));
         startTime = clock.getTime();
       }
-      toPreempt = selector.selectCandidates(toPreempt,
-          clusterResources, totalPreemptionAllowed);
+      Map<ApplicationAttemptId, Set<RMContainer>> curCandidates =
+          selector.selectCandidates(toPreempt, clusterResources,
+              totalPreemptionAllowed);
+      toPreemptPerSelector.putIfAbsent(selector, curCandidates);
 
       if (LOG.isDebugEnabled()) {
         LOG.debug(MessageFormat
             .format("{0} uses {1} millisecond to run",
                 selector.getClass().getName(), clock.getTime() - startTime));
         int totalSelected = 0;
+        int curSelected = 0;
         for (Set<RMContainer> set : toPreempt.values()) {
           totalSelected += set.size();
         }
+        for (Set<RMContainer> set : curCandidates.values()) {
+          curSelected += set.size();
+        }
         LOG.debug(MessageFormat
-            .format("So far, total {0} containers selected to be preempted",
-                totalSelected));
+            .format("So far, total {0} containers selected to be preempted, {1}"
+                    + " containers selected this round\n",
+                totalSelected, curSelected));
       }
     }
 
@@ -483,8 +525,10 @@ public class ProportionalCapacityPreemptionPolicy
 
     long currentTime = clock.getTime();
 
+    pcsMap = toPreemptPerSelector;
+
     // preempt (or kill) the selected containers
-    preemptOrkillSelectedContainerAfterWait(toPreempt, currentTime);
+    preemptOrkillSelectedContainerAfterWait(toPreemptPerSelector, currentTime);
 
     // cleanup staled preemption candidates
     cleanupStaledPreemptionCandidates(currentTime);
@@ -689,6 +733,12 @@ public class ProportionalCapacityPreemptionPolicy
     return queueToPartitions;
   }
 
+  @VisibleForTesting
+  Map<PreemptionCandidatesSelector, Map<ApplicationAttemptId,
+      Set<RMContainer>>> getToPreemptCandidatesPerSelector() {
+    return pcsMap;
+  }
+
   @Override
   public int getClusterMaxApplicationPriority() {
     return scheduler.getMaxClusterLevelAppPriority().getPriority();
@@ -730,4 +780,9 @@ public class ProportionalCapacityPreemptionPolicy
   public IntraQueuePreemptionOrderPolicy getIntraQueuePreemptionOrderPolicy() {
     return intraQueuePreemptionOrderPolicy;
   }
+
+  @Override
+  public long getDefaultMaximumKillWaitTimeout() {
+    return maxWaitTime;
+  }
 }
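
The reworked kill loop above keys the selected containers by selector so that
each selector's own timeout decides when a marked container is finally killed.
A condensed, self-contained sketch of that decision, where string container
ids and printed actions stand in for RMContainer and the dispatched
ContainerPreemptEvents:

import java.util.HashMap;
import java.util.Map;
import java.util.Set;

final class PerSelectorKillSketch {
  // container id -> time we first asked it to vacate
  private final Map<String, Long> preemptionCandidates = new HashMap<>();

  // One map entry per selector: its kill wait timeout in ms and the
  // containers that selector chose this round.
  void preemptOrKill(Map<Long, Set<String>> timeoutToContainers, long now) {
    for (Map.Entry<Long, Set<String>> e : timeoutToContainers.entrySet()) {
      long killWaitMs = e.getKey();
      for (String container : e.getValue()) {
        Long firstAsked = preemptionCandidates.get(container);
        if (firstAsked != null && firstAsked + killWaitMs <= now) {
          System.out.println("KILL " + container);    // MARK_CONTAINER_FOR_KILLABLE
          preemptionCandidates.remove(container);
        } else if (firstAsked == null) {
          System.out.println("PREEMPT " + container); // MARK_CONTAINER_FOR_PREEMPTION
          preemptionCandidates.put(container, now);
        }
        // else: already notified, still inside this selector's wait window
      }
    }
  }
}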

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29119430/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/QueuePriorityContainerCandidateSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/QueuePriorityContainerCandidateSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/QueuePriorityContainerCandidateSelector.java
index 4a169af..78a9988 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/QueuePriorityContainerCandidateSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/QueuePriorityContainerCandidateSelector.java
@@ -380,6 +380,7 @@ public class QueuePriorityContainerCandidateSelector
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
       Resource clusterResource,
       Resource totalPreemptedResourceAllowed) {
+    Map<ApplicationAttemptId, Set<RMContainer>> curCandidates = new HashMap<>();
     // Initialize digraph from queues
     // TODO (wangda): only do this when queue refreshed.
     priorityDigraph.clear();
@@ -388,7 +389,7 @@ public class QueuePriorityContainerCandidateSelector
     // When all queues are set to same priority, or priority is not respected,
     // direct return.
     if (priorityDigraph.isEmpty()) {
-      return selectedCandidates;
+      return curCandidates;
     }
 
     // Save parameters to be shared by other methods
@@ -478,13 +479,9 @@ public class QueuePriorityContainerCandidateSelector
                 .getReservedResource());
           }
 
-          Set<RMContainer> containers = selectedCandidates.get(
-              c.getApplicationAttemptId());
-          if (null == containers) {
-            containers = new HashSet<>();
-            selectedCandidates.put(c.getApplicationAttemptId(), containers);
-          }
-          containers.add(c);
+          // Add to preemptMap
+          CapacitySchedulerPreemptionUtils.addToPreemptMap(selectedCandidates,
+              curCandidates, c.getApplicationAttemptId(), c);
 
           // Update totalPreemptionResourceAllowed
           Resources.subtractFrom(totalPreemptedResourceAllowed,
@@ -504,7 +501,6 @@ public class QueuePriorityContainerCandidateSelector
         }
       }
     }
-
-    return selectedCandidates;
+    return curCandidates;
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29119430/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java
index ff100d9..bdb7e8c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ReservedContainerCandidatesSelector.java
@@ -31,7 +31,6 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -63,7 +62,7 @@ public class ReservedContainerCandidatesSelector
       CapacitySchedulerPreemptionContext preemptionContext) {
     super(preemptionContext);
     preemptableAmountCalculator = new PreemptableResourceCalculator(
-        preemptionContext, true);
+        preemptionContext, true, false);
   }
 
   @Override
@@ -71,6 +70,7 @@ public class ReservedContainerCandidatesSelector
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
       Resource clusterResource,
       Resource totalPreemptedResourceAllowed) {
+    Map<ApplicationAttemptId, Set<RMContainer>> curCandidates = new HashMap<>();
     // Calculate how much resources we need to preempt
     preemptableAmountCalculator.computeIdealAllocation(clusterResource,
         totalPreemptedResourceAllowed);
@@ -101,14 +101,10 @@ public class ReservedContainerCandidatesSelector
           selectedCandidates, totalPreemptedResourceAllowed, false);
       if (null != preemptionResult) {
         for (RMContainer c : preemptionResult.selectedContainers) {
-          ApplicationAttemptId appId = c.getApplicationAttemptId();
-          Set<RMContainer> containers = selectedCandidates.get(appId);
-          if (null == containers) {
-            containers = new HashSet<>();
-            selectedCandidates.put(appId, containers);
-          }
+          // Add to preemptMap
+          CapacitySchedulerPreemptionUtils.addToPreemptMap(selectedCandidates,
+              curCandidates, c.getApplicationAttemptId(), c);
 
-          containers.add(c);
           if (LOG.isDebugEnabled()) {
             LOG.debug(this.getClass().getName() + " Marked container=" + c
                 .getContainerId() + " from queue=" + c.getQueueName()
@@ -118,7 +114,7 @@ public class ReservedContainerCandidatesSelector
       }
     }
 
-    return selectedCandidates;
+    return curCandidates;
   }
 
   private Resource getPreemptableResource(String queueName,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29119430/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
index 4214acc..4fb1862 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
@@ -138,7 +138,8 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
   // This function "accepts" all the resources it can (pending) and returns
   // the unused ones
   Resource offer(Resource avail, ResourceCalculator rc,
-      Resource clusterResource, boolean considersReservedResource) {
+      Resource clusterResource, boolean considersReservedResource,
+      boolean allowQueueBalanceAfterAllSatisfied) {
     Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax(
         Resources.subtract(getMax(), idealAssigned),
         Resource.newInstance(0, 0));
@@ -179,7 +180,10 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
     // leaf queues. Such an under-utilized leaf queue could preempt resources
     // from an over-utilized leaf queue located in another hierarchy.
 
-    accepted = filterByMaxDeductAssigned(rc, clusterResource, accepted);
+    // Allow queues to keep growing and balancing even after all are satisfied.
+    if (!allowQueueBalanceAfterAllSatisfied) {
+      accepted = filterByMaxDeductAssigned(rc, clusterResource, accepted);
+    }
 
     // accepted so far contains the "quota acceptable" amount, we now filter by
     // locality acceptable
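
The net effect of the new flag in offer() above: when balancing after
satisfaction is allowed, the filterByMaxDeductAssigned cap is skipped, so a
queue that already reached its guarantee can keep accepting freed resources.
A heavily reduced scalar sketch of that branch (hypothetical names, not the
scheduler's exact arithmetic):

final class OfferSketch {
  // avail: resources being offered; idealAssigned: what the queue has been
  // assigned so far; cap: the limit the skipped filter would impose.
  static long accept(long avail, long idealAssigned, long cap,
      boolean allowQueueBalanceAfterAllSatisfied) {
    long accepted = avail;
    if (!allowQueueBalanceAfterAllSatisfied) {
      // With balancing off, acceptance stops once the queue hits its cap.
      accepted = Math.min(accepted, Math.max(0, cap - idealAssigned));
    }
    return accepted;  // with balancing on, the whole offer may be accepted
  }
}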

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29119430/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 76eaac0..f94654e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -1460,6 +1460,23 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   public static final String DEFAULT_INTRAQUEUE_PREEMPTION_ORDER_POLICY = "userlimit_first";
 
   /**
+   * Whether to allow queues to continue growing after every queue has
+   * reached its guaranteed capacity.
+   */
+  public static final String PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED =
+      PREEMPTION_CONFIG_PREFIX + "preemption-to-balance-queue-after-satisfied.enabled";
+  public static final boolean DEFAULT_PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED = false;
+
+  /**
+   * How long to wait before killing containers preempted to balance queues; by default 5 minutes.
+   */
+  public static final String MAX_WAIT_BEFORE_KILL_FOR_QUEUE_BALANCE_PREEMPTION =
+      PREEMPTION_CONFIG_PREFIX + "preemption-to-balance-queue-after-satisfied.max-wait-before-kill";
+  public static final long
+      DEFAULT_MAX_WAIT_BEFORE_KILL_FOR_QUEUE_BALANCE_PREEMPTION =
+      300 * 1000;
+
+  /**
    * Maximum application for a queue to be used when application per queue is
   * not defined. To be consistent with the previous version, the default
   * value is set as UNDEFINED.
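
For reference, the two new properties can be set programmatically through the
constants added above (or via the matching keys in capacity-scheduler.xml).
A minimal sketch; the 2-minute timeout is chosen purely for illustration:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;

public class EnableBalancePreemption {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Allow preemption to keep balancing queues beyond their guarantees.
    conf.setBoolean(
        CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED,
        true);
    // Wait 2 minutes (instead of the 5-minute default) before killing
    // containers selected by the queue-balance selector.
    conf.setLong(
        CapacitySchedulerConfiguration.MAX_WAIT_BEFORE_KILL_FOR_QUEUE_BALANCE_PREEMPTION,
        120 * 1000L);
  }
}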

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29119430/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java
index 6a953cf..38c2a2a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestPreemptionForQueueWithPriorities.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
 import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
 import org.apache.hadoop.yarn.api.records.ResourceInformation;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
@@ -538,4 +539,61 @@ public class TestPreemptionForQueueWithPriorities
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(1))));
   }
+
+  @Test
+  public void testPriorityPreemptionForBalanceBetweenSatisfiedQueues()
+      throws IOException {
+    /**
+     * All queues are beyond their guarantees, and c has higher priority than
+     * b. c asks for more resources with no idle capacity left, so c should
+     * preempt some resources from b without pushing b below its guarantee.
+     *
+     * Queue structure is:
+     *
+     * <pre>
+     *        root
+     *       / |  \
+     *      a  b   c
+     * </pre>
+     *
+     * For priorities
+     * - a=1
+     * - b=1
+     * - c=2
+     *
+     */
+    String labelsConfig = "=100,true"; // default partition
+    String nodesConfig = "n1="; // only one node
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100]);" + //root
+            "-a(=[30 100 0 0]){priority=1};" + // a
+            "-b(=[30 100 40 50]){priority=1};" + // b
+            "-c(=[40 100 60 25]){priority=2}";   // c
+    String appsConfig =
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "b\t(1,1,n1,,40,false);" + // app1 in b
+            "c\t(1,1,n1,,60,false)"; // app2 in c
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    CapacitySchedulerConfiguration newConf =
+        new CapacitySchedulerConfiguration(conf);
+    boolean isPreemptionToBalanceRequired = true;
+    newConf.setBoolean(
+        CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED,
+        isPreemptionToBalanceRequired);
+    when(cs.getConfiguration()).thenReturn(newConf);
+    policy.editSchedule();
+
+    // IdealAssigned b: 30 c: 70. initIdealAssigned: b: 30 c: 40. Even though
+    // b and c have the same relativeAssigned=1.0f (idealAssigned / guaranteed),
+    // c has higher priority, so c is put in mostUnderServedQueue and gets
+    // all of the remaining 30 capacity.
+    verify(mDisp, times(10)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(mDisp, never()).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29119430/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyPreemptToBalance.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyPreemptToBalance.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyPreemptToBalance.java
new file mode 100644
index 0000000..22e8f63
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyPreemptToBalance.java
@@ -0,0 +1,254 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.junit.Test;
+import java.io.IOException;
+import java.util.Map;
+import java.util.Set;
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+public class TestProportionalCapacityPreemptionPolicyPreemptToBalance
+    extends ProportionalCapacityPreemptionPolicyMockFramework {
+
+  @Test
+  public void testPreemptionToBalanceDisabled() throws IOException {
+    String labelsConfig = "=100,true"; // default partition
+    String nodesConfig = "n1="; // only one node
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100]);" + //root
+            "-a(=[30 100 10 30]);" + // a
+            "-b(=[30 100 40 30]);" + // b
+            "-c(=[30 100 50 30]);" + // c
+            "-d(=[10 100 0 0])";   // d
+    String appsConfig =
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t(1,1,n1,,10,false);" + // app1 in a
+            "b\t(1,1,n1,,40,false);" + // app2 in b
+            "c\t(1,1,n1,,50,false)"; // app3 in c
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // I_A: A:30 B:35 C:35, preempt 5 from B and 15 from C to A
+    verify(mDisp, times(5)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+    verify(mDisp, times(15)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(3))));
+
+    assertEquals(30, policy.getQueuePartitions().get("a")
+        .get("").getIdealAssigned().getMemorySize());
+    assertEquals(35, policy.getQueuePartitions().get("b")
+        .get("").getIdealAssigned().getMemorySize());
+    assertEquals(35, policy.getQueuePartitions().get("c")
+        .get("").getIdealAssigned().getMemorySize());
+  }
+
+  @Test
+  public void testPreemptionToBalanceEnabled() throws IOException {
+    String labelsConfig = "=100,true"; // default partition
+    String nodesConfig = "n1="; // only one node
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100]);" + //root
+            "-a(=[30 100 10 30]);" + // a
+            "-b(=[30 100 40 30]);" + // b
+            "-c(=[30 100 50 30]);" + // c
+            "-d(=[10 100 0 0])";   // d
+    String appsConfig =
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t(1,1,n1,,10,false);" + // app1 in a
+            "b\t(1,1,n1,,40,false);" + // app2 in b
+            "c\t(1,1,n1,,50,false)"; // app3 in c
+
+    // Enable preemption to balance; the ideal assignment will change.
+    boolean isPreemptionToBalanceEnabled = true;
+    conf.setBoolean(
+        CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED,
+        isPreemptionToBalanceEnabled);
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // I_A: A:33 B:33 C:33, preempt 7 from B and 17 from C to A
+    verify(mDisp, times(7)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+    verify(mDisp, times(17)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(3))));
+
+    assertEquals(33, policy.getQueuePartitions().get("a")
+        .get("").getIdealAssigned().getMemorySize());
+    assertEquals(33, policy.getQueuePartitions().get("b")
+        .get("").getIdealAssigned().getMemorySize());
+    assertEquals(33, policy.getQueuePartitions().get("c")
+        .get("").getIdealAssigned().getMemorySize());
+  }
+
+  @Test
+  public void testPreemptionToBalanceUsedPlusPendingLessThanGuaranteed()
+      throws IOException{
+    String labelsConfig = "=100,true"; // default partition
+    String nodesConfig = "n1="; // only one node
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100]);" + //root
+            "-a(=[30 100 10 6]);" + // a
+            "-b(=[30 100 40 30]);" + // b
+            "-c(=[30 100 50 30]);" + // c
+            "-d(=[10 100 0 0])";   // d
+    String appsConfig =
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t(1,1,n1,,10,false);" + // app1 in a
+            "b\t(1,1,n1,,40,false);" + // app2 in b
+            "c\t(1,1,n1,,50,false)"; // app3 in c
+
+    boolean isPreemptionToBalanceEnabled = true;
+    conf.setBoolean(
+        CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED,
+        isPreemptionToBalanceEnabled);
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // I_A: A:16 B:42 C:42, preempt 8 from C to A
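+    // a only asks for used(10) + pending(6) = 16, below its 30 guarantee, so
+    // its ideal assignment caps at 16; the remainder balances b and c at 42
+    // each, and only c (used 50) sits above its share.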
+    verify(mDisp, times(8)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(3))));
+
+    assertEquals(16, policy.getQueuePartitions().get("a")
+        .get("").getIdealAssigned().getMemorySize());
+    assertEquals(42, policy.getQueuePartitions().get("b")
+        .get("").getIdealAssigned().getMemorySize());
+    assertEquals(42, policy.getQueuePartitions().get("c")
+        .get("").getIdealAssigned().getMemorySize());
+  }
+
+  @Test
+  public void testPreemptionToBalanceWithVcoreResource() throws IOException {
+    Logger.getRootLogger().setLevel(Level.DEBUG);
+    String labelsConfig = "=100:100,true"; // default partition
+    String nodesConfig = "n1="; // only one node
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100:100 100:100 100:100 120:140]);" + //root
+            "-a(=[60:60 100:100 40:40 70:40]);" + // a
+            "-b(=[40:40 100:100 60:60 50:100])";   // b
+
+    String appsConfig =
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t(1,1:1,n1,,40,false);" + // app1 in a
+            "b\t(1,1:1,n1,,60,false)"; // app2 in b
+
+    boolean isPreemptionToBalanceEnabled = true;
+    conf.setBoolean(
+        CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED,
+        isPreemptionToBalanceEnabled);
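+    // The trailing "true" below selects the mock framework's buildEnv
+    // overload that uses the DominantResourceCalculator, so vcores are
+    // weighed alongside memory (assumption based on the framework's
+    // signature).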
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, true);
+    policy.editSchedule();
+
+    // 21 containers will be preempted here
+    verify(mDisp, times(21)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.
+            IsPreemptionRequestFor(getAppAttemptId(2))));
+
+    assertEquals(60, policy.getQueuePartitions().get("a")
+        .get("").getIdealAssigned().getMemorySize());
+    assertEquals(60, policy.getQueuePartitions().get("a")
+        .get("").getIdealAssigned().getVirtualCores());
+    assertEquals(40, policy.getQueuePartitions().get("b")
+        .get("").getIdealAssigned().getMemorySize());
+    assertEquals(40, policy.getQueuePartitions().get("b")
+        .get("").getIdealAssigned().getVirtualCores());
+  }
+
+  @Test
+  public void testPreemptionToBalanceWithConfiguredTimeout() throws IOException {
+    Logger.getRootLogger().setLevel(Level.DEBUG);
+    String labelsConfig = "=100:100,true"; // default partition
+    String nodesConfig = "n1="; // only one node
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100:100 100:100 100:100 120:140]);" + //root
+            "-a(=[60:60 100:100 40:40 70:40]);" + // a
+            "-b(=[40:40 100:100 60:60 50:100])";   // b
+
+    String appsConfig =
+        //queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t(1,1:1,n1,,40,false);" + // app1 in a
+            "b\t(1,1:1,n1,,60,false)"; // app2 in b
+
+    boolean isPreemptionToBalanceEnabled = true;
+    conf.setBoolean(
+        CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED,
+        isPreemptionToBalanceEnabled);
+    final long FB_MAX_BEFORE_KILL = 60 * 1000;
+    conf.setLong(
+        CapacitySchedulerConfiguration.MAX_WAIT_BEFORE_KILL_FOR_QUEUE_BALANCE_PREEMPTION,
+        FB_MAX_BEFORE_KILL);
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig, true);
+    policy.editSchedule();
+
+    Map<PreemptionCandidatesSelector, Map<ApplicationAttemptId,
+        Set<RMContainer>>> pcps = policy.getToPreemptCandidatesPerSelector();
+
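+    // With the balance flag on, the policy is expected to wire up a
+    // FifoCandidatesSelector whose allowQueuesBalanceAfterAllQueuesSatisfied
+    // flag is set; find it and check it carries the configured kill wait.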
+    String FIFO_CANDIDATE_SELECTOR = "FifoCandidatesSelector";
+    boolean hasFifoSelector = false;
+    for (Map.Entry<PreemptionCandidatesSelector, Map<ApplicationAttemptId,
+        Set<RMContainer>>> pc : pcps.entrySet()) {
+      if (pc.getKey().getClass().getSimpleName().equals(FIFO_CANDIDATE_SELECTOR)) {
+        FifoCandidatesSelector pcs = (FifoCandidatesSelector) pc.getKey();
+        if (pcs.getAllowQueuesBalanceAfterAllQueuesSatisfied()) {
+          hasFifoSelector = true;
+          assertEquals(FB_MAX_BEFORE_KILL, pcs.getMaximumKillWaitTimeMs());
+        }
+      }
+    }
+
+    assertEquals(true, hasFifoSelector);
+
+    // 21 containers will be preempted here
+    verify(mDisp, times(21)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.
+            IsPreemptionRequestFor(getAppAttemptId(2))));
+
+    assertEquals(60, policy.getQueuePartitions().get("a")
+        .get("").getIdealAssigned().getMemorySize());
+    assertEquals(60, policy.getQueuePartitions().get("a")
+        .get("").getIdealAssigned().getVirtualCores());
+    assertEquals(40, policy.getQueuePartitions().get("b")
+        .get("").getIdealAssigned().getMemorySize());
+    assertEquals(40, policy.getQueuePartitions().get("b")
+        .get("").getIdealAssigned().getVirtualCores());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/29119430/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
index 2aff82d..800789a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java
@@ -1111,5 +1111,116 @@ public class TestCapacitySchedulerSurgicalPreemption
     rm1.close();
   }
 
+  @Test(timeout = 600000)
+  public void testPreemptionToBalanceWithCustomTimeout() throws Exception {
+    /**
+     * Test case: Submit two applications (app1/app2) to different queues;
+     * queue structure:
+     *
+     * <pre>
+     *             Root
+     *            /  |  \
+     *           a   b   c
+     *          10   20  70
+     * </pre>
+     *
+     * 1) Two nodes (n1/n2) in the cluster, each of them has 20G.
+     *
+     * 2) app1 is submitted to queue-a, asking for 1G * 38 (39 total with AM)
+     *
+     * 3) app2 is submitted to queue-c, asking for one 4G container (for AM)
+     *
+     * After preemption, we should expect:
+     * 1. 4 containers preempted from app1,
+     * 2. the selected containers killed only after the configured timeout,
+     * 3. the AM of app2 successfully allocated.
+     */
+    conf.setBoolean(
+        CapacitySchedulerConfiguration.PREEMPTION_TO_BALANCE_QUEUES_BEYOND_GUARANTEED,
+        true);
+    conf.setLong(
+        CapacitySchedulerConfiguration.MAX_WAIT_BEFORE_KILL_FOR_QUEUE_BALANCE_PREEMPTION,
+        20 * 1000);
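+    // Judging by its name, this wait should govern only containers selected
+    // for queue-balance preemption, separate from the regular
+    // max-wait-before-kill used for guaranteed-capacity preemption.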
+    CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(
+        this.conf);
+
+    MockRM rm1 = new MockRM(conf);
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+
+    MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB);
+    MockNM nm2 = rm1.registerNode("h2:1234", 20 * GB);
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+    RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId());
+
+    // launch an app in queue-a; its AM container should come up on nm1
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    am1.allocate("*", 1 * GB, 38, new ArrayList<ContainerId>());
+
+    // Do allocation for node1/node2
+    for (int i = 0; i < 38; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    }
+
+    // App1 should have 39 containers now
+    FiCaSchedulerApp schedulerApp1 = cs.getApplicationAttempt(
+        am1.getApplicationAttemptId());
+    Assert.assertEquals(39, schedulerApp1.getLiveContainers().size());
+    // 20 from n1 and 19 from n2
+    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode1.getNodeID()),
+        am1.getApplicationAttemptId(), 20);
+    waitNumberOfLiveContainersOnNodeFromApp(cs.getNode(rmNode2.getNodeID()),
+        am1.getApplicationAttemptId(), 19);
+
+    // Submit app2 to queue-c, asking for a 4G container for its AM
+    RMApp app2 = rm1.submitApp(4 * GB, "app", "user", null, "c");
+    FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt(
+        ApplicationAttemptId.newInstance(app2.getApplicationId(), 1));
+
+    // Call editSchedule: containers are selected to be preemption candidate
+    SchedulingMonitorManager smm = ((CapacityScheduler) rm1.
+        getResourceScheduler()).getSchedulingMonitorManager();
+    SchedulingMonitor smon = smm.getAvailableSchedulingMonitor();
+    ProportionalCapacityPreemptionPolicy editPolicy =
+        (ProportionalCapacityPreemptionPolicy) smon.getSchedulingEditPolicy();
+    editPolicy.editSchedule();
+    Assert.assertEquals(4, editPolicy.getToPreemptContainers().size());
+
+    // check live containers immediately: nothing happens yet
+    Assert.assertEquals(39, schedulerApp1.getLiveContainers().size());
+
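+    // The balance-preemption kill wait was set to 20s above; only after that
+    // much wall-clock time can the next editSchedule() pass actually kill
+    // the selected candidates.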
+    Thread.sleep(20 * 1000);
+    // Call editSchedule again: selected containers are killed
+    editPolicy.editSchedule();
+    waitNumberOfLiveContainersFromApp(schedulerApp1, 35);
+
+    // Call allocation, containers are reserved
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+    waitNumberOfReservedContainersFromApp(schedulerApp2, 1);
+
+    // Call editSchedule twice and allocate once; the container should get allocated
+    editPolicy.editSchedule();
+    editPolicy.editSchedule();
+
+    int tick = 0;
+    while (schedulerApp2.getLiveContainers().size() != 1 && tick < 10) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode2));
+      tick++;
+      Thread.sleep(100);
+    }
+    waitNumberOfReservedContainersFromApp(schedulerApp2, 0);
+
+    rm1.close();
+
+  }
+
 
 }

