You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2016/12/13 17:58:55 UTC

[1/2] hadoop git commit: YARN-2009. CapacityScheduler: Add intra-queue preemption for app priority support. (Sunil G via wangda)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2.8 01b50b36b -> 6a18ae849


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index bf19f5a..4ff0bf3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -52,6 +52,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Iterator;
+import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -91,6 +92,9 @@ public class ProportionalCapacityPreemptionPolicy
   private boolean observeOnly;
   private boolean lazyPreempionEnabled;
 
+  private float maxAllowableLimitForIntraQueuePreemption;
+  private float minimumThresholdForIntraQueuePreemption;
+
   // Pointer to other RM components
   private RMContext rmContext;
   private ResourceCalculator rc;
@@ -102,6 +106,8 @@ public class ProportionalCapacityPreemptionPolicy
     new HashMap<>();
   private Map<String, Map<String, TempQueuePerPartition>> queueToPartitions =
       new HashMap<>();
+  private Map<String, LinkedHashSet<String>> partitionToUnderServedQueues =
+      new HashMap<String, LinkedHashSet<String>>();
   private List<PreemptionCandidatesSelector>
       candidatesSelectionPolicies = new ArrayList<>();
   private Set<String> allPartitions;
@@ -171,23 +177,44 @@ public class ProportionalCapacityPreemptionPolicy
         CapacitySchedulerConfiguration.LAZY_PREEMPTION_ENALBED,
         CapacitySchedulerConfiguration.DEFAULT_LAZY_PREEMPTION_ENABLED);
 
+    maxAllowableLimitForIntraQueuePreemption = csConfig.getFloat(
+        CapacitySchedulerConfiguration.
+        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+        CapacitySchedulerConfiguration.
+        DEFAULT_INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT);
+
+    minimumThresholdForIntraQueuePreemption = csConfig.getFloat(
+        CapacitySchedulerConfiguration.
+        INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD,
+        CapacitySchedulerConfiguration.
+        DEFAULT_INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD);
+
     rc = scheduler.getResourceCalculator();
     nlm = scheduler.getRMContext().getNodeLabelManager();
 
     // Do we need to specially consider reserved containers?
     boolean selectCandidatesForResevedContainers = csConfig.getBoolean(
-        CapacitySchedulerConfiguration.PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS,
-        CapacitySchedulerConfiguration.DEFAULT_PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS);
+        CapacitySchedulerConfiguration.
+        PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS,
+        CapacitySchedulerConfiguration.
+        DEFAULT_PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS);
     if (selectCandidatesForResevedContainers) {
-      candidatesSelectionPolicies.add(
-          new ReservedContainerCandidatesSelector(this));
+      candidatesSelectionPolicies
+          .add(new ReservedContainerCandidatesSelector(this));
     }
 
     // initialize candidates preemption selection policies
-    candidatesSelectionPolicies.add(
-        new FifoCandidatesSelector(this));
+    candidatesSelectionPolicies.add(new FifoCandidatesSelector(this));
+
+    // Do we need to specially consider intra-queue preemption?
+    boolean isIntraQueuePreemptionEnabled = csConfig.getBoolean(
+        CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED,
+        CapacitySchedulerConfiguration.DEFAULT_INTRAQUEUE_PREEMPTION_ENABLED);
+    if (isIntraQueuePreemptionEnabled) {
+      candidatesSelectionPolicies.add(new IntraQueueCandidatesSelector(this));
+    }
   }
-  
+
   @Override
   public ResourceCalculator getResourceCalculator() {
     return rc;
@@ -209,6 +236,12 @@ public class ProportionalCapacityPreemptionPolicy
   @SuppressWarnings("unchecked")
   private void preemptOrkillSelectedContainerAfterWait(
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates) {
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          "Starting to preempt containers for selectedCandidates and size:"
+              + selectedCandidates.size());
+    }
+
     // preempt (or kill) the selected containers
     for (Map.Entry<ApplicationAttemptId, Set<RMContainer>> e : selectedCandidates
         .entrySet()) {
@@ -233,6 +266,7 @@ public class ProportionalCapacityPreemptionPolicy
             // not have to raise another event.
             continue;
           }
+
           //otherwise just send preemption events
           rmContext.getDispatcher().getEventHandler().handle(
               new ContainerPreemptEvent(appAttemptId, container,
@@ -291,7 +325,6 @@ public class ProportionalCapacityPreemptionPolicy
    * @param root the root of the CapacityScheduler queue hierarchy
    * @param clusterResources the total amount of resources in the cluster
    */
-  @SuppressWarnings("unchecked")
   private void containerBasedPreemptOrKill(CSQueue root,
       Resource clusterResources) {
     // Sync killable containers from scheduler when lazy preemption enabled
@@ -537,4 +570,41 @@ public class ProportionalCapacityPreemptionPolicy
   Map<String, Map<String, TempQueuePerPartition>> getQueuePartitions() {
     return queueToPartitions;
   }
+
+  @Override
+  public int getClusterMaxApplicationPriority() {
+    return scheduler.getMaxClusterLevelAppPriority().getPriority();
+  }
+
+  @Override
+  public float getMaxAllowableLimitForIntraQueuePreemption() {
+    return maxAllowableLimitForIntraQueuePreemption;
+  }
+
+  @Override
+  public float getMinimumThresholdForIntraQueuePreemption() {
+    return minimumThresholdForIntraQueuePreemption;
+  }
+
+  @Override
+  public Resource getPartitionResource(String partition) {
+    return Resources.clone(nlm.getResourceByLabel(partition,
+        Resources.clone(scheduler.getClusterResource())));
+  }
+
+  public LinkedHashSet<String> getUnderServedQueuesPerPartition(
+      String partition) {
+    return partitionToUnderServedQueues.get(partition);
+  }
+
+  public void addPartitionToUnderServedQueues(String queueName,
+      String partition) {
+    LinkedHashSet<String> underServedQueues = partitionToUnderServedQueues
+        .get(partition);
+    if (null == underServedQueues) {
+      underServedQueues = new LinkedHashSet<String>();
+      partitionToUnderServedQueues.put(partition, underServedQueues);
+    }
+    underServedQueues.add(queueName);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java
new file mode 100644
index 0000000..fccd2a7
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+
+/**
+ * Temporary data-structure tracking resource availability, pending resource
+ * need, current utilization for an application.
+ */
+public class TempAppPerPartition extends AbstractPreemptionEntity {
+
+  // Following fields are set and used by candidate selection policies
+  private final int priority;
+  private final ApplicationId applicationId;
+
+  FiCaSchedulerApp app;
+
+  TempAppPerPartition(FiCaSchedulerApp app, Resource usedPerPartition,
+      Resource amUsedPerPartition, Resource reserved,
+      Resource pendingPerPartition) {
+    super(app.getQueueName(), usedPerPartition, amUsedPerPartition, reserved,
+        pendingPerPartition);
+
+    this.priority = app.getPriority().getPriority();
+    this.applicationId = app.getApplicationId();
+    this.app = app;
+  }
+
+  public FiCaSchedulerApp getFiCaSchedulerApp() {
+    return app;
+  }
+
+  public void assignPreemption(Resource killable) {
+    Resources.addTo(toBePreempted, killable);
+  }
+
+  @Override
+  public String toString() {
+    StringBuilder sb = new StringBuilder();
+    sb.append(" NAME: " + getApplicationId()).append(" PRIO: ").append(priority)
+        .append(" CUR: ").append(getUsed()).append(" PEN: ").append(pending)
+        .append(" RESERVED: ").append(reserved).append(" IDEAL_ASSIGNED: ")
+        .append(idealAssigned).append(" PREEMPT_OTHER: ")
+        .append(getToBePreemptFromOther()).append(" IDEAL_PREEMPT: ")
+        .append(toBePreempted).append(" ACTUAL_PREEMPT: ")
+        .append(getActuallyToBePreempted()).append("\n");
+
+    return sb.toString();
+  }
+
+  void appendLogString(StringBuilder sb) {
+    sb.append(queueName).append(", ").append(getUsed().getMemorySize())
+        .append(", ").append(getUsed().getVirtualCores()).append(", ")
+        .append(pending.getMemorySize()).append(", ")
+        .append(pending.getVirtualCores()).append(", ")
+        .append(idealAssigned.getMemorySize()).append(", ")
+        .append(idealAssigned.getVirtualCores()).append(", ")
+        .append(toBePreempted.getMemorySize()).append(", ")
+        .append(toBePreempted.getVirtualCores()).append(", ")
+        .append(getActuallyToBePreempted().getMemorySize()).append(", ")
+        .append(getActuallyToBePreempted().getVirtualCores());
+  }
+
+  public int getPriority() {
+    return priority;
+  }
+
+  public ApplicationId getApplicationId() {
+    return applicationId;
+  }
+
+  public void deductActuallyToBePreempted(ResourceCalculator resourceCalculator,
+      Resource cluster, Resource toBeDeduct, String partition) {
+    if (Resources.greaterThan(resourceCalculator, cluster,
+        getActuallyToBePreempted(), toBeDeduct)) {
+      Resources.subtractFrom(getActuallyToBePreempted(), toBeDeduct);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
index 04ed135..28099c4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
@@ -25,34 +25,29 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.util.ArrayList;
+import java.util.Collection;
 
 /**
  * Temporary data-structure tracking resource availability, pending resource
  * need, current utilization. This is per-queue-per-partition data structure
  */
-public class TempQueuePerPartition {
+public class TempQueuePerPartition extends AbstractPreemptionEntity {
   // Following fields are copied from scheduler
-  final String queueName;
   final String partition;
-  final Resource pending;
 
-  private final Resource current;
   private final Resource killable;
-  private final Resource reserved;
   private final float absCapacity;
   private final float absMaxCapacity;
   final Resource totalPartitionResource;
 
-  // Following fields are setted and used by candidate selection policies
-  Resource idealAssigned;
-  Resource toBePreempted;
+  // Following fields are set and used by candidate selection policies
   Resource untouchableExtra;
   Resource preemptableExtra;
-  private Resource actuallyToBePreempted;
 
   double normalizedGuarantee;
 
   final ArrayList<TempQueuePerPartition> children;
+  private Collection<TempAppPerPartition> apps;
   LeafQueue leafQueue;
   boolean preemptionDisabled;
 
@@ -60,8 +55,8 @@ public class TempQueuePerPartition {
       boolean preemptionDisabled, String partition, Resource killable,
       float absCapacity, float absMaxCapacity, Resource totalPartitionResource,
       Resource reserved, CSQueue queue) {
-    this.queueName = queueName;
-    this.current = current;
+    super(queueName, current, Resource.newInstance(0, 0), reserved,
+        Resource.newInstance(0, 0));
 
     if (queue instanceof LeafQueue) {
       LeafQueue l = (LeafQueue) queue;
@@ -72,11 +67,9 @@ public class TempQueuePerPartition {
       pending = Resources.createResource(0);
     }
 
-    this.idealAssigned = Resource.newInstance(0, 0);
-    this.actuallyToBePreempted = Resource.newInstance(0, 0);
-    this.toBePreempted = Resource.newInstance(0, 0);
     this.normalizedGuarantee = Float.NaN;
     this.children = new ArrayList<>();
+    this.apps = new ArrayList<>();
     this.untouchableExtra = Resource.newInstance(0, 0);
     this.preemptableExtra = Resource.newInstance(0, 0);
     this.preemptionDisabled = preemptionDisabled;
@@ -85,7 +78,6 @@ public class TempQueuePerPartition {
     this.absCapacity = absCapacity;
     this.absMaxCapacity = absMaxCapacity;
     this.totalPartitionResource = totalPartitionResource;
-    this.reserved = reserved;
   }
 
   public void setLeafQueue(LeafQueue l) {
@@ -95,7 +87,9 @@ public class TempQueuePerPartition {
 
   /**
    * When adding a child we also aggregate its pending resource needs.
-   * @param q the child queue to add to this queue
+   *
+   * @param q
+   *          the child queue to add to this queue
    */
   public void addChild(TempQueuePerPartition q) {
     assert leafQueue == null;
@@ -103,14 +97,10 @@ public class TempQueuePerPartition {
     Resources.addTo(pending, q.pending);
   }
 
-  public ArrayList<TempQueuePerPartition> getChildren(){
+  public ArrayList<TempQueuePerPartition> getChildren() {
     return children;
   }
 
-  public Resource getUsed() {
-    return current;
-  }
-
   public Resource getUsedDeductReservd() {
     return Resources.subtract(current, reserved);
   }
@@ -122,28 +112,30 @@ public class TempQueuePerPartition {
     Resource absMaxCapIdealAssignedDelta = Resources.componentwiseMax(
         Resources.subtract(getMax(), idealAssigned),
         Resource.newInstance(0, 0));
-    // remain = avail - min(avail, (max - assigned), (current + pending - assigned))
+    // remain = avail - min(avail, (max - assigned), (current + pending -
+    // assigned))
     Resource accepted = Resources.min(rc, clusterResource,
-        absMaxCapIdealAssignedDelta, Resources.min(rc, clusterResource, avail,
-            Resources
-                /*
-                 * When we're using FifoPreemptionSelector
-                 * (considerReservedResource = false).
-                 *
-                 * We should deduct reserved resource to avoid excessive preemption:
-                 *
-                 * For example, if an under-utilized queue has used = reserved = 20.
-                 * Preemption policy will try to preempt 20 containers
-                 * (which is not satisfied) from different hosts.
-                 *
-                 * In FifoPreemptionSelector, there's no guarantee that preempted
-                 * resource can be used by pending request, so policy will preempt
-                 * resources repeatly.
-                 */
-                .subtract(Resources.add(
-                    (considersReservedResource ? getUsed() :
-                      getUsedDeductReservd()),
-                    pending), idealAssigned)));
+        absMaxCapIdealAssignedDelta,
+        Resources.min(rc, clusterResource, avail, Resources
+            /*
+             * When we're using FifoPreemptionSelector (considerReservedResource
+             * = false).
+             *
+             * We should deduct reserved resource to avoid excessive preemption:
+             *
+             * For example, if an under-utilized queue has used = reserved = 20.
+             * Preemption policy will try to preempt 20 containers (which is not
+             * satisfied) from different hosts.
+             *
+             * In FifoPreemptionSelector, there's no guarantee that preempted
+             * resource can be used by pending request, so policy will preempt
+             * resources repeatedly.
+             */
+            .subtract(
+                Resources.add((considersReservedResource
+                    ? getUsed()
+                    : getUsedDeductReservd()), pending),
+                idealAssigned)));
     Resource remain = Resources.subtract(avail, accepted);
     Resources.addTo(idealAssigned, accepted);
     return remain;
@@ -162,8 +154,7 @@ public class TempQueuePerPartition {
     untouchableExtra = Resources.none();
     preemptableExtra = Resources.none();
 
-    Resource extra = Resources.subtract(getUsed(),
-        getGuaranteed());
+    Resource extra = Resources.subtract(getUsed(), getGuaranteed());
     if (Resources.lessThan(rc, totalPartitionResource, extra,
         Resources.none())) {
       extra = Resources.none();
@@ -197,26 +188,21 @@ public class TempQueuePerPartition {
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
-    sb.append(" NAME: " + queueName)
-        .append(" CUR: ").append(current)
-        .append(" PEN: ").append(pending)
-        .append(" RESERVED: ").append(reserved)
-        .append(" GAR: ").append(getGuaranteed())
-        .append(" NORM: ").append(normalizedGuarantee)
-        .append(" IDEAL_ASSIGNED: ").append(idealAssigned)
-        .append(" IDEAL_PREEMPT: ").append(toBePreempted)
-        .append(" ACTUAL_PREEMPT: ").append(actuallyToBePreempted)
+    sb.append(" NAME: " + queueName).append(" CUR: ").append(current)
+        .append(" PEN: ").append(pending).append(" RESERVED: ").append(reserved)
+        .append(" GAR: ").append(getGuaranteed()).append(" NORM: ")
+        .append(normalizedGuarantee).append(" IDEAL_ASSIGNED: ")
+        .append(idealAssigned).append(" IDEAL_PREEMPT: ").append(toBePreempted)
+        .append(" ACTUAL_PREEMPT: ").append(getActuallyToBePreempted())
         .append(" UNTOUCHABLE: ").append(untouchableExtra)
-        .append(" PREEMPTABLE: ").append(preemptableExtra)
-        .append("\n");
+        .append(" PREEMPTABLE: ").append(preemptableExtra).append("\n");
 
     return sb.toString();
   }
 
   public void assignPreemption(float scalingFactor, ResourceCalculator rc,
       Resource clusterResource) {
-    Resource usedDeductKillable = Resources.subtract(
-        getUsed(), killable);
+    Resource usedDeductKillable = Resources.subtract(getUsed(), killable);
     Resource totalResource = Resources.add(getUsed(), pending);
 
     // The minimum resource that we need to keep for a queue is:
@@ -224,7 +210,8 @@ public class TempQueuePerPartition {
     //
     // Doing this because when we calculate ideal allocation doesn't consider
     // reserved resource, ideal-allocation calculated could be less than
-    // guaranteed and total. We should avoid preempt from a queue if it is already
+    // guaranteed and total. We should avoid preempt from a queue if it is
+    // already
     // <= its guaranteed resource.
     Resource minimumQueueResource = Resources.max(rc, clusterResource,
         Resources.min(rc, clusterResource, totalResource, getGuaranteed()),
@@ -233,33 +220,26 @@ public class TempQueuePerPartition {
     if (Resources.greaterThan(rc, clusterResource, usedDeductKillable,
         minimumQueueResource)) {
       toBePreempted = Resources.multiply(
-          Resources.subtract(usedDeductKillable, minimumQueueResource), scalingFactor);
+          Resources.subtract(usedDeductKillable, minimumQueueResource),
+          scalingFactor);
     } else {
       toBePreempted = Resources.none();
     }
   }
 
-  public Resource getActuallyToBePreempted() {
-    return actuallyToBePreempted;
-  }
-
-  public void setActuallyToBePreempted(Resource res) {
-    this.actuallyToBePreempted = res;
-  }
-
   public void deductActuallyToBePreempted(ResourceCalculator rc,
       Resource cluster, Resource toBeDeduct) {
-    if (Resources.greaterThan(rc, cluster, actuallyToBePreempted, toBeDeduct)) {
-      Resources.subtractFrom(actuallyToBePreempted, toBeDeduct);
+    if (Resources.greaterThan(rc, cluster, getActuallyToBePreempted(),
+        toBeDeduct)) {
+      Resources.subtractFrom(getActuallyToBePreempted(), toBeDeduct);
     }
-    actuallyToBePreempted = Resources.max(rc, cluster, actuallyToBePreempted,
-        Resources.none());
+    setActuallyToBePreempted(Resources.max(rc, cluster,
+        getActuallyToBePreempted(), Resources.none()));
   }
 
   void appendLogString(StringBuilder sb) {
-    sb.append(queueName).append(", ")
-        .append(current.getMemorySize()).append(", ")
-        .append(current.getVirtualCores()).append(", ")
+    sb.append(queueName).append(", ").append(current.getMemorySize())
+        .append(", ").append(current.getVirtualCores()).append(", ")
         .append(pending.getMemorySize()).append(", ")
         .append(pending.getVirtualCores()).append(", ")
         .append(getGuaranteed().getMemorySize()).append(", ")
@@ -267,9 +247,17 @@ public class TempQueuePerPartition {
         .append(idealAssigned.getMemorySize()).append(", ")
         .append(idealAssigned.getVirtualCores()).append(", ")
         .append(toBePreempted.getMemorySize()).append(", ")
-        .append(toBePreempted.getVirtualCores() ).append(", ")
-        .append(actuallyToBePreempted.getMemorySize()).append(", ")
-        .append(actuallyToBePreempted.getVirtualCores());
+        .append(toBePreempted.getVirtualCores()).append(", ")
+        .append(getActuallyToBePreempted().getMemorySize()).append(", ")
+        .append(getActuallyToBePreempted().getVirtualCores());
+  }
+
+  public void addAllApps(Collection<TempAppPerPartition> orderedApps) {
+    this.apps = orderedApps;
+  }
+
+  public Collection<TempAppPerPartition> getApps() {
+    return apps;
   }
 
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 6db5074..cea5aa4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -1045,6 +1045,9 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   private static final String PREEMPTION_CONFIG_PREFIX =
       "yarn.resourcemanager.monitor.capacity.preemption.";
 
+  private static final String INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX =
+      "intra-queue-preemption.";
+
   /** If true, run the policy but do not affect the cluster with preemption and
    * kill events. */
   public static final String PREEMPTION_OBSERVE_ONLY =
@@ -1098,4 +1101,32 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
       PREEMPTION_CONFIG_PREFIX + "select_based_on_reserved_containers";
   public static final boolean DEFAULT_PREEMPTION_SELECT_CANDIDATES_FOR_RESERVED_CONTAINERS =
       false;
+
+  /**
+   * For intra-queue preemption, priority/user-limit/fairness based selectors
+   * can help to preempt containers.
+   */
+  public static final String INTRAQUEUE_PREEMPTION_ENABLED =
+      PREEMPTION_CONFIG_PREFIX +
+      INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "enabled";
+  public static final boolean DEFAULT_INTRAQUEUE_PREEMPTION_ENABLED = false;
+
+  /**
+   * For intra-queue preemption, consider only those queues whose used
+   * capacity is above this minimum threshold.
+   */
+  public static final String INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD =
+      PREEMPTION_CONFIG_PREFIX +
+      INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "minimum-threshold";
+  public static final float DEFAULT_INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD =
+      0.5f;
+
+  /**
+   * For intra-queue preemption, allowable maximum-preemptable limit per queue.
+   */
+  public static final String INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT =
+      PREEMPTION_CONFIG_PREFIX +
+      INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "max-allowable-limit";
+  public static final float DEFAULT_INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT =
+      0.2f;
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 20378bb..eef43c3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -73,6 +73,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -1029,8 +1030,9 @@ public class LeafQueue extends AbstractCSQueue {
       Resource clusterResource, FiCaSchedulerApp application,
       String partition) {
     return getHeadroom(user, queueCurrentLimit, clusterResource,
-        computeUserLimit(application, clusterResource, user, partition,
-            SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), partition);
+        computeUserLimit(application.getUser(), clusterResource, user,
+            partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
+        partition);
   }
 
   private Resource getHeadroom(User user,
@@ -1101,7 +1103,7 @@ public class LeafQueue extends AbstractCSQueue {
     // Compute user limit respect requested labels,
     // TODO, need consider headroom respect labels also
     Resource userLimit =
-        computeUserLimit(application, clusterResource, queueUser,
+        computeUserLimit(application.getUser(), clusterResource, queueUser,
             nodePartition, schedulingMode);
 
     setQueueResourceLimitsInfo(clusterResource);
@@ -1139,7 +1141,7 @@ public class LeafQueue extends AbstractCSQueue {
   }
 
   @Lock(NoLock.class)
-  private Resource computeUserLimit(FiCaSchedulerApp application,
+  private Resource computeUserLimit(String userName,
       Resource clusterResource, User user,
       String nodePartition, SchedulingMode schedulingMode) {
     Resource partitionResource = labelManager.getResourceByLabel(nodePartition,
@@ -1239,7 +1241,6 @@ public class LeafQueue extends AbstractCSQueue {
             minimumAllocation);
 
     if (LOG.isDebugEnabled()) {
-      String userName = application.getUser();
       LOG.debug("User limit computation for " + userName +
           " in queue " + getQueueName() +
           " userLimitPercent=" + userLimit +
@@ -1815,11 +1816,22 @@ public class LeafQueue extends AbstractCSQueue {
   /**
    * Obtain (read-only) collection of active applications.
    */
-  public Collection<FiCaSchedulerApp> getApplications() {
+  public synchronized Collection<FiCaSchedulerApp> getApplications() {
     return Collections.unmodifiableCollection(orderingPolicy
         .getSchedulableEntities());
   }
 
+  /**
+   * Obtain (read-only) collection of all applications, pending and active.
+   */
+  public synchronized Collection<FiCaSchedulerApp> getAllApplications() {
+    Collection<FiCaSchedulerApp> apps = new HashSet<FiCaSchedulerApp>(
+        pendingOrderingPolicy.getSchedulableEntities());
+    apps.addAll(orderingPolicy.getSchedulableEntities());
+
+    return Collections.unmodifiableCollection(apps);
+  }
+
   // Consider the headroom for each user in the queue.
   // Total pending for the queue =
   //   sum(for each user(min((user's headroom), sum(user's pending requests))))
@@ -1833,7 +1845,7 @@ public class LeafQueue extends AbstractCSQueue {
       if (!userNameToHeadroom.containsKey(userName)) {
         User user = getUser(userName);
         Resource headroom = Resources.subtract(
-            computeUserLimit(app, resources, user, partition,
+            computeUserLimit(app.getUser(), resources, user, partition,
                 SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
                 user.getUsed(partition));
         // Make sure headroom is not negative.
@@ -1851,6 +1863,16 @@ public class LeafQueue extends AbstractCSQueue {
     return pendingConsideringUserLimit;
   }
 
+  public synchronized Resource getUserLimitPerUser(String userName,
+      Resource resources, String partition) {
+
+    // Check user resource limit
+    User user = getUser(userName);
+
+    return computeUserLimit(userName, resources, user, partition,
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+  }
+
   @Override
   public synchronized void collectSchedulerApplications(
       Collection<ApplicationAttemptId> apps) {
@@ -1901,8 +1923,8 @@ public class LeafQueue extends AbstractCSQueue {
   }
   
   /**
-   * return all ignored partition exclusivity RMContainers in the LeafQueue, this
-   * will be used by preemption policy, and use of return
+   * @return all ignored partition exclusivity RMContainers in the LeafQueue,
+   *         this will be used by preemption policy, and use of return
    * ignorePartitionExclusivityRMContainer should protected by LeafQueue
    * synchronized lock
    */

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 38995ff..b7b47b0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica;
 
 import java.util.Collections;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -312,6 +313,23 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
     return ret;
   }
 
+  public synchronized Map<String, Resource> getTotalPendingRequestsPerPartition() {
+
+    Map<String, Resource> ret = new HashMap<String, Resource>();
+    Resource res = null;
+    for (Priority  priority : appSchedulingInfo.getPriorities()) {
+      ResourceRequest rr = appSchedulingInfo.getResourceRequest(priority, "*");
+      if ((res = ret.get(rr.getNodeLabelExpression())) == null) {
+        res = Resources.createResource(0, 0);
+        ret.put(rr.getNodeLabelExpression(), res);
+      }
+
+      Resources.addTo(res,
+          Resources.multiply(rr.getCapability(), rr.getNumContainers()));
+    }
+    return ret;
+  }
+
   public synchronized void markContainerForPreemption(ContainerId cont) {
     // ignore already completed containers
     if (liveContainers.containsKey(cont)) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
index e60c384..3c1d9f0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
@@ -63,11 +63,14 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeSet;
 
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyString;
 import static org.mockito.Matchers.eq;
 import static org.mockito.Matchers.isA;
 import static org.mockito.Mockito.doAnswer;
@@ -160,13 +163,17 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
         mClock);
   }
 
-  private void mockContainers(String containersConfig, ApplicationAttemptId attemptId,
-      String queueName, List<RMContainer> reservedContainers,
-      List<RMContainer> liveContainers) {
+  private void mockContainers(String containersConfig, FiCaSchedulerApp app,
+      ApplicationAttemptId attemptId, String queueName,
+      List<RMContainer> reservedContainers, List<RMContainer> liveContainers) {
     int containerId = 1;
     int start = containersConfig.indexOf("=") + 1;
     int end = -1;
 
+    Resource used = Resource.newInstance(0, 0);
+    Resource pending = Resource.newInstance(0, 0);
+    Priority pri = Priority.newInstance(0);
+
     while (start < containersConfig.length()) {
       while (start < containersConfig.length()
           && containersConfig.charAt(start) != '(') {
@@ -188,41 +195,50 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
 
       // now we found start/end, get container values
       String[] values = containersConfig.substring(start + 1, end).split(",");
-      if (values.length != 6) {
+      if (values.length < 6 || values.length > 8) {
         throw new IllegalArgumentException("Format to define container is:"
-            + "(priority,resource,host,expression,repeat,reserved)");
+            + "(priority,resource,host,expression,repeat,reserved, pending)");
       }
-      Priority pri = Priority.newInstance(Integer.valueOf(values[0]));
+      pri.setPriority(Integer.valueOf(values[0]));
       Resource res = parseResourceFromString(values[1]);
       NodeId host = NodeId.newInstance(values[2], 1);
-      String exp = values[3];
+      String label = values[3];
+      String userName = "user";
       int repeat = Integer.valueOf(values[4]);
       boolean reserved = Boolean.valueOf(values[5]);
+      if (values.length >= 7) {
+        Resources.addTo(pending, parseResourceFromString(values[6]));
+      }
+      if (values.length == 8) {
+        userName = values[7];
+      }
 
       for (int i = 0; i < repeat; i++) {
         Container c = mock(Container.class);
+        Resources.addTo(used, res);
         when(c.getResource()).thenReturn(res);
         when(c.getPriority()).thenReturn(pri);
         RMContainerImpl rmc = mock(RMContainerImpl.class);
         when(rmc.getAllocatedNode()).thenReturn(host);
-        when(rmc.getNodeLabelExpression()).thenReturn(exp);
+        when(rmc.getNodeLabelExpression()).thenReturn(label);
         when(rmc.getAllocatedResource()).thenReturn(res);
         when(rmc.getContainer()).thenReturn(c);
         when(rmc.getApplicationAttemptId()).thenReturn(attemptId);
         when(rmc.getQueueName()).thenReturn(queueName);
-        final ContainerId cId = ContainerId.newContainerId(attemptId, containerId);
-        when(rmc.getContainerId()).thenReturn(
-            cId);
+        final ContainerId cId = ContainerId.newContainerId(attemptId,
+            containerId);
+        when(rmc.getContainerId()).thenReturn(cId);
         doAnswer(new Answer<Integer>() {
           @Override
           public Integer answer(InvocationOnMock invocation) throws Throwable {
-            return cId.compareTo(((RMContainer) invocation.getArguments()[0])
-                .getContainerId());
+            return cId.compareTo(
+                ((RMContainer) invocation.getArguments()[0]).getContainerId());
           }
         }).when(rmc).compareTo(any(RMContainer.class));
 
         if (containerId == 1) {
           when(rmc.isAMContainer()).thenReturn(true);
+          when(app.getAMResource(label)).thenReturn(res);
         }
 
         if (reserved) {
@@ -237,25 +253,44 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
 
         // If this is a non-exclusive allocation
         String partition = null;
-        if (exp.isEmpty()
+        if (label.isEmpty()
             && !(partition = nodeIdToSchedulerNodes.get(host).getPartition())
-            .isEmpty()) {
+                .isEmpty()) {
           LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName);
-          Map<String, TreeSet<RMContainer>> ignoreExclusivityContainers =
-              queue.getIgnoreExclusivityRMContainers();
+          Map<String, TreeSet<RMContainer>> ignoreExclusivityContainers = queue
+              .getIgnoreExclusivityRMContainers();
           if (!ignoreExclusivityContainers.containsKey(partition)) {
             ignoreExclusivityContainers.put(partition,
                 new TreeSet<RMContainer>());
           }
           ignoreExclusivityContainers.get(partition).add(rmc);
         }
-        LOG.debug("add container to app=" + attemptId + " res=" + res
-            + " node=" + host + " nodeLabelExpression=" + exp + " partition="
+        LOG.debug("add container to app=" + attemptId + " res=" + res + " node="
+            + host + " nodeLabelExpression=" + label + " partition="
             + partition);
 
         containerId++;
       }
 
+      // Some more app specific aggregated data can be better filled here.
+      when(app.getPriority()).thenReturn(pri);
+      when(app.getUser()).thenReturn(userName);
+      when(app.getCurrentConsumption()).thenReturn(used);
+      when(app.getCurrentReservation())
+          .thenReturn(Resources.createResource(0, 0));
+
+      Map<String, Resource> pendingForDefaultPartition =
+          new HashMap<String, Resource>();
+      // Add for default partition for now.
+      pendingForDefaultPartition.put(label, pending);
+      when(app.getTotalPendingRequestsPerPartition())
+          .thenReturn(pendingForDefaultPartition);
+
+      // need to set used resource in resource usage as well
+      ResourceUsage ru = new ResourceUsage();
+      ru.setUsed(label, used);
+      when(app.getAppAttemptResourceUsage()).thenReturn(ru);
+
       start = end + 1;
     }
   }
@@ -271,6 +306,8 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
    */
   private void mockApplications(String appsConfig) {
     int id = 1;
+    HashMap<String, HashSet<String>> userMap = new HashMap<String, HashSet<String>>();
+    LeafQueue queue = null;
     for (String a : appsConfig.split(";")) {
       String[] strs = a.split("\t");
       String queueName = strs[0];
@@ -279,24 +316,49 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
       List<RMContainer> liveContainers = new ArrayList<RMContainer>();
       List<RMContainer> reservedContainers = new ArrayList<RMContainer>();
       ApplicationId appId = ApplicationId.newInstance(0L, id);
-      ApplicationAttemptId appAttemptId = ApplicationAttemptId.newInstance(appId, 1);
+      ApplicationAttemptId appAttemptId = ApplicationAttemptId
+          .newInstance(appId, 1);
 
-      mockContainers(strs[1], appAttemptId, queueName, reservedContainers,
+      FiCaSchedulerApp app = mock(FiCaSchedulerApp.class);
+      when(app.getAMResource(anyString()))
+          .thenReturn(Resources.createResource(0, 0));
+      mockContainers(strs[1], app, appAttemptId, queueName, reservedContainers,
           liveContainers);
+      LOG.debug("Application mock: queue: " + queueName + ", appId:" + appId);
 
-      FiCaSchedulerApp app = mock(FiCaSchedulerApp.class);
       when(app.getLiveContainers()).thenReturn(liveContainers);
       when(app.getReservedContainers()).thenReturn(reservedContainers);
       when(app.getApplicationAttemptId()).thenReturn(appAttemptId);
       when(app.getApplicationId()).thenReturn(appId);
-      when(app.getPriority()).thenReturn(Priority.newInstance(0));
 
       // add to LeafQueue
-      LeafQueue queue = (LeafQueue) nameToCSQueues.get(queueName);
+      queue = (LeafQueue) nameToCSQueues.get(queueName);
       queue.getApplications().add(app);
+      queue.getAllApplications().add(app);
 
+      HashSet<String> users = userMap.get(queueName);
+      if (null == users) {
+        users = new HashSet<String>();
+        userMap.put(queueName, users);
+      }
+
+      users.add(app.getUser());
       id++;
     }
+
+    for (String queueName : userMap.keySet()) {
+      queue = (LeafQueue) nameToCSQueues.get(queueName);
+      // Currently we have user-limit test support only for default label.
+      Resource totResoucePerPartition = partitionToResource.get("");
+      Resource capacity = Resources.multiply(totResoucePerPartition,
+          queue.getQueueCapacities().getAbsoluteCapacity());
+      HashSet<String> users = userMap.get(queue.getQueueName());
+      Resource userLimit = Resources.divideAndCeil(rc, capacity, users.size());
+      for (String user : users) {
+        when(queue.getUserLimitPerUser(eq(user), any(Resource.class),
+            anyString())).thenReturn(userLimit);
+      }
+    }
   }
 
   private void addContainerToSchedulerNode(NodeId nodeId, RMContainer container,
@@ -430,10 +492,18 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
             new Comparator<FiCaSchedulerApp>() {
               @Override
               public int compare(FiCaSchedulerApp a1, FiCaSchedulerApp a2) {
-                return a1.getApplicationId().compareTo(a2.getApplicationId());
+                if (a1.getPriority() != null
+                    && !a1.getPriority().equals(a2.getPriority())) {
+                  return a1.getPriority().compareTo(a2.getPriority());
+                }
+
+                int res = a1.getApplicationId()
+                    .compareTo(a2.getApplicationId());
+                return res;
               }
             });
         when(leafQueue.getApplications()).thenReturn(apps);
+        when(leafQueue.getAllApplications()).thenReturn(apps);
         OrderingPolicy<FiCaSchedulerApp> so = mock(OrderingPolicy.class);
         when(so.getPreemptionIterator()).thenAnswer(new Answer() {
           public Object answer(InvocationOnMock invocation) {
@@ -512,10 +582,15 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
       float absUsed = Resources.divide(rc, totResoucePerPartition,
           parseResourceFromString(values[2].trim()), totResoucePerPartition)
           + epsilon;
+      float used = Resources.divide(rc, totResoucePerPartition,
+          parseResourceFromString(values[2].trim()),
+          parseResourceFromString(values[0].trim())) + epsilon;
       Resource pending = parseResourceFromString(values[3].trim());
       qc.setAbsoluteCapacity(partitionName, absGuaranteed);
       qc.setAbsoluteMaximumCapacity(partitionName, absMax);
       qc.setAbsoluteUsedCapacity(partitionName, absUsed);
+      qc.setUsedCapacity(partitionName, used);
+      when(queue.getUsedCapacity()).thenReturn(used);
       ru.setPending(partitionName, pending);
       if (!isParent(queueExprArray, idx)) {
         LeafQueue lq = (LeafQueue) queue;
@@ -530,6 +605,7 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
         reserved = parseResourceFromString(values[4].trim());
         ru.setReserved(partitionName, reserved);
       }
+
       LOG.debug("Setup queue=" + queueName + " partition=" + partitionName
           + " [abs_guaranteed=" + absGuaranteed + ",abs_max=" + absMax
           + ",abs_used" + absUsed + ",pending_resource=" + pending

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java
new file mode 100644
index 0000000..19fb0d2
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java
@@ -0,0 +1,868 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+
+import static org.mockito.Matchers.argThat;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+
+/**
+ * Test class for IntraQueuePreemption scenarios.
+ */
+public class TestProportionalCapacityPreemptionPolicyIntraQueue
+    extends
+      ProportionalCapacityPreemptionPolicyMockFramework {
+  @Before
+  public void setup() {
+    super.setup();
+    conf.setBoolean(
+        CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED, true);
+    policy = new ProportionalCapacityPreemptionPolicy(rmContext, cs, mClock);
+  }
+
+  @Test
+  public void testSimpleIntraQueuePreemption() throws IOException {
+    /**
+     * The simplest test preemption, Queue structure is:
+     *
+     * <pre>
+     *       root
+     *     /  | | \
+     *    a  b  c  d
+     * </pre>
+     *
+     * Guaranteed resources of a/b/c/d are 11:40:20:29. Total cluster
+     * resource = 100.
+     * Scenario:
+     * Queue B has a few running apps, and two high priority apps have demand.
+     * Apps running at low priority (4) will have a few of their resources
+     * preempted to meet that demand.
+     */
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 80 120 0]);" + // root
+            "-a(=[11 100 11 50 0]);" + // a
+            "-b(=[40 100 38 60 0]);" + // b
+            "-c(=[20 100 10 10 0]);" + // c
+            "-d(=[29 100 20 0 0])"; // d
+
+    String appsConfig =
+        // queueName\t(priority,resource,host,expression,#repeat,reserved,
+        // pending)
+        "a\t" // app1 in a
+            + "(1,1,n1,,6,false,25);" + // app1 a
+            "a\t" // app2 in a
+            + "(1,1,n1,,5,false,25);" + // app2 a
+            "b\t" // app3 in b
+            + "(4,1,n1,,34,false,20);" + // app3 b
+            "b\t" // app4 in b
+            + "(4,1,n1,,2,false,10);" + // app4 b
+            "b\t" // app5 in b
+            + "(5,1,n1,,1,false,10);" + // app5 b
+            "b\t" // app6 in b
+            + "(6,1,n1,,1,false,10);" + // app6 in b
+            "c\t" // app7 in c
+            + "(1,1,n1,,10,false,10);" + "d\t" // app8 in d
+            + "(1,1,n1,,20,false,0)";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // For queue B, app3 and app4 were of lower priority. Hence take 8
+    // containers from them by hitting the intraQueuePreemptionDemand of 20%.
+    verify(mDisp, times(1)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(4))));
+    verify(mDisp, times(7)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(3))));
+  }
+
+  @Test
+  public void testNoPreemptionForSamePriorityApps() throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *     /  | | \
+     *    a  b  c  d
+     * </pre>
+     *
+     * Guaranteed resource of a/b/c/d are 10:40:20:30 Total cluster resource =
+     * 100
+     * Scenario: In queue A/B, all apps are running at same priority. However
+     * there are many demands also from these apps. Since all apps are at same
+     * priority, preemption should not occur here.
+     */
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 80 120 0]);" + // root
+            "-a(=[10 100 10 50 0]);" + // a
+            "-b(=[40 100 40 60 0]);" + // b
+            "-c(=[20 100 10 10 0]);" + // c
+            "-d(=[30 100 20 0 0])"; // d
+
+    String appsConfig =
+        // queueName\t(priority,resource,host,expression,#repeat,reserved,
+        // pending)
+        "a\t" // app1 in a
+            + "(1,1,n1,,6,false,25);" + // app1 a
+            "a\t" // app2 in a
+            + "(1,1,n1,,5,false,25);" + // app2 a
+            "b\t" // app3 in b
+            + "(1,1,n1,,34,false,20);" + // app3 b
+            "b\t" // app4 in b
+            + "(1,1,n1,,2,false,10);" + // app4 b
+            "b\t" // app5 in b
+            + "(1,1,n1,,1,false,20);" + // app5 b
+            "b\t" // app6 in b
+            + "(1,1,n1,,1,false,10);" + // app6 in b
+            "c\t" // app7 in c
+            + "(1,1,n1,,10,false,10);" + "d\t" // app8 in d
+            + "(1,1,n1,,20,false,0)";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // For queue B, none of the apps should be preempted.
+    verify(mDisp, times(0)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(4))));
+    verify(mDisp, times(0)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(3))));
+    verify(mDisp, times(0)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(5))));
+    verify(mDisp, times(0)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(6))));
+  }
+
+  @Test
+  public void testNoPreemptionWhenQueueIsUnderCapacityLimit()
+      throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *      /   \
+     *     a     b
+     * </pre>
+     *
+     * Scenario:
+     * Guaranteed resource of a/b are 40:60. Total cluster resource = 100.
+     * By default, the used capacity limit for intra-queue preemption is 50%.
+     * Verify that there won't be any preemption since used capacity is under
+     * 50% for queue a/b, even though high priority apps there have demand.
+     */
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 35 80 0]);" + // root
+            "-a(=[40 100 10 50 0]);" + // a
+            "-b(=[60 100 25 30 0])"; // b
+
+    String appsConfig =
+    // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
+        "a\t" // app1 in a
+            + "(1,1,n1,,5,false,25);" + // app1 a
+            "a\t" // app2 in a
+            + "(2,1,n1,,5,false,25);" + // app2 a
+            "b\t" // app3 in b
+            + "(4,1,n1,,40,false,20);" + // app3 b
+            "b\t" // app4 in b
+            + "(6,1,n1,,5,false,20)";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // For queue A/B, none of the apps should be preempted as used capacity
+    // is under 50%.
+    verify(mDisp, times(0)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(mDisp, times(0)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+    verify(mDisp, times(0)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(3))));
+    verify(mDisp, times(0)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(4))));
+  }
+
+  @Test
+  public void testLimitPreemptionWithMaxIntraQueuePreemptableLimit()
+      throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *      /   \
+     *     a     b
+     * </pre>
+     *
+     * Guaranteed resource of a/b are 40:60. Total cluster resource = 100.
+     * maxIntraQueuePreemptableLimit by default is 50%. This test is to verify
+     * that the maximum preemption should occur up to 50%, even though demand
+     * is more.
+     */
+
+    // Set max preemption limit as 50%.
+    conf.setFloat(CapacitySchedulerConfiguration.
+        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+        (float) 0.5);
+
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 55 170 0]);" + // root
+            "-a(=[40 100 10 50 0]);" + // a
+            "-b(=[60 100 45 120 0])"; // b
+
+    String appsConfig =
+    // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
+        "a\t" // app1 in a
+            + "(1,1,n1,,5,false,25);" + // app1 a
+            "a\t" // app2 in a
+            + "(2,1,n1,,5,false,25);" + // app2 a
+            "b\t" // app3 in b
+            + "(4,1,n1,,40,false,20);" + // app3 b
+            "b\t" // app4 in b
+            + "(6,1,n1,,5,false,100)";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // For queueB, even though app4 needs 100 resources, only 30 resources
+    // were preempted. (max is 50% of guaranteed cap of any queue
+    // "maxIntraQueuePreemptable")
+    verify(mDisp, times(30)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(3))));
+  }
+
+  @Test
+  public void testLimitPreemptionWithTotalPreemptedResourceAllowed()
+      throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *      /   \
+     *     a     b
+     * </pre>
+     *
+     * Scenario:
+     * Guaranteed resource of a/b are 40:60 Total cluster resource = 100
+     * totalPreemption allowed is 10%. This test is to verify that only
+     * 10% is preempted.
+     */
+
+    // report "ideal" preempt as 10%. Ensure preemption happens only for 10%
+    conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
+        (float) 0.1);
+
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 55 170 0]);" + // root
+            "-a(=[40 100 10 50 0]);" + // a
+            "-b(=[60 100 45 120 0])"; // b
+
+    String appsConfig =
+    // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
+        "a\t" // app1 in a
+            + "(1,1,n1,,5,false,25);" + // app1 a
+            "a\t" // app2 in a
+            + "(2,1,n1,,5,false,25);" + // app2 a
+            "b\t" // app3 in b
+            + "(4,1,n1,,40,false,20);" + // app3 b
+            "b\t" // app4 in b
+            + "(6,1,n1,,5,false,100)";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // For queue B, even though app4 needs 100 resources, only 10 resources
+    // were preempted. This is the 10% limit of TOTAL_PREEMPTION_PER_ROUND.
+    verify(mDisp, times(10)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(3))));
+  }
+
+  @Test
+  public void testAlreadySelectedContainerFromInterQueuePreemption()
+      throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *      /   \
+     *     a     b
+     * </pre>
+     *
+     * Scenario:
+     * Guaranteed resource of a/b are 60:40. Total cluster resource = 100.
+     * QueueB is under utilized and QueueA has to release 9 containers here.
+     * However within queue A, a high priority app also has a demand for 20.
+     * So 11 additional containers will be preempted, making a total of 20.
+     */
+
+    // Set max preemption limit as 50%.
+    conf.setFloat(CapacitySchedulerConfiguration.
+        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+        (float) 0.5);
+
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 95 170 0]);" + // root
+            "-a(=[60 100 70 50 0]);" + // a
+            "-b(=[40 100 25 120 0])"; // b
+
+    String appsConfig =
+    // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
+        "a\t" // app1 in a
+            + "(1,1,n1,,50,false,15);" + // app1 a
+            "a\t" // app2 in a
+            + "(2,1,n1,,20,false,20);" + // app2 a
+            "b\t" // app3 in b
+            + "(4,1,n1,,20,false,20);" + // app3 b
+            "b\t" // app4 in b
+            + "(4,1,n1,,5,false,100)";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // As per intra queue preemption algorithm, 20 more containers were needed
+    // for app2 (in queue a). Inter queue pre-emption had already preselected 9
+    // containers and hence preempted only 11 more.
+    verify(mDisp, times(20)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(mDisp, never()).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+  }
+
+  @Test
+  public void testSkipAMContainersInInterQueuePreemption() throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *      /   \
+     *     a     b
+     * </pre>
+     *
+     * Scenario:
+     * Guaranteed resource of a/b are 60:40 Total cluster resource = 100
+     * While preempting containers during intra-queue preemption, AM containers
+     * will be spared for now. Verify the same.
+     */
+
+    // Set max preemption limit as 50%.
+    conf.setFloat(CapacitySchedulerConfiguration.
+        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+        (float) 0.5);
+
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 100 170 0]);" + // root
+            "-a(=[60 100 60 50 0]);" + // a
+            "-b(=[40 100 40 120 0])"; // b
+
+    String appsConfig =
+    // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
+        "a\t" // app1 in a
+            + "(1,1,n1,,30,false,10);" + "a\t" // app2 in a
+            + "(1,1,n1,,10,false,20);" + "a\t" // app3 in a
+            + "(2,1,n1,,20,false,20);" + "b\t" // app4 in b
+            + "(4,1,n1,,20,false,20);" + "b\t" // app5 in b
+            + "(4,1,n1,,20,false,100)";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // Ensure that only 9 containers are preempted from app2 (sparing 1 AM)
+    verify(mDisp, times(11)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(mDisp, times(9)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+  }
+
+  @Test
+  public void testSkipAMContainersInInterQueuePreemptionSingleApp()
+      throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *      /   \
+     *     a     b
+     * </pre>
+     *
+     * Scenario:
+     * Guaranteed resource of a/b are 50:50 Total cluster resource = 100
+     * Spare the AM container of a lower priority app during its preemption
+     * cycle. Even though there is more demand and no other low priority
+     * apps are present, the AM container still needs to be spared.
+     */
+
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 100 170 0]);" + // root
+            "-a(=[50 100 50 50 0]);" + // a
+            "-b(=[50 100 50 120 0])"; // b
+
+    String appsConfig =
+    // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
+        "a\t" // app1 in a
+            + "(1,1,n1,,10,false,10);" + "a\t" // app2 in a
+            + "(2,1,n1,,40,false,10);" + "b\t" // app3 in b
+            + "(4,1,n1,,20,false,20);" + "b\t" // app4 in b
+            + "(4,1,n1,,30,false,100)";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // Make sure that app1's Am container is spared. Only 9/10 is preempted.
+    verify(mDisp, times(9)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(mDisp, never()).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+  }
+
+  @Test
+  public void testNoPreemptionForSingleApp() throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *      /   \
+     *     a     b
+     * </pre>
+     *
+     * Scenario:
+     * Guaranteed resource of a/b are 60:40 Total cluster resource = 100
+     * Only one app is running in queue. And it has more demand but no
+     * resource are available in queue. Preemption must not occur here.
+     */
+
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 20 50 0]);" + // root
+            "-a(=[60 100 20 50 0]);" + // a
+            "-b(=[40 100 0 0 0])"; // b
+
+    String appsConfig =
+    // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
+        "a\t" // app1 in a
+            + "(4,1,n1,,20,false,50)";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // Ensure there are 0 preemptions since only one app is running in queue.
+    verify(mDisp, times(0)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+  }
+
+  @Test
+  public void testOverutilizedQueueResourceWithInterQueuePreemption()
+      throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *      /   \
+     *     a     b
+     * </pre>
+     * Scenario:
+     * Guaranteed resource of a/b are 20:80 Total cluster resource = 100
+     * QueueB is under utilized and 20 resource will be released from queueA.
+     * 10 containers will also be selected for intra-queue preemption, but they
+     * will be pre-selected.
+     */
+
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 100 70 0]);" + // root
+            "-a(=[20 100 100 30 0]);" + // a
+            "-b(=[80 100 0 20 0])"; // b
+
+    String appsConfig =
+    // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
+        "a\t" // app1 in a
+            + "(1,1,n1,,50,false,0);" + "a\t" // app2 in a
+            + "(3,1,n1,,50,false,30);" + "b\t" // app3 in b
+            + "(4,1,n1,,0,false,20)";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // Complete demand request from QueueB for 20 resource must be preempted.
+    verify(mDisp, times(20)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    verify(mDisp, times(0)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(2))));
+    verify(mDisp, times(0)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(3))));
+  }
+
+  @Test
+  public void testNodePartitionIntraQueuePreemption() throws IOException {
+    /**
+     * The simplest test of node label, Queue structure is:
+     *
+     * <pre>
+     *       root
+     *       /  \
+     *      a    b
+     * </pre>
+     *
+     * Scenario:
+     * Both a/b can access x, and guaranteed capacity of them is 50:50. Two
+     * nodes, n1 has 100 x, n2 has 100 NO_LABEL. 5 applications in the cluster,
+     * app1/app2/app3 in a, and app4/app5 in b. app1 uses 50 x, app3 uses 50
+     * NO_LABEL, app4 uses 50 x, app5 uses 50 NO_LABEL. app2 (priority 2) uses
+     * nothing yet and has 20 pending resource for x.
+     *
+     * After preemption, it should preempt 20 from app1
+     */
+
+    // Set max preemption limit as 50%.
+    conf.setFloat(CapacitySchedulerConfiguration.
+        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+        (float) 0.5);
+
+    String labelsConfig = "=100,true;" + // default partition
+        "x=100,true"; // partition=x
+    String nodesConfig = "n1=x;" + // n1 has partition=x
+        "n2="; // n2 is default partition
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100],x=[100 100 100 100]);" + // root
+            "-a(=[50 100 50 50],x=[50 100 50 50]);" + // a
+            "-b(=[50 100 50 50],x=[50 100 50 50])"; // b
+    String appsConfig =
+        // queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t" // app1 in a
+            + "(1,1,n1,x,50,false,10);" + // 50 * x in n1
+            "a\t" // app2 in a
+            + "(2,1,n1,x,0,false,20);" + // 0 * x in n1
+            "a\t" // app3 in a
+            + "(1,1,n2,,50,false);" + // 50 default in n2
+            "b\t" // app4 in b
+            + "(1,1,n1,x,50,false);" + // 50 * x in n1
+            "b\t" // app5 in b
+            + "(1,1,n2,,50,false)"; // 50 default in n2
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // 20 preempted from app1
+    verify(mDisp, times(20))
+        .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
+    verify(mDisp, never())
+        .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
+    verify(mDisp, never())
+        .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(3))));
+  }
+
+  @Test
+  public void testComplexIntraQueuePreemption() throws IOException {
+    /**
+     * The complex test preemption, Queue structure is:
+     *
+     * <pre>
+     *       root
+     *     /  | | \
+     *    a  b  c  d
+     * </pre>
+     *
+     * Scenario:
+     * Guaranteed resource of a/b/c/d are 10:40:20:30 Total cluster resource =
+     * 100
+     * All queues are under their capacity, but within each queue there are
+     * many under served applications.
+     */
+
+    // report "ideal" preempt as 50%. Ensure preemption happens only for 50%
+    conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
+        (float) 0.5);
+    // Set max preemption limit as 50%.
+    conf.setFloat(CapacitySchedulerConfiguration.
+        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+        (float) 0.5);
+
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 75 130 0]);" + // root
+            "-a(=[10 100 5 50 0]);" + // a
+            "-b(=[40 100 35 60 0]);" + // b
+            "-c(=[20 100 10 10 0]);" + // c
+            "-d(=[30 100 25 10 0])"; // d
+
+    String appsConfig =
+        // queueName\t(priority,resource,host,expression,#repeat,reserved,
+        // pending)
+        "a\t" // app1 in a
+            + "(1,1,n1,,5,false,25);" + // app1 a
+            "a\t"
+            + "(4,1,n1,,0,false,25);" + // app2 a
+            "a\t"
+            + "(5,1,n1,,0,false,2);" + // app3 a
+            "b\t"
+            + "(3,1,n1,,5,false,20);" + // app4 b
+            "b\t"
+            + "(4,1,n1,,15,false,10);" + // app5 b
+            "b\t"
+            + "(4,1,n1,,10,false,10);" + // app6 b
+            "b\t"
+            + "(5,1,n1,,3,false,5);" + // app7 b
+            "b\t"
+            + "(5,1,n1,,0,false,2);" + // app8 b
+            "b\t"
+            + "(6,1,n1,,2,false,10);" + // app9 in b
+            "c\t"
+            + "(1,1,n1,,8,false,10);" + // app10 in c
+            "c\t"
+            + "(1,1,n1,,2,false,5);" + // app11 in c
+            "c\t"
+            + "(2,1,n1,,0,false,3);" + "d\t" // app12 in c
+            + "(2,1,n1,,25,false,0);" + "d\t" // app13 in d
+            + "(1,1,n1,,0,false,20)";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // High priority app in queueA has 30 resource demand. But low priority
+    // app has only 5 resource. Hence preempt 4 here sparing AM.
+    verify(mDisp, times(4)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(1))));
+    // Multiple high priority apps have a demand of 17. This will be preempted
+    // from another set of low priority apps.
+    verify(mDisp, times(4)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(4))));
+    verify(mDisp, times(9)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(6))));
+    verify(mDisp, times(4)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(5))));
+    // Only 3 resources will be freed in this round for queue C as we
+    // are trying to save AM container.
+    verify(mDisp, times(2)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(10))));
+    verify(mDisp, times(1)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(11))));
+  }
+
+  @Test
+  public void testIntraQueuePreemptionWithTwoUsers()
+      throws IOException {
+    /**
+     * Queue structure is:
+     *
+     * <pre>
+     *       root
+     *      /   \
+     *     a     b
+     * </pre>
+     *
+     * Scenario:
+     * Guaranteed resource of a/b are 60:40 Total cluster resource = 100
+     * Consider 2 users in a queue, assume minimum user limit factor is 50%.
+     * Hence in queueB of 40, each user has a quota of 20. High priority app4
+     * has a demand of 30 and is already using 5. Adhering to user limit, only
+     * 15 more must be preempted. For the same user, 20 would be preempted.
+     */
+
+    // Set max preemption limit as 50%.
+    conf.setFloat(CapacitySchedulerConfiguration.
+        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+        (float) 0.5);
+
+    String labelsConfig = "=100,true;";
+    String nodesConfig = // n1 has no label
+        "n1= res=100";
+    String queuesConfig =
+        // guaranteed,max,used,pending,reserved
+        "root(=[100 100 55 170 0]);" + // root
+            "-a(=[60 100 10 50 0]);" + // a
+            "-b(=[40 100 40 120 0])"; // b
+
+    String appsConfig =
+    // queueName\t(priority,resource,host,expression,#repeat,reserved,pending)
+        "a\t" // app1 in a
+            + "(1,1,n1,,5,false,25);" + // app1 a
+            "a\t" // app2 in a
+            + "(2,1,n1,,5,false,25);" + // app2 a
+            "b\t" // app3 in b
+            + "(4,1,n1,,35,false,20,user1);" + // app3 b
+            "b\t" // app4 in b
+            + "(6,1,n1,,5,false,30,user2)";
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // Considering user-limit of 50% since only 2 users are there, only preempt
+    // 15 more (5 is already running) even though demand is for 30.
+    verify(mDisp, times(15)).handle(argThat(
+        new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
+            getAppAttemptId(3))));
+  }
+
+  @Test
+  public void testComplexNodePartitionIntraQueuePreemption()
+      throws IOException {
+    /**
+     * A complex test of node label, Queue structure is:
+     *
+     * <pre>
+     *       root
+     *       /  \
+     *      a    b
+     * </pre>
+     *
+     * Scenario:
+     * Both a/b can access x, and guaranteed capacity of them is 50:50. Two
+     * nodes, n1 has 100 x, n2 has 100 NO_LABEL. 9 applications in the cluster,
+     * app1-app4 in a, and app5-app9 in b.
+     *
+     */
+
+    // Set max preemption limit as 50%.
+    conf.setFloat(CapacitySchedulerConfiguration.
+        INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
+        (float) 0.5);
+
+    String labelsConfig = "=100,true;" + // default partition
+        "x=100,true"; // partition=x
+    String nodesConfig = "n1=x;" + // n1 has partition=x
+        "n2="; // n2 is default partition
+    String queuesConfig =
+        // guaranteed,max,used,pending
+        "root(=[100 100 100 100],x=[100 100 100 100]);" + // root
+            "-a(=[50 100 50 50],x=[50 100 40 50]);" + // a
+            "-b(=[50 100 35 50],x=[50 100 50 50])"; // b
+    String appsConfig =
+        // queueName\t(priority,resource,host,expression,#repeat,reserved)
+        "a\t" // app1 in a
+            + "(1,1,n1,x,35,false,10);" + // 35 * x in n1
+            "a\t" // app2 in a
+            + "(1,1,n1,x,5,false,10);" + // 5 * x in n1
+            "a\t" // app3 in a
+            + "(2,1,n1,x,0,false,20);" + // 0 * x in n1
+            "a\t" // app4 in a
+            + "(1,1,n2,,50,false);" + // 50 default in n2
+            "b\t" // app5 in b
+            + "(1,1,n1,x,50,false);" + // 50 * x in n1
+            "b\t" // app6 in b
+            + "(1,1,n2,,25,false);" + // 25 * default in n2
+            "b\t" // app7 in b
+            + "(1,1,n2,,3,false);" + // 3 * default in n2
+            "b\t" // app8 in b
+            + "(1,1,n2,,2,false);" + // 2 * default in n2
+            "b\t" // app9 in b
+            + "(5,1,n2,,5,false,30)"; // 5 * default in n2
+
+    buildEnv(labelsConfig, nodesConfig, queuesConfig, appsConfig);
+    policy.editSchedule();
+
+    // Label X: app3 has demand of 20 for label X. Hence app2 will lose
+    // 4 (sparing AM) and 16 more from app1 till preemption limit is met.
+    verify(mDisp, times(16))
+        .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(1))));
+    verify(mDisp, times(4))
+        .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(2))));
+
+    // Default Label:For a demand of 30, preempt from all low priority
+    // apps of default label. 25 will be preempted as preemption limit is
+    // met.
+    verify(mDisp, times(1))
+        .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(8))));
+    verify(mDisp, times(2))
+        .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(7))));
+    verify(mDisp, times(22))
+        .handle(argThat(new IsPreemptionRequestFor(getAppAttemptId(6))));
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[2/2] hadoop git commit: YARN-2009. CapacityScheduler: Add intra-queue preemption for app priority support. (Sunil G via wangda)

Posted by ep...@apache.org.
YARN-2009. CapacityScheduler: Add intra-queue preemption for app priority support. (Sunil G via wangda)

(cherry-picked from commit 90dd3a8148468ac37a3f2173ad8d45e38bfcb0c9)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/6a18ae84
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/6a18ae84
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/6a18ae84

Branch: refs/heads/branch-2.8
Commit: 6a18ae849faa9becccd66b382296213a6c08842e
Parents: 01b50b3
Author: Wangda Tan <wa...@apache.org>
Authored: Mon Oct 31 15:18:31 2016 -0700
Committer: Eric Payne <ep...@apache.org>
Committed: Mon Dec 12 23:13:08 2016 +0000

----------------------------------------------------------------------
 .../AbstractPreemptableResourceCalculator.java  | 244 ++++++
 .../capacity/AbstractPreemptionEntity.java      |  98 +++
 .../CapacitySchedulerPreemptionContext.java     |  14 +
 .../CapacitySchedulerPreemptionUtils.java       | 119 ++-
 .../capacity/FifoCandidatesSelector.java        | 129 +--
 .../FifoIntraQueuePreemptionPlugin.java         | 459 ++++++++++
 .../capacity/IntraQueueCandidatesSelector.java  | 238 +++++
 .../IntraQueuePreemptionComputePlugin.java      |  39 +
 .../capacity/PreemptableResourceCalculator.java | 183 +---
 .../capacity/PreemptionCandidatesSelector.java  |  32 +-
 .../ProportionalCapacityPreemptionPolicy.java   |  86 +-
 .../monitor/capacity/TempAppPerPartition.java   | 101 +++
 .../monitor/capacity/TempQueuePerPartition.java | 142 ++-
 .../CapacitySchedulerConfiguration.java         |  31 +
 .../scheduler/capacity/LeafQueue.java           |  40 +-
 .../scheduler/common/fica/FiCaSchedulerApp.java |  18 +
 ...alCapacityPreemptionPolicyMockFramework.java | 126 ++-
 ...ionalCapacityPreemptionPolicyIntraQueue.java | 868 +++++++++++++++++++
 18 files changed, 2552 insertions(+), 415 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
new file mode 100644
index 0000000..8255a30
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptableResourceCalculator.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.PriorityQueue;
+
+/**
+ * Calculate how much resources need to be preempted for each queue,
+ * will be used by {@link PreemptionCandidatesSelector}.
+ */
+public class AbstractPreemptableResourceCalculator {
+
+  protected final CapacitySchedulerPreemptionContext context;
+  protected final ResourceCalculator rc;
+  private boolean isReservedPreemptionCandidatesSelector;
+
+  static class TQComparator implements Comparator<TempQueuePerPartition> {
+    private ResourceCalculator rc;
+    private Resource clusterRes;
+
+    TQComparator(ResourceCalculator rc, Resource clusterRes) {
+      this.rc = rc;
+      this.clusterRes = clusterRes;
+    }
+
+    @Override
+    public int compare(TempQueuePerPartition tq1, TempQueuePerPartition tq2) {
+      if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) {
+        return -1;
+      }
+      if (getIdealPctOfGuaranteed(tq1) > getIdealPctOfGuaranteed(tq2)) {
+        return 1;
+      }
+      return 0;
+    }
+
+    // Calculates idealAssigned / guaranteed
+    // TempQueues with 0 guarantees are always considered the most over
+    // capacity and therefore considered last for resources.
+    private double getIdealPctOfGuaranteed(TempQueuePerPartition q) {
+      double pctOver = Integer.MAX_VALUE;
+      if (q != null && Resources.greaterThan(rc, clusterRes, q.getGuaranteed(),
+          Resources.none())) {
+        pctOver = Resources.divide(rc, clusterRes, q.idealAssigned,
+            q.getGuaranteed());
+      }
+      return (pctOver);
+    }
+  }
+
+  /**
+   * AbstractPreemptableResourceCalculator constructor.
+   *
+   * @param preemptionContext context
+   * @param isReservedPreemptionCandidatesSelector
+   *          this will be set by different implementation of candidate
+   *          selectors, please refer to TempQueuePerPartition#offer for
+   *          details.
+   */
+  public AbstractPreemptableResourceCalculator(
+      CapacitySchedulerPreemptionContext preemptionContext,
+      boolean isReservedPreemptionCandidatesSelector) {
+    context = preemptionContext;
+    rc = preemptionContext.getResourceCalculator();
+    this.isReservedPreemptionCandidatesSelector =
+        isReservedPreemptionCandidatesSelector;
+  }
+
+  /**
+   * Given a set of queues compute the fix-point distribution of unassigned
+   * resources among them. As pending request of a queue are exhausted, the
+   * queue is removed from the set and remaining capacity redistributed among
+   * remaining queues. The distribution is weighted based on guaranteed
+   * capacity, unless asked to ignoreGuarantee, in which case resources are
+   * distributed uniformly.
+   *
+   * @param totGuarant
+   *          total guaranteed resource
+   * @param qAlloc
+   *          List of child queues
+   * @param unassigned
+   *          Unassigned resource per queue
+   * @param ignoreGuarantee
+   *          ignore guarantee per queue.
+   */
+  protected void computeFixpointAllocation(Resource totGuarant,
+      Collection<TempQueuePerPartition> qAlloc, Resource unassigned,
+      boolean ignoreGuarantee) {
+    // Prior to assigning the unused resources, process each queue as follows:
+    // If current > guaranteed, idealAssigned = guaranteed + untouchable extra
+    // Else idealAssigned = current;
+    // Subtract idealAssigned resources from unassigned.
+    // If the queue has all of its needs met (that is, if
+    // idealAssigned >= current + pending), remove it from consideration.
+    // Sort queues from most under-guaranteed to most over-guaranteed.
+    TQComparator tqComparator = new TQComparator(rc, totGuarant);
+    PriorityQueue<TempQueuePerPartition> orderedByNeed = new PriorityQueue<>(10,
+        tqComparator);
+    for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext();) {
+      TempQueuePerPartition q = i.next();
+      Resource used = q.getUsed();
+
+      if (Resources.greaterThan(rc, totGuarant, used, q.getGuaranteed())) {
+        q.idealAssigned = Resources.add(q.getGuaranteed(), q.untouchableExtra);
+      } else {
+        q.idealAssigned = Resources.clone(used);
+      }
+      Resources.subtractFrom(unassigned, q.idealAssigned);
+      // If idealAssigned < (used + pending), q still needs more resources,
+      // so add it to the list of underserved queues, ordered by need
+      // (lowest idealAssigned / guaranteed ratio first).
+      Resource curPlusPend = Resources.add(q.getUsed(), q.pending);
+      if (Resources.lessThan(rc, totGuarant, q.idealAssigned, curPlusPend)) {
+        orderedByNeed.add(q);
+      }
+    }
+
+    // assign all cluster resources until no more demand, or no resources are
+    // left
+    while (!orderedByNeed.isEmpty() && Resources.greaterThan(rc, totGuarant,
+        unassigned, Resources.none())) {
+      Resource wQassigned = Resource.newInstance(0, 0);
+      // we compute normalizedGuarantees capacity based on currently active
+      // queues
+      resetCapacity(unassigned, orderedByNeed, ignoreGuarantee);
+
+      // For each underserved queue (or set of queues if multiple are equally
+      // underserved), offer its share of the unassigned resources based on its
+      // normalized guarantee. After the offer, if the queue is not satisfied,
+      // place it back in the ordered list of queues, recalculating its place
+      // in the order of most under-guaranteed to most over-guaranteed. In this
+      // way, the most underserved queue(s) are always given resources first.
+      Collection<TempQueuePerPartition> underserved = getMostUnderservedQueues(
+          orderedByNeed, tqComparator);
+      for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i
+          .hasNext();) {
+        TempQueuePerPartition sub = i.next();
+        Resource wQavail = Resources.multiplyAndNormalizeUp(rc, unassigned,
+            sub.normalizedGuarantee, Resource.newInstance(1, 1));
+        Resource wQidle = sub.offer(wQavail, rc, totGuarant,
+            isReservedPreemptionCandidatesSelector);
+        Resource wQdone = Resources.subtract(wQavail, wQidle);
+
+        if (Resources.greaterThan(rc, totGuarant, wQdone, Resources.none())) {
+          // The queue is still asking for more. Put it back in the priority
+          // queue, recalculating its order based on need.
+          orderedByNeed.add(sub);
+        }
+        Resources.addTo(wQassigned, wQdone);
+      }
+      Resources.subtractFrom(unassigned, wQassigned);
+    }
+
+    // Sometimes it is possible that all queues are properly served, so intra
+    // queue preemption will not try for any preemption. However, there are
+    // chances that within a queue there are some imbalances. Hence make sure
+    // all queues are added to the list.
+    while (!orderedByNeed.isEmpty()) {
+      TempQueuePerPartition q1 = orderedByNeed.remove();
+      context.addPartitionToUnderServedQueues(q1.queueName, q1.partition);
+    }
+  }
+
+  /**
+   * Computes a normalizedGuaranteed capacity based on active queues.
+   *
+   * @param clusterResource
+   *          the total amount of resources in the cluster
+   * @param queues
+   *          the list of queues to consider
+   * @param ignoreGuar
+   *          ignore guarantee.
+   */
+  private void resetCapacity(Resource clusterResource,
+      Collection<TempQueuePerPartition> queues, boolean ignoreGuar) {
+    Resource activeCap = Resource.newInstance(0, 0);
+
+    if (ignoreGuar) {
+      for (TempQueuePerPartition q : queues) {
+        q.normalizedGuarantee = 1.0f / queues.size();
+      }
+    } else {
+      for (TempQueuePerPartition q : queues) {
+        Resources.addTo(activeCap, q.getGuaranteed());
+      }
+      for (TempQueuePerPartition q : queues) {
+        q.normalizedGuarantee = Resources.divide(rc, clusterResource,
+            q.getGuaranteed(), activeCap);
+      }
+    }
+  }
+
+  // Take the most underserved TempQueue (the one on the head). Collect and
+  // return the list of all queues that have the same idealAssigned
+  // percentage of guaranteed.
+  private Collection<TempQueuePerPartition> getMostUnderservedQueues(
+      PriorityQueue<TempQueuePerPartition> orderedByNeed,
+      TQComparator tqComparator) {
+    ArrayList<TempQueuePerPartition> underserved = new ArrayList<>();
+    while (!orderedByNeed.isEmpty()) {
+      TempQueuePerPartition q1 = orderedByNeed.remove();
+      underserved.add(q1);
+
+      // Add underserved queues in order for later uses
+      context.addPartitionToUnderServedQueues(q1.queueName, q1.partition);
+      TempQueuePerPartition q2 = orderedByNeed.peek();
+      // q1's pct of guaranteed won't be larger than q2's. If it's less, then
+      // return what has already been collected. Otherwise, q1's pct of
+      // guaranteed == that of q2, so add q2 to underserved list during the
+      // next pass.
+      if (q2 == null || tqComparator.compare(q1, q2) < 0) {
+        if (null != q2) {
+          context.addPartitionToUnderServedQueues(q2.queueName, q2.partition);
+        }
+        return underserved;
+      }
+    }
+    return underserved;
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java
new file mode 100644
index 0000000..dbd1f0a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/AbstractPreemptionEntity.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+
+/**
+ * Base temporary data structure for an app or a queue. It tracks current
+ * utilization, AM usage, reserved and pending resources, together with the
+ * working values the candidate selection policies derive from them.
+ */
+public class AbstractPreemptionEntity {
+  // Values supplied by the caller from scheduler state. The references are
+  // stored as given; nothing is cloned here.
+  final String queueName;
+
+  protected final Resource current;
+  protected final Resource amUsed;
+  protected final Resource reserved;
+  protected Resource pending;
+
+  // Working values settled and used by the candidate selection policies.
+  // Every one of them starts out as its own zero-valued resource.
+  Resource idealAssigned = Resource.newInstance(0, 0);
+  Resource toBePreempted = Resource.newInstance(0, 0);
+  Resource selected = Resource.newInstance(0, 0);
+  private Resource actuallyToBePreempted = Resource.newInstance(0, 0);
+  private Resource toBePreemptFromOther = Resource.newInstance(0, 0);
+
+  /**
+   * Create an entity from the caller-provided resource figures.
+   *
+   * @param queueName name of the queue this entity belongs to
+   * @param usedPerPartition resource currently used
+   * @param amUsedPerPartition resource used by application masters
+   * @param reserved resource currently reserved
+   * @param pendingPerPartition resource still pending
+   */
+  AbstractPreemptionEntity(String queueName, Resource usedPerPartition,
+      Resource amUsedPerPartition, Resource reserved,
+      Resource pendingPerPartition) {
+    this.queueName = queueName;
+    this.current = usedPerPartition;
+    this.amUsed = amUsedPerPartition;
+    this.reserved = reserved;
+    this.pending = pendingPerPartition;
+  }
+
+  /** @return the resource currently used. */
+  public Resource getUsed() {
+    return this.current;
+  }
+
+  /** @return the used resource with the AM-used resource subtracted. */
+  public Resource getUsedDeductAM() {
+    return Resources.subtract(this.current, this.amUsed);
+  }
+
+  /** @return the resource used by application masters. */
+  public Resource getAMUsed() {
+    return this.amUsed;
+  }
+
+  /** @return the pending resource. */
+  public Resource getPending() {
+    return this.pending;
+  }
+
+  /** @return the reserved resource. */
+  public Resource getReserved() {
+    return this.reserved;
+  }
+
+  /** @return the resource recorded as actually to be preempted. */
+  public Resource getActuallyToBePreempted() {
+    return this.actuallyToBePreempted;
+  }
+
+  /**
+   * Replace the resource recorded as actually to be preempted.
+   *
+   * @param actuallyToBePreempted new value
+   */
+  public void setActuallyToBePreempted(Resource actuallyToBePreempted) {
+    this.actuallyToBePreempted = actuallyToBePreempted;
+  }
+
+  /** @return the resource recorded as to be preempted from others. */
+  public Resource getToBePreemptFromOther() {
+    return this.toBePreemptFromOther;
+  }
+
+  /**
+   * Replace the resource recorded as to be preempted from others.
+   *
+   * @param toBePreemptFromOther new value
+   */
+  public void setToBePreemptFromOther(Resource toBePreemptFromOther) {
+    this.toBePreemptFromOther = toBePreemptFromOther;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
index c52127d..982b1f1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
@@ -19,11 +19,13 @@
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 
 import java.util.Collection;
+import java.util.LinkedHashSet;
 import java.util.Set;
 
 interface CapacitySchedulerPreemptionContext {
@@ -49,4 +51,16 @@ interface CapacitySchedulerPreemptionContext {
   Set<String> getLeafQueueNames();
 
   Set<String> getAllPartitions();
+
+  int getClusterMaxApplicationPriority();
+
+  Resource getPartitionResource(String partition);
+
+  LinkedHashSet<String> getUnderServedQueuesPerPartition(String partition);
+
+  void addPartitionToUnderServedQueues(String queueName, String partition);
+
+  float getMinimumThresholdForIntraQueuePreemption();
+
+  float getMaxAllowableLimitForIntraQueuePreemption();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
index 42d8730..abad2a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
@@ -19,11 +19,14 @@
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -40,7 +43,8 @@ public class CapacitySchedulerPreemptionUtils {
         continue;
       }
 
-      //  Only add resToObtainByPartition when actuallyToBePreempted resource >= 0
+      // Only add resToObtainByPartition when actuallyToBePreempted resource
+      // is greater than 0
       if (Resources.greaterThan(context.getResourceCalculator(),
           clusterResource, qT.getActuallyToBePreempted(), Resources.none())) {
         resToObtainByPartition.put(qT.partition,
@@ -57,8 +61,8 @@ public class CapacitySchedulerPreemptionUtils {
       return false;
     }
 
-    Set<RMContainer> containers = selectedCandidates.get(
-        container.getApplicationAttemptId());
+    Set<RMContainer> containers = selectedCandidates
+        .get(container.getApplicationAttemptId());
     if (containers == null) {
       return false;
     }
@@ -70,8 +74,8 @@ public class CapacitySchedulerPreemptionUtils {
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates) {
     for (Set<RMContainer> containers : selectedCandidates.values()) {
       for (RMContainer c : containers) {
-        SchedulerNode schedulerNode = context.getScheduler().getSchedulerNode(
-            c.getAllocatedNode());
+        SchedulerNode schedulerNode = context.getScheduler()
+            .getSchedulerNode(c.getAllocatedNode());
         if (null == schedulerNode) {
           continue;
         }
@@ -89,8 +93,113 @@ public class CapacitySchedulerPreemptionUtils {
         if (null != res) {
           tq.deductActuallyToBePreempted(context.getResourceCalculator(),
               tq.totalPartitionResource, res);
+          Collection<TempAppPerPartition> tas = tq.getApps();
+          if (null == tas || tas.isEmpty()) {
+            continue;
+          }
+
+          deductPreemptableResourcePerApp(context, tq.totalPartitionResource,
+              tas, res, partition);
         }
       }
     }
   }
+
+  /**
+   * Ask every temp app in <code>tas</code> to deduct <code>res</code> from
+   * its actually-to-be-preempted amount for the given partition.
+   *
+   * NOTE(review): the same full <code>res</code> is handed to each app;
+   * presumably TempAppPerPartition decides how much it really deducts -
+   * confirm against that class.
+   *
+   * @param context preemption context supplying the resource calculator
+   * @param totalPartitionResource total resource of the partition
+   * @param tas temp apps of the queue
+   * @param res resource of the already selected container
+   * @param partition partition the container belongs to
+   */
+  private static void deductPreemptableResourcePerApp(
+      CapacitySchedulerPreemptionContext context,
+      Resource totalPartitionResource, Collection<TempAppPerPartition> tas,
+      Resource res, String partition) {
+    for (TempAppPerPartition tempApp : tas) {
+      ResourceCalculator calculator = context.getResourceCalculator();
+      tempApp.deductActuallyToBePreempted(calculator, totalPartitionResource,
+          res, partition);
+    }
+  }
+
+  /**
+   * Decide whether <code>rmContainer</code> should be preempted and, if so,
+   * record it and charge its allocation against the outstanding demand.
+   *
+   * A container is accepted only when it is not yet part of
+   * <code>preemptMap</code>, the partition of its node still has a positive
+   * amount left in <code>resourceToObtainByPartitions</code>, and its
+   * allocation fits into <code>totalPreemptionAllowed</code>. On acceptance
+   * the allocation is subtracted in place from both the partition entry and
+   * <code>totalPreemptionAllowed</code>; the partition entry is removed from
+   * the map once nothing positive is left, and the container is added to
+   * <code>preemptMap</code>.
+   *
+   * @param rc
+   *          resource calculator
+   * @param context
+   *          preemption context
+   * @param resourceToObtainByPartitions
+   *          map to hold resource to obtain per partition
+   * @param rmContainer
+   *          container
+   * @param clusterResource
+   *          total resource
+   * @param preemptMap
+   *          map to hold preempted containers
+   * @param totalPreemptionAllowed
+   *          total preemption allowed per round
+   * @return <code>true</code> when the container was added to
+   *         <code>preemptMap</code>, <code>false</code> otherwise
+   */
+  public static boolean tryPreemptContainerAndDeductResToObtain(
+      ResourceCalculator rc, CapacitySchedulerPreemptionContext context,
+      Map<String, Resource> resourceToObtainByPartitions,
+      RMContainer rmContainer, Resource clusterResource,
+      Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+      Resource totalPreemptionAllowed) {
+    ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId();
+
+    // A container that was already chosen must not be charged again.
+    if (preemptMapContains(preemptMap, attemptId, rmContainer)) {
+      return false;
+    }
+
+    String nodePartition = getPartitionByNodeId(context,
+        rmContainer.getAllocatedNode());
+    Resource stillToObtain = resourceToObtainByPartitions.get(nodePartition);
+
+    // Nothing is wanted from this partition (any more).
+    if (null == stillToObtain) {
+      return false;
+    }
+    if (!Resources.greaterThan(rc, clusterResource, stillToObtain,
+        Resources.none())) {
+      return false;
+    }
+
+    // The container must fit into what may still be preempted overall.
+    Resource containerSize = rmContainer.getAllocatedResource();
+    if (!Resources.fitsIn(rc, clusterResource, containerSize,
+        totalPreemptionAllowed)) {
+      return false;
+    }
+
+    Resources.subtractFrom(stillToObtain, containerSize);
+    Resources.subtractFrom(totalPreemptionAllowed, containerSize);
+
+    // Drop the partition once its demand has been fully covered.
+    if (Resources.lessThanOrEqual(rc, clusterResource, stillToObtain,
+        Resources.none())) {
+      resourceToObtainByPartitions.remove(nodePartition);
+    }
+
+    addToPreemptMap(preemptMap, attemptId, rmContainer);
+    return true;
+  }
+
+  /**
+   * Look up the partition of the node identified by <code>nodeId</code>.
+   *
+   * @param context preemption context giving access to the scheduler
+   * @param nodeId id of the node hosting a container
+   * @return the partition of the node, or <code>null</code> when the
+   *         scheduler has no node for <code>nodeId</code>
+   */
+  private static String getPartitionByNodeId(
+      CapacitySchedulerPreemptionContext context, NodeId nodeId) {
+    SchedulerNode schedulerNode = context.getScheduler()
+        .getSchedulerNode(nodeId);
+    // Other code in this class already treats a null scheduler node as a
+    // possible result, so guard here as well instead of throwing a
+    // NullPointerException in the middle of candidate selection.
+    if (null == schedulerNode) {
+      return null;
+    }
+    return schedulerNode.getPartition();
+  }
+
+  /**
+   * Add <code>containerToPreempt</code> to the set kept for
+   * <code>appAttemptId</code>, creating and registering that set first when
+   * the attempt has no entry yet.
+   *
+   * @param preemptMap selected containers per application attempt
+   * @param appAttemptId attempt owning the container
+   * @param containerToPreempt container to record
+   */
+  private static void addToPreemptMap(
+      Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+      ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) {
+    Set<RMContainer> existing = preemptMap.get(appAttemptId);
+    if (null != existing) {
+      existing.add(containerToPreempt);
+      return;
+    }
+
+    Set<RMContainer> created = new HashSet<>();
+    preemptMap.put(appAttemptId, created);
+    created.add(containerToPreempt);
+  }
+
+  /**
+   * Tell whether <code>rmContainer</code> is already recorded for
+   * <code>attemptId</code> in <code>preemptMap</code>.
+   *
+   * @param preemptMap selected containers per application attempt
+   * @param attemptId attempt owning the container
+   * @param rmContainer container to look for
+   * @return <code>true</code> only when the attempt has an entry and that
+   *         entry contains the container
+   */
+  private static boolean preemptMapContains(
+      Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
+      ApplicationAttemptId attemptId, RMContainer rmContainer) {
+    Set<RMContainer> selectedForAttempt = preemptMap.get(attemptId);
+    return null != selectedForAttempt
+        && selectedForAttempt.contains(rmContainer);
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
index a8c62fd..33e4afc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
@@ -18,11 +18,9 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
-import com.google.common.annotations.VisibleForTesting;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
@@ -34,9 +32,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEv
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -112,9 +107,11 @@ public class FifoCandidatesSelector
                 // Skip already selected containers
                 continue;
               }
-              boolean preempted = tryPreemptContainerAndDeductResToObtain(
-                  resToObtainByPartition, c, clusterResource, selectedCandidates,
-                  totalPreemptionAllowed);
+              boolean preempted = CapacitySchedulerPreemptionUtils
+                  .tryPreemptContainerAndDeductResToObtain(rc,
+                      preemptionContext, resToObtainByPartition, c,
+                      clusterResource, selectedCandidates,
+                      totalPreemptionAllowed);
               if (!preempted) {
                 continue;
               }
@@ -185,9 +182,10 @@ public class FifoCandidatesSelector
         break;
       }
 
-      boolean preempted =
-          tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
-              clusterResource, preemptMap, totalPreemptionAllowed);
+      boolean preempted = CapacitySchedulerPreemptionUtils
+          .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext,
+              resToObtainByPartition, c, clusterResource, preemptMap,
+              totalPreemptionAllowed);
       if (preempted) {
         Resources.subtractFrom(skippedAMSize, c.getAllocatedResource());
       }
@@ -195,68 +193,6 @@ public class FifoCandidatesSelector
     skippedAMContainerlist.clear();
   }
 
-  private boolean preemptMapContains(
-      Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
-      ApplicationAttemptId attemptId, RMContainer rmContainer) {
-    Set<RMContainer> rmContainers;
-    if (null == (rmContainers = preemptMap.get(attemptId))) {
-      return false;
-    }
-    return rmContainers.contains(rmContainer);
-  }
-
-  /**
-   * Return should we preempt rmContainer. If we should, deduct from
-   * <code>resourceToObtainByPartition</code>
-   */
-  private boolean tryPreemptContainerAndDeductResToObtain(
-      Map<String, Resource> resourceToObtainByPartitions,
-      RMContainer rmContainer, Resource clusterResource,
-      Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
-      Resource totalPreemptionAllowed) {
-    ApplicationAttemptId attemptId = rmContainer.getApplicationAttemptId();
-
-    // We will not account resource of a container twice or more
-    if (preemptMapContains(preemptMap, attemptId, rmContainer)) {
-      return false;
-    }
-
-    String nodePartition = getPartitionByNodeId(rmContainer.getAllocatedNode());
-    Resource toObtainByPartition =
-        resourceToObtainByPartitions.get(nodePartition);
-
-    if (null != toObtainByPartition && Resources.greaterThan(rc,
-        clusterResource, toObtainByPartition, Resources.none()) && Resources
-        .fitsIn(rc, clusterResource, rmContainer.getAllocatedResource(),
-            totalPreemptionAllowed)) {
-      Resources.subtractFrom(toObtainByPartition,
-          rmContainer.getAllocatedResource());
-      Resources.subtractFrom(totalPreemptionAllowed,
-          rmContainer.getAllocatedResource());
-
-      // When we have no more resource need to obtain, remove from map.
-      if (Resources.lessThanOrEqual(rc, clusterResource, toObtainByPartition,
-          Resources.none())) {
-        resourceToObtainByPartitions.remove(nodePartition);
-      }
-      if (LOG.isDebugEnabled()) {
-        LOG.debug(this.getClass().getName() + " Marked container=" + rmContainer
-            .getContainerId() + " from partition=" + nodePartition + " queue="
-            + rmContainer.getQueueName() + " to be preemption candidates");
-      }
-      // Add to preemptMap
-      addToPreemptMap(preemptMap, attemptId, rmContainer);
-      return true;
-    }
-
-    return false;
-  }
-
-  private String getPartitionByNodeId(NodeId nodeId) {
-    return preemptionContext.getScheduler().getSchedulerNode(nodeId)
-        .getPartition();
-  }
-
   /**
    * Given a target preemption for a specific application, select containers
    * to preempt (after unreserving all reservation for that app).
@@ -268,10 +204,6 @@ public class FifoCandidatesSelector
       Map<ApplicationAttemptId, Set<RMContainer>> selectedContainers,
       Resource totalPreemptionAllowed) {
     ApplicationAttemptId appId = app.getApplicationAttemptId();
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("Looking at application=" + app.getApplicationAttemptId()
-          + " resourceToObtain=" + resToObtainByPartition);
-    }
 
     // first drop reserved containers towards rsrcPreempt
     List<RMContainer> reservedContainers =
@@ -286,8 +218,9 @@ public class FifoCandidatesSelector
       }
 
       // Try to preempt this container
-      tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
-          clusterResource, selectedContainers, totalPreemptionAllowed);
+      CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
+          rc, preemptionContext, resToObtainByPartition, c, clusterResource,
+          selectedContainers, totalPreemptionAllowed);
 
       if (!preemptionContext.isObserveOnly()) {
         preemptionContext.getRMContext().getDispatcher().getEventHandler()
@@ -328,41 +261,9 @@ public class FifoCandidatesSelector
       }
 
       // Try to preempt this container
-      tryPreemptContainerAndDeductResToObtain(resToObtainByPartition, c,
-          clusterResource, selectedContainers, totalPreemptionAllowed);
-    }
-  }
-
-  /**
-   * Compare by reversed priority order first, and then reversed containerId
-   * order
-   * @param containers
-   */
-  @VisibleForTesting
-  static void sortContainers(List<RMContainer> containers){
-    Collections.sort(containers, new Comparator<RMContainer>() {
-      @Override
-      public int compare(RMContainer a, RMContainer b) {
-        Comparator<Priority> c = new org.apache.hadoop.yarn.server
-            .resourcemanager.resource.Priority.Comparator();
-        int priorityComp = c.compare(b.getContainer().getPriority(),
-            a.getContainer().getPriority());
-        if (priorityComp != 0) {
-          return priorityComp;
-        }
-        return b.getContainerId().compareTo(a.getContainerId());
-      }
-    });
-  }
-
-  private void addToPreemptMap(
-      Map<ApplicationAttemptId, Set<RMContainer>> preemptMap,
-      ApplicationAttemptId appAttemptId, RMContainer containerToPreempt) {
-    Set<RMContainer> set;
-    if (null == (set = preemptMap.get(appAttemptId))) {
-      set = new HashSet<>();
-      preemptMap.put(appAttemptId, set);
+      CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
+          rc, preemptionContext, resToObtainByPartition, c, clusterResource,
+          selectedContainers, totalPreemptionAllowed);
     }
-    set.add(containerToPreempt);
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
new file mode 100644
index 0000000..757f567
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
@@ -0,0 +1,459 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Set;
+import java.util.TreeSet;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPriorityComparator;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * FifoIntraQueuePreemptionPlugin will handle intra-queue preemption for
+ * priority and user-limit.
+ */
+public class FifoIntraQueuePreemptionPlugin
+    implements
+      IntraQueuePreemptionComputePlugin {
+
+  protected final CapacitySchedulerPreemptionContext context;
+  protected final ResourceCalculator rc;
+
+  private static final Log LOG =
+      LogFactory.getLog(FifoIntraQueuePreemptionPlugin.class);
+
+  /**
+   * Create the plugin.
+   *
+   * @param rc resource calculator used for all resource comparisons
+   * @param preemptionContext preemption context the plugin reads from
+   */
+  public FifoIntraQueuePreemptionPlugin(ResourceCalculator rc,
+      CapacitySchedulerPreemptionContext preemptionContext) {
+    this.rc = rc;
+    this.context = preemptionContext;
+  }
+
+  /**
+   * Sum up, for one queue and partition, the actually-to-be-preempted
+   * resource recorded on each of the queue's temp apps.
+   *
+   * @param queueName queue to inspect
+   * @param partition partition to inspect
+   * @return a new map with a single entry, keyed by <code>partition</code>,
+   *         holding the summed resource
+   */
+  @Override
+  public Map<String, Resource> getResourceDemandFromAppsPerQueue(
+      String queueName, String partition) {
+
+    TempQueuePerPartition tq = context
+        .getQueueByPartition(queueName, partition);
+    Collection<TempAppPerPartition> appsOrderedByPriority = tq.getApps();
+
+    // The result map is built here from scratch, so the accumulator for the
+    // partition is always a fresh zero resource; no lookup is needed.
+    Resource actualPreemptNeeded = Resources.createResource(0, 0);
+    for (TempAppPerPartition a1 : appsOrderedByPriority) {
+      Resources.addTo(actualPreemptNeeded, a1.getActuallyToBePreempted());
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Selected to preempt " + actualPreemptNeeded
+          + " resource from partition:" + partition);
+    }
+
+    Map<String, Resource> resToObtainByPartition = new HashMap<>();
+    resToObtainByPartition.put(partition, actualPreemptNeeded);
+    return resToObtainByPartition;
+  }
+
+  /**
+   * Compute, for every app of the queue <code>tq</code>, its ideal
+   * assignment and the amount to be preempted from it, and store the
+   * resulting temp apps on <code>tq</code>.
+   *
+   * The AM usage of the queue is subtracted in place from
+   * <code>queueReassignableResource</code> before anything else, also when
+   * the method returns early because the queue runs exactly one app.
+   *
+   * @param clusterResource total cluster resource
+   * @param partitionBasedResource resource of the partition
+   * @param tq temp queue under calculation; its leafQueue is expected to be
+   *          non-null (validated by the caller)
+   * @param selectedCandidates containers already selected for preemption
+   * @param totalPreemptedResourceAllowed per-round preemption limit
+   * @param queueReassignableResource resource of the queue that may be
+   *          reassigned; modified in place
+   * @param maxAllowablePreemptLimit factor applied to the queue's guaranteed
+   *          resource to cap intra-queue preemption
+   */
+  @Override
+  public void computeAppsIdealAllocation(Resource clusterResource,
+      Resource partitionBasedResource, TempQueuePerPartition tq,
+      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+      Resource totalPreemptedResourceAllowed,
+      Resource queueReassignableResource, float maxAllowablePreemptLimit) {
+
+    // Step 1: AM containers are treated as frozen for now, so take their
+    // usage out of what the queue could hand over to other apps.
+    Map<String, Resource> amUsedByUser = new HashMap<String, Resource>();
+    Resources.subtractFrom(queueReassignableResource,
+        calculateUsedAMResourcesPerQueue(tq.partition, tq.leafQueue,
+            amUsedByUser));
+
+    // Step 2: fetch the apps of the leaf queue. A lone app has nobody to
+    // preempt from, so there is nothing to compute.
+    Collection<FiCaSchedulerApp> runningApps =
+        tq.leafQueue.getAllApplications();
+    if (runningApps.size() == 1) {
+      return;
+    }
+
+    // Step 3: build the temp apps, ordered from high to low priority.
+    TAPriorityComparator priorityComparator = new TAPriorityComparator();
+    PriorityQueue<TempAppPerPartition> highToLow =
+        createTempAppForResCalculation(tq.partition, runningApps,
+            priorityComparator);
+
+    // Step 4: work out idealAssigned per app from the queue's reassignable
+    // resource. The result comes back ordered from low to high priority.
+    TreeSet<TempAppPerPartition> lowToHigh =
+        calculateIdealAssignedResourcePerApp(clusterResource,
+            partitionBasedResource, tq, selectedCandidates,
+            queueReassignableResource, highToLow, amUsedByUser);
+
+    // Step 5: cap intra-queue preemption at a configurable share of the
+    // queue's guaranteed resource, less what is already to be preempted
+    // from the queue; never go below zero.
+    Resource queueLimit = Resources.multiply(tq.getGuaranteed(),
+        maxAllowablePreemptLimit);
+    if (Resources.greaterThan(rc, clusterResource, queueLimit,
+        tq.getActuallyToBePreempted())) {
+      Resources.subtractFrom(queueLimit, tq.getActuallyToBePreempted());
+    } else {
+      queueLimit = Resource.newInstance(0, 0);
+    }
+
+    // Step 6: the effective limit is the smaller of the intra-queue limit
+    // and the per-round limit.
+    Resource effectiveLimit = Resources.min(rc, clusterResource, queueLimit,
+        totalPreemptedResourceAllowed);
+
+    // Step 7: starting with the lowest priority app, derive toBePreempted.
+    calculateToBePreemptedResourcePerApp(clusterResource, lowToHigh,
+        effectiveLimit);
+
+    // Keep the apps (low to high) on the temp queue for later reference.
+    tq.addAllApps(lowToHigh);
+
+    // Step 8: demand coming from the same priority level must not lead to
+    // preemption, so filter such cases out.
+    validateOutSameAppPriorityFromDemand(clusterResource,
+        (TreeSet<TempAppPerPartition>) tq.getApps());
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Queue Name:" + tq.queueName + ", partition:" + tq.partition);
+      for (TempAppPerPartition tmpApp : tq.getApps()) {
+        LOG.debug(tmpApp);
+      }
+    }
+  }
+
+  /**
+   * Walk the apps in the iteration order of <code>orderedApps</code> and set
+   * <code>toBePreempted</code> on each of them, drawing from one shared
+   * limit.
+   *
+   * For each app:
+   *   surplus       = app.used - app.idealAssigned - app.selected - app.amUsed
+   *   toBePreempted = min(max(surplus, 0), remaining limit)
+   *   remaining limit -= toBePreempted
+   *
+   * An app is skipped, leaving its <code>toBePreempted</code> untouched,
+   * when the remaining limit is used up or when the app uses nothing.
+   *
+   * @param clusterResource total cluster resource
+   * @param orderedApps temp apps in the order they should be considered
+   * @param preemptionLimit upper bound for the total amount to preempt
+   */
+  private void calculateToBePreemptedResourcePerApp(Resource clusterResource,
+      TreeSet<TempAppPerPartition> orderedApps, Resource preemptionLimit) {
+
+    Resource remainingLimit = preemptionLimit;
+    for (TempAppPerPartition app : orderedApps) {
+      // Nothing left to hand out.
+      if (Resources.lessThanOrEqual(rc, clusterResource, remainingLimit,
+          Resources.none())) {
+        continue;
+      }
+      // The app holds nothing that could be taken away.
+      if (Resources.lessThanOrEqual(rc, clusterResource, app.getUsed(),
+          Resources.none())) {
+        continue;
+      }
+
+      Resource surplus = Resources.subtract(app.getUsed(),
+          app.idealAssigned);
+      Resources.subtractFrom(surplus, app.selected);
+      Resources.subtractFrom(surplus, app.getAMUsed());
+
+      // Clamp the surplus at zero, then at whatever limit is still left.
+      Resource nonNegativeSurplus = Resources.max(rc, clusterResource,
+          surplus, Resources.none());
+      app.toBePreempted = Resources.min(rc, clusterResource,
+          nonNegativeSurplus, remainingLimit);
+
+      remainingLimit = Resources.subtract(remainingLimit, app.toBePreempted);
+    }
+  }
+
+  /**
+   * Algorithm for calculating idealAssigned is as follows:
+   * For each partition:
+   *  Q.reassignable = Q.used - Q.selected;
+   *  
+   * # By default set ideal assigned 0 for app.
+   * app.idealAssigned as 0
+   * # get user limit from scheduler.
+   * userLimitRes = Q.getUserLimit(userName)
+   * 
+   * # initial all value to 0
+   * Map<String, Resource> userToAllocated
+   * 
+   * # Loop from highest priority to lowest priority app to calculate ideal
+   * for app in sorted-by(priority) {
+   *  if Q.reassignable < 0:
+   *    break;
+   *    
+   *  if (user-to-allocated.get(app.user) < userLimitRes) {
+   *   idealAssigned = min((userLimitRes - userToAllocated.get(app.user)), 
+   *                      (app.used + app.pending - app.selected))
+   *   app.idealAssigned = min(Q.reassignable, idealAssigned)
+   *   userToAllocated.get(app.user) += app.idealAssigned;
+   *  } else { 
+   *   // skip this app because user-limit reached
+   *  }
+   *  Q.reassignable -= app.idealAssigned
+   * }
+   *  
+   * @param clusterResource Cluster Resource
+   * @param partitionBasedResource resource per partition
+   * @param tq TempQueue
+   * @param selectedCandidates Already Selected preemption candidates
+   * @param queueReassignableResource Resource used in a queue
+   * @param orderedByPriority List of running apps
+   * @param perUserAMUsed AM used resource
+   * @return List of temp apps ordered from low to high priority
+   */
+  private TreeSet<TempAppPerPartition> calculateIdealAssignedResourcePerApp(
+      Resource clusterResource, Resource partitionBasedResource,
+      TempQueuePerPartition tq,
+      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+      Resource queueReassignableResource,
+      PriorityQueue<TempAppPerPartition> orderedByPriority,
+      Map<String, Resource> perUserAMUsed) {
+
+    Comparator<TempAppPerPartition> reverseComp = Collections
+        .reverseOrder(new TAPriorityComparator());
+    TreeSet<TempAppPerPartition> orderedApps = new TreeSet<>(reverseComp);
+
+    Map<String, Resource> userIdealAssignedMapping = new HashMap<>();
+    String partition = tq.partition;
+
+    Map<String, Resource> preCalculatedUserLimit =
+        new HashMap<String, Resource>();
+
+    while (!orderedByPriority.isEmpty()) {
+      // Remove app from the next highest remaining priority and process it to
+      // calculate idealAssigned per app.
+      TempAppPerPartition tmpApp = orderedByPriority.remove();
+      orderedApps.add(tmpApp);
+
+      // Once unallocated resource is 0, we can stop assigning ideal per app.
+      if (Resources.lessThanOrEqual(rc, clusterResource,
+          queueReassignableResource, Resources.none())) {
+        continue;
+      }
+
+      String userName = tmpApp.app.getUser();
+      Resource userLimitResource = preCalculatedUserLimit.get(userName);
+
+      // Verify whether we already calculated headroom for this user.
+      if (userLimitResource == null) {
+        userLimitResource = Resources.clone(tq.leafQueue
+            .getUserLimitPerUser(userName, partitionBasedResource, partition));
+
+        Resource amUsed = perUserAMUsed.get(userName);
+        if (null == amUsed) {
+          amUsed = Resources.createResource(0, 0);
+        }
+
+        // AM-used resource is deducted from the user-limit as well.
+        userLimitResource = Resources.subtract(userLimitResource, amUsed);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Userlimit for user '" + userName + "' is :"
+              + userLimitResource + ", and amUsed is:" + amUsed);
+        }
+
+        preCalculatedUserLimit.put(userName, userLimitResource);
+      }
+
+      Resource idealAssignedForUser = userIdealAssignedMapping.get(userName);
+
+      if (idealAssignedForUser == null) {
+        idealAssignedForUser = Resources.createResource(0, 0);
+        userIdealAssignedMapping.put(userName, idealAssignedForUser);
+      }
+
+      // Calculate total selected container resources from current app.
+      getAlreadySelectedPreemptionCandidatesResource(selectedCandidates,
+          tmpApp, partition);
+
+      // For any app, used+pending gives its idealAssigned. However, it is
+      // capped by the queue's unallocated quota, so a lower priority app's
+      // idealAssigned may fall to 0 if higher priority apps demand more.
+      Resource appIdealAssigned = Resources.add(tmpApp.getUsedDeductAM(),
+          tmpApp.getPending());
+      Resources.subtractFrom(appIdealAssigned, tmpApp.selected);
+
+      if (Resources.lessThan(rc, clusterResource, idealAssignedForUser,
+          userLimitResource)) {
+        appIdealAssigned = Resources.min(rc, clusterResource, appIdealAssigned,
+            Resources.subtract(userLimitResource, idealAssignedForUser));
+        tmpApp.idealAssigned = Resources.clone(Resources.min(rc,
+            clusterResource, queueReassignableResource, appIdealAssigned));
+        Resources.addTo(idealAssignedForUser, tmpApp.idealAssigned);
+      } else {
+        continue;
+      }
+
+      // Also set how much resource is needed by this app from others.
+      Resource appUsedExcludedSelected = Resources
+          .subtract(tmpApp.getUsedDeductAM(), tmpApp.selected);
+      if (Resources.greaterThan(rc, clusterResource, tmpApp.idealAssigned,
+          appUsedExcludedSelected)) {
+        tmpApp.setToBePreemptFromOther(
+            Resources.subtract(tmpApp.idealAssigned, appUsedExcludedSelected));
+      }
+
+      Resources.subtractFrom(queueReassignableResource, tmpApp.idealAssigned);
+    }
+
+    return orderedApps;
+  }
+
+  /*
+   * Previous policies may have already selected a few containers from an
+   * application. Sum the resources of those selected in the given partition.
+   */
+  private void getAlreadySelectedPreemptionCandidatesResource(
+      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+      TempAppPerPartition tmpApp, String partition) {
+    tmpApp.selected = Resources.createResource(0, 0);
+    Set<RMContainer> containers = selectedCandidates
+        .get(tmpApp.app.getApplicationAttemptId());
+
+    if (containers == null) {
+      return;
+    }
+
+    for (RMContainer cont : containers) {
+      if (partition.equals(cont.getNodeLabelExpression())) {
+        Resources.addTo(tmpApp.selected, cont.getAllocatedResource());
+      }
+    }
+  }
+
+  private PriorityQueue<TempAppPerPartition> createTempAppForResCalculation(
+      String partition, Collection<FiCaSchedulerApp> apps,
+      TAPriorityComparator taComparator) {
+    PriorityQueue<TempAppPerPartition> orderedByPriority = new PriorityQueue<>(
+        100, taComparator);
+
+    // have an internal temp app structure to store intermediate data(priority)
+    for (FiCaSchedulerApp app : apps) {
+
+      Resource used = app.getAppAttemptResourceUsage().getUsed(partition);
+      Resource amUsed = null;
+      if (!app.isWaitingForAMContainer()) {
+        amUsed = app.getAMResource(partition);
+      }
+      Resource pending = app.getTotalPendingRequestsPerPartition()
+          .get(partition);
+      Resource reserved = app.getAppAttemptResourceUsage()
+          .getReserved(partition);
+
+      used = (used == null) ? Resources.createResource(0, 0) : used;
+      amUsed = (amUsed == null) ? Resources.createResource(0, 0) : amUsed;
+      pending = (pending == null) ? Resources.createResource(0, 0) : pending;
+      reserved = (reserved == null) ? Resources.createResource(0, 0) : reserved;
+
+      HashSet<String> partitions = new HashSet<String>(
+          app.getAppAttemptResourceUsage().getNodePartitionsSet());
+      partitions.addAll(app.getTotalPendingRequestsPerPartition().keySet());
+
+      // Create TempAppPerPartition for further calculation.
+      TempAppPerPartition tmpApp = new TempAppPerPartition(app,
+          Resources.clone(used), Resources.clone(amUsed),
+          Resources.clone(reserved), Resources.clone(pending));
+
+      // Set ideal allocation of app as 0.
+      tmpApp.idealAssigned = Resources.createResource(0, 0);
+
+      orderedByPriority.add(tmpApp);
+    }
+    return orderedByPriority;
+  }
+
+  /*
+   * A FIFO+priority based preemption policy need not preempt resources from
+   * apps at the same priority level. Such demand is validated out here.
+   */
+  public void validateOutSameAppPriorityFromDemand(Resource cluster,
+      TreeSet<TempAppPerPartition> appsOrderedfromLowerPriority) {
+
+    TempAppPerPartition[] apps = appsOrderedfromLowerPriority
+        .toArray(new TempAppPerPartition[appsOrderedfromLowerPriority.size()]);
+    if (apps.length <= 0) {
+      return;
+    }
+
+    int lPriority = 0;
+    int hPriority = apps.length - 1;
+
+    while (lPriority < hPriority
+        && !apps[lPriority].equals(apps[hPriority])
+        && apps[lPriority].getPriority() < apps[hPriority].getPriority()) {
+      Resource toPreemptFromOther = apps[hPriority]
+          .getToBePreemptFromOther();
+      Resource actuallyToPreempt = apps[lPriority].getActuallyToBePreempted();
+      Resource delta = Resources.subtract(apps[lPriority].toBePreempted,
+          actuallyToPreempt);
+
+      if (Resources.greaterThan(rc, cluster, delta, Resources.none())) {
+        Resource toPreempt = Resources.min(rc, cluster,
+            toPreemptFromOther, delta);
+
+        apps[hPriority].setToBePreemptFromOther(
+            Resources.subtract(toPreemptFromOther, toPreempt));
+        apps[lPriority].setActuallyToBePreempted(
+            Resources.add(actuallyToPreempt, toPreempt));
+      }
+
+      if (Resources.lessThanOrEqual(rc, cluster,
+          apps[lPriority].toBePreempted,
+          apps[lPriority].getActuallyToBePreempted())) {
+        lPriority++;
+        continue;
+      }
+
+      if (Resources.equals(apps[hPriority].getToBePreemptFromOther(),
+          Resources.none())) {
+        hPriority--;
+        continue;
+      }
+    }
+  }
+
+  private Resource calculateUsedAMResourcesPerQueue(String partition,
+      LeafQueue leafQueue, Map<String, Resource> perUserAMUsed) {
+    Collection<FiCaSchedulerApp> runningApps = leafQueue.getApplications();
+    Resource amUsed = Resources.createResource(0, 0);
+
+    for (FiCaSchedulerApp app : runningApps) {
+      Resource userAMResource = perUserAMUsed.get(app.getUser());
+      if (null == userAMResource) {
+        userAMResource = Resources.createResource(0, 0);
+        perUserAMUsed.put(app.getUser(), userAMResource);
+      }
+
+      Resources.addTo(userAMResource, app.getAMResource(partition));
+      Resources.addTo(amUsed, app.getAMResource(partition));
+    }
+    return amUsed;
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
new file mode 100644
index 0000000..039b53e
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
@@ -0,0 +1,238 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Identifies over utilized resources within a queue and tries to normalize
+ * them to resolve resource allocation anomalies w.r.t priority and user-limit.
+ */
+public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
+
+  @SuppressWarnings("serial")
+  static class TAPriorityComparator
+      implements
+        Serializable,
+        Comparator<TempAppPerPartition> {
+
+    @Override
+    public int compare(TempAppPerPartition tq1, TempAppPerPartition tq2) {
+      Priority p1 = Priority.newInstance(tq1.getPriority());
+      Priority p2 = Priority.newInstance(tq2.getPriority());
+
+      if (!p1.equals(p2)) {
+        return p1.compareTo(p2);
+      }
+      return tq1.getApplicationId().compareTo(tq2.getApplicationId());
+    }
+  }
+
+  IntraQueuePreemptionComputePlugin fifoPreemptionComputePlugin = null;
+  final CapacitySchedulerPreemptionContext context;
+
+  private static final Log LOG =
+      LogFactory.getLog(IntraQueueCandidatesSelector.class);
+
+  IntraQueueCandidatesSelector(
+      CapacitySchedulerPreemptionContext preemptionContext) {
+    super(preemptionContext);
+    fifoPreemptionComputePlugin = new FifoIntraQueuePreemptionPlugin(rc,
+        preemptionContext);
+    context = preemptionContext;
+  }
+
+  @Override
+  public Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
+      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+      Resource clusterResource, Resource totalPreemptedResourceAllowed) {
+
+    // 1. Calculate the abnormality within each queue one by one.
+    computeIntraQueuePreemptionDemand(
+        clusterResource, totalPreemptedResourceAllowed, selectedCandidates);
+
+    // 2. Previous selectors (with higher priority) could have already
+    // selected containers. We need to deduct pre-emptable resources
+    // based on already selected candidates.
+    CapacitySchedulerPreemptionUtils
+        .deductPreemptableResourcesBasedSelectedCandidates(preemptionContext,
+            selectedCandidates);
+
+    // 3. Loop through all partitions to select containers for preemption.
+    for (String partition : preemptionContext.getAllPartitions()) {
+      LinkedHashSet<String> queueNames = preemptionContext
+          .getUnderServedQueuesPerPartition(partition);
+
+      // Error check to handle non-mapped labels to queue.
+      if (null == queueNames) {
+        continue;
+      }
+
+      // 4. Iterate from most under-served queue in order.
+      for (String queueName : queueNames) {
+        LeafQueue leafQueue = preemptionContext.getQueueByPartition(queueName,
+            RMNodeLabelsManager.NO_LABEL).leafQueue;
+
+        // skip if not a leafqueue
+        if (null == leafQueue) {
+          continue;
+        }
+
+        // 5. Calculate the resource to obtain per partition
+        Map<String, Resource> resToObtainByPartition = fifoPreemptionComputePlugin
+            .getResourceDemandFromAppsPerQueue(queueName, partition);
+
+        // 6. Based on the selected resource demand per partition, select
+        // containers with known policy from inter-queue preemption.
+        synchronized (leafQueue) {
+          Iterator<FiCaSchedulerApp> desc = leafQueue.getOrderingPolicy()
+              .getPreemptionIterator();
+          while (desc.hasNext()) {
+            FiCaSchedulerApp app = desc.next();
+            preemptFromLeastStarvedApp(selectedCandidates, clusterResource,
+                totalPreemptedResourceAllowed, resToObtainByPartition,
+                leafQueue, app);
+          }
+        }
+      }
+    }
+
+    return selectedCandidates;
+  }
+
+  private void preemptFromLeastStarvedApp(
+      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+      Resource clusterResource, Resource totalPreemptedResourceAllowed,
+      Map<String, Resource> resToObtainByPartition, LeafQueue leafQueue,
+      FiCaSchedulerApp app) {
+
+    // ToDo: Reuse reservation selector here.
+
+    List<RMContainer> liveContainers = new ArrayList<>(
+        app.getLiveContainers());
+    sortContainers(liveContainers);
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug(
+          "totalPreemptedResourceAllowed for preemption at this round is :"
+              + totalPreemptedResourceAllowed);
+    }
+
+    for (RMContainer c : liveContainers) {
+
+      // If there is no demand, return.
+      if (resToObtainByPartition.isEmpty()) {
+        return;
+      }
+
+      // skip preselected containers.
+      if (CapacitySchedulerPreemptionUtils.isContainerAlreadySelected(c,
+          selectedCandidates)) {
+        continue;
+      }
+
+      // Skip containers already marked as killable.
+      if (null != preemptionContext.getKillableContainers() && preemptionContext
+          .getKillableContainers().contains(c.getContainerId())) {
+        continue;
+      }
+
+      // Skip AM Container from preemption for now.
+      if (c.isAMContainer()) {
+        continue;
+      }
+
+      // Try to preempt this container
+      CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
+          rc, preemptionContext, resToObtainByPartition, c, clusterResource,
+          selectedCandidates, totalPreemptedResourceAllowed);
+    }
+
+  }
+
+  private void computeIntraQueuePreemptionDemand(Resource clusterResource,
+      Resource totalPreemptedResourceAllowed,
+      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates) {
+
+    // 1. Iterate through all partition to calculate demand within a partition.
+    for (String partition : context.getAllPartitions()) {
+      LinkedHashSet<String> queueNames = context
+          .getUnderServedQueuesPerPartition(partition);
+
+      if (null == queueNames) {
+        continue;
+      }
+
+      // 2. It's better to get the partition-based resource limit up front,
+      // before starting the calculation.
+      Resource partitionBasedResource =
+          context.getPartitionResource(partition);
+
+      // 3. loop through all queues corresponding to a partition.
+      for (String queueName : queueNames) {
+        TempQueuePerPartition tq = context.getQueueByPartition(queueName,
+            partition);
+        LeafQueue leafQueue = tq.leafQueue;
+
+        // skip if it's a parent queue
+        if (null == leafQueue) {
+          continue;
+        }
+
+        // 4. Consider reassignableResource as (used - actuallyToBePreempted).
+        // This gives an upper limit for splitting the queue's quota among apps.
+        Resource queueReassignableResource = Resources.subtract(tq.getUsed(),
+            tq.getActuallyToBePreempted());
+
+        // 5. Check queue's used capacity. Make sure that the used capacity is
+        // above certain limit to consider for intra queue preemption.
+        if (leafQueue.getQueueCapacities().getUsedCapacity(partition) < context
+            .getMinimumThresholdForIntraQueuePreemption()) {
+          continue;
+        }
+
+        // 6. compute the allocation of all apps based on queue's unallocated
+        // capacity
+        fifoPreemptionComputePlugin.computeAppsIdealAllocation(clusterResource,
+            partitionBasedResource, tq, selectedCandidates,
+            totalPreemptedResourceAllowed,
+            queueReassignableResource,
+            context.getMaxAllowableLimitForIntraQueuePreemption());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java
new file mode 100644
index 0000000..93ebe65
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
+
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+
+
+interface IntraQueuePreemptionComputePlugin {
+
+  Map<String, Resource> getResourceDemandFromAppsPerQueue(String queueName,
+      String partition);
+
+  void computeAppsIdealAllocation(Resource clusterResource,
+      Resource partitionBasedResource, TempQueuePerPartition tq,
+      Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
+      Resource totalPreemptedResourceAllowed, Resource queueTotalUnassigned,
+      float maxAllowablePreemptLimit);
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
index 103b419..3017e8f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptableResourceCalculator.java
@@ -27,61 +27,22 @@ import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Comparator;
 import java.util.HashSet;
-import java.util.Iterator;
 import java.util.List;
-import java.util.PriorityQueue;
 import java.util.Set;
 
 /**
  * Calculate how much resources need to be preempted for each queue,
  * will be used by {@link PreemptionCandidatesSelector}
  */
-public class PreemptableResourceCalculator {
+public class PreemptableResourceCalculator
+    extends
+      AbstractPreemptableResourceCalculator {
   private static final Log LOG =
       LogFactory.getLog(PreemptableResourceCalculator.class);
 
-  private final CapacitySchedulerPreemptionContext context;
-  private final ResourceCalculator rc;
   private boolean isReservedPreemptionCandidatesSelector;
 
-  static class TQComparator implements Comparator<TempQueuePerPartition> {
-    private ResourceCalculator rc;
-    private Resource clusterRes;
-
-    TQComparator(ResourceCalculator rc, Resource clusterRes) {
-      this.rc = rc;
-      this.clusterRes = clusterRes;
-    }
-
-    @Override
-    public int compare(TempQueuePerPartition tq1, TempQueuePerPartition tq2) {
-      if (getIdealPctOfGuaranteed(tq1) < getIdealPctOfGuaranteed(tq2)) {
-        return -1;
-      }
-      if (getIdealPctOfGuaranteed(tq1) > getIdealPctOfGuaranteed(tq2)) {
-        return 1;
-      }
-      return 0;
-    }
-
-    // Calculates idealAssigned / guaranteed
-    // TempQueues with 0 guarantees are always considered the most over
-    // capacity and therefore considered last for resources.
-    private double getIdealPctOfGuaranteed(TempQueuePerPartition q) {
-      double pctOver = Integer.MAX_VALUE;
-      if (q != null && Resources.greaterThan(rc, clusterRes,
-          q.getGuaranteed(),
-          Resources.none())) {
-        pctOver = Resources.divide(rc, clusterRes, q.idealAssigned,
-            q.getGuaranteed());
-      }
-      return (pctOver);
-    }
-  }
-
   /**
    * PreemptableResourceCalculator constructor
    *
@@ -93,136 +54,7 @@ public class PreemptableResourceCalculator {
   public PreemptableResourceCalculator(
       CapacitySchedulerPreemptionContext preemptionContext,
       boolean isReservedPreemptionCandidatesSelector) {
-    context = preemptionContext;
-    rc = preemptionContext.getResourceCalculator();
-    this.isReservedPreemptionCandidatesSelector =
-        isReservedPreemptionCandidatesSelector;
-  }
-
-  /**
-   * Computes a normalizedGuaranteed capacity based on active queues
-   * @param rc resource calculator
-   * @param clusterResource the total amount of resources in the cluster
-   * @param queues the list of queues to consider
-   */
-  private void resetCapacity(ResourceCalculator rc, Resource clusterResource,
-      Collection<TempQueuePerPartition> queues, boolean ignoreGuar) {
-    Resource activeCap = Resource.newInstance(0, 0);
-
-    if (ignoreGuar) {
-      for (TempQueuePerPartition q : queues) {
-        q.normalizedGuarantee = 1.0f / queues.size();
-      }
-    } else {
-      for (TempQueuePerPartition q : queues) {
-        Resources.addTo(activeCap, q.getGuaranteed());
-      }
-      for (TempQueuePerPartition q : queues) {
-        q.normalizedGuarantee = Resources.divide(rc, clusterResource,
-            q.getGuaranteed(), activeCap);
-      }
-    }
-  }
-
-  // Take the most underserved TempQueue (the one on the head). Collect and
-  // return the list of all queues that have the same idealAssigned
-  // percentage of guaranteed.
-  protected Collection<TempQueuePerPartition> getMostUnderservedQueues(
-      PriorityQueue<TempQueuePerPartition> orderedByNeed,
-      TQComparator tqComparator) {
-    ArrayList<TempQueuePerPartition> underserved = new ArrayList<>();
-    while (!orderedByNeed.isEmpty()) {
-      TempQueuePerPartition q1 = orderedByNeed.remove();
-      underserved.add(q1);
-      TempQueuePerPartition q2 = orderedByNeed.peek();
-      // q1's pct of guaranteed won't be larger than q2's. If it's less, then
-      // return what has already been collected. Otherwise, q1's pct of
-      // guaranteed == that of q2, so add q2 to underserved list during the
-      // next pass.
-      if (q2 == null || tqComparator.compare(q1,q2) < 0) {
-        return underserved;
-      }
-    }
-    return underserved;
-  }
-
-
-  /**
-   * Given a set of queues compute the fix-point distribution of unassigned
-   * resources among them. As pending request of a queue are exhausted, the
-   * queue is removed from the set and remaining capacity redistributed among
-   * remaining queues. The distribution is weighted based on guaranteed
-   * capacity, unless asked to ignoreGuarantee, in which case resources are
-   * distributed uniformly.
-   */
-  private void computeFixpointAllocation(ResourceCalculator rc,
-      Resource tot_guarant, Collection<TempQueuePerPartition> qAlloc,
-      Resource unassigned, boolean ignoreGuarantee) {
-    // Prior to assigning the unused resources, process each queue as follows:
-    // If current > guaranteed, idealAssigned = guaranteed + untouchable extra
-    // Else idealAssigned = current;
-    // Subtract idealAssigned resources from unassigned.
-    // If the queue has all of its needs met (that is, if
-    // idealAssigned >= current + pending), remove it from consideration.
-    // Sort queues from most under-guaranteed to most over-guaranteed.
-    TQComparator tqComparator = new TQComparator(rc, tot_guarant);
-    PriorityQueue<TempQueuePerPartition> orderedByNeed = new PriorityQueue<>(10,
-        tqComparator);
-    for (Iterator<TempQueuePerPartition> i = qAlloc.iterator(); i.hasNext();) {
-      TempQueuePerPartition q = i.next();
-      Resource used = q.getUsed();
-
-      if (Resources.greaterThan(rc, tot_guarant, used,
-          q.getGuaranteed())) {
-        q.idealAssigned = Resources.add(
-            q.getGuaranteed(), q.untouchableExtra);
-      } else {
-        q.idealAssigned = Resources.clone(used);
-      }
-      Resources.subtractFrom(unassigned, q.idealAssigned);
-      // If idealAssigned < (allocated + used + pending), q needs more resources, so
-      // add it to the list of underserved queues, ordered by need.
-      Resource curPlusPend = Resources.add(q.getUsed(), q.pending);
-      if (Resources.lessThan(rc, tot_guarant, q.idealAssigned, curPlusPend)) {
-        orderedByNeed.add(q);
-      }
-    }
-
-    //assign all cluster resources until no more demand, or no resources are left
-    while (!orderedByNeed.isEmpty()
-        && Resources.greaterThan(rc,tot_guarant, unassigned,Resources.none())) {
-      Resource wQassigned = Resource.newInstance(0, 0);
-      // we compute normalizedGuarantees capacity based on currently active
-      // queues
-      resetCapacity(rc, unassigned, orderedByNeed, ignoreGuarantee);
-
-      // For each underserved queue (or set of queues if multiple are equally
-      // underserved), offer its share of the unassigned resources based on its
-      // normalized guarantee. After the offer, if the queue is not satisfied,
-      // place it back in the ordered list of queues, recalculating its place
-      // in the order of most under-guaranteed to most over-guaranteed. In this
-      // way, the most underserved queue(s) are always given resources first.
-      Collection<TempQueuePerPartition> underserved =
-          getMostUnderservedQueues(orderedByNeed, tqComparator);
-      for (Iterator<TempQueuePerPartition> i = underserved.iterator(); i
-          .hasNext();) {
-        TempQueuePerPartition sub = i.next();
-        Resource wQavail = Resources.multiplyAndNormalizeUp(rc,
-            unassigned, sub.normalizedGuarantee, Resource.newInstance(1, 1));
-        Resource wQidle = sub.offer(wQavail, rc, tot_guarant,
-            isReservedPreemptionCandidatesSelector);
-        Resource wQdone = Resources.subtract(wQavail, wQidle);
-
-        if (Resources.greaterThan(rc, tot_guarant,
-            wQdone, Resources.none())) {
-          // The queue is still asking for more. Put it back in the priority
-          // queue, recalculating its order based on need.
-          orderedByNeed.add(sub);
-        }
-        Resources.addTo(wQassigned, wQdone);
-      }
-      Resources.subtractFrom(unassigned, wQassigned);
-    }
+    super(preemptionContext, isReservedPreemptionCandidatesSelector);
   }
 
   /**
@@ -263,14 +95,14 @@ public class PreemptableResourceCalculator {
     }
 
     // first compute the allocation as a fixpoint based on guaranteed capacity
-    computeFixpointAllocation(rc, tot_guarant, nonZeroGuarQueues, unassigned,
+    computeFixpointAllocation(tot_guarant, nonZeroGuarQueues, unassigned,
         false);
 
     // if any capacity is left unassigned, distributed among zero-guarantee
     // queues uniformly (i.e., not based on guaranteed capacity, as this is zero)
     if (!zeroGuarQueues.isEmpty()
         && Resources.greaterThan(rc, tot_guarant, unassigned, Resources.none())) {
-      computeFixpointAllocation(rc, tot_guarant, zeroGuarQueues, unassigned,
+      computeFixpointAllocation(tot_guarant, zeroGuarQueues, unassigned,
           true);
     }
 
@@ -321,13 +153,12 @@ public class PreemptableResourceCalculator {
       computeIdealResourceDistribution(rc, root.getChildren(),
           totalPreemptionAllowed, root.idealAssigned);
       // compute recursively for lower levels and build list of leafs
-      for(TempQueuePerPartition t : root.getChildren()) {
+      for (TempQueuePerPartition t : root.getChildren()) {
         recursivelyComputeIdealAssignment(t, totalPreemptionAllowed);
       }
     }
   }
 
-
   private void calculateResToObtainByPartitionForLeafQueues(
       Set<String> leafQueueNames, Resource clusterResource) {
     // Loop all leaf queues

http://git-wip-us.apache.org/repos/asf/hadoop/blob/6a18ae84/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java
index dd33d8f..c74e34e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/PreemptionCandidatesSelector.java
@@ -19,10 +19,16 @@
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 
+import com.google.common.annotations.VisibleForTesting;
+
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.List;
 import java.util.Map;
 import java.util.Set;
 
@@ -41,7 +47,7 @@ public abstract class PreemptionCandidatesSelector {
    * selected candidates.
    *
    * @param selectedCandidates already selected candidates from previous policies
-   * @param clusterResource
+   * @param clusterResource total resource
    * @param totalPreemptedResourceAllowed how many resources allowed to be
    *                                      preempted in this round
    * @return merged selected candidates.
@@ -49,4 +55,28 @@ public abstract class PreemptionCandidatesSelector {
   public abstract Map<ApplicationAttemptId, Set<RMContainer>> selectCandidates(
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
       Resource clusterResource, Resource totalPreemptedResourceAllowed);
+
+  /**
+   * Sorts {@code containers} in place: reversed priority order first (per
+   * resource.Priority.Comparator), ties broken by reversed containerId order.
+   *
+   * @param containers list of containers to sort; reordered in place.
+   */
+  @VisibleForTesting
+  static void sortContainers(List<RMContainer> containers) {
+    Collections.sort(containers, new Comparator<RMContainer>() {
+      @Override
+      public int compare(RMContainer a, RMContainer b) {
+        Comparator<Priority> c = new org.apache.hadoop.yarn.server
+            .resourcemanager.resource.Priority.Comparator();
+        int priorityComp = c.compare(b.getContainer().getPriority(),
+            a.getContainer().getPriority());
+        if (priorityComp != 0) {
+           return priorityComp;
+        }
+        return b.getContainerId().compareTo(a.getContainerId());
+      }
+    });
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org