You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2017/07/12 14:00:30 UTC

[2/2] hadoop git commit: Revert "YARN-2113. Add cross-user preemption within CapacityScheduler's leaf-queue. (Contributed by Sunil G)"

Revert "YARN-2113. Add cross-user preemption within CapacityScheduler's leaf-queue. (Contributed by Sunil G)"

This reverts commit eda4ac07c1835031aca7e27cc673f1c5913813bb.
Commit eda4ac07c1835031aca7e27cc673f1c5913813bb was a separate patch from trunk rather than a cherry-pick. I will cherry-pick dependencies and then cherry-pick the trunk commit for YARN-2113.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e6cdf770
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e6cdf770
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e6cdf770

Branch: refs/heads/branch-2
Commit: e6cdf770ca863827a1c3a304a1abbfbf37d81462
Parents: 3419381
Author: Eric Payne <ep...@apache.org>
Authored: Wed Jul 12 08:56:40 2017 -0500
Committer: Eric Payne <ep...@apache.org>
Committed: Wed Jul 12 08:56:40 2017 -0500

----------------------------------------------------------------------
 .../resource/DefaultResourceCalculator.java     |   5 -
 .../resource/DominantResourceCalculator.java    |   5 -
 .../yarn/util/resource/ResourceCalculator.java  |   9 -
 .../hadoop/yarn/util/resource/Resources.java    |   5 -
 .../CapacitySchedulerPreemptionContext.java     |   5 -
 .../CapacitySchedulerPreemptionUtils.java       |   9 +-
 .../FifoIntraQueuePreemptionPlugin.java         | 329 ++-----
 .../capacity/IntraQueueCandidatesSelector.java  | 112 +--
 .../IntraQueuePreemptionComputePlugin.java      |  10 +-
 .../ProportionalCapacityPreemptionPolicy.java   |  25 +-
 .../monitor/capacity/TempAppPerPartition.java   |  24 +-
 .../monitor/capacity/TempQueuePerPartition.java |  14 -
 .../monitor/capacity/TempUserPerPartition.java  |  88 --
 .../CapacitySchedulerConfiguration.java         |   8 -
 .../scheduler/capacity/LeafQueue.java           |  67 +-
 ...alCapacityPreemptionPolicyMockFramework.java |  89 +-
 ...ionalCapacityPreemptionPolicyIntraQueue.java |  30 +-
 ...cityPreemptionPolicyIntraQueueUserLimit.java | 899 -------------------
 ...pacityPreemptionPolicyIntraQueueWithDRF.java | 178 ----
 19 files changed, 173 insertions(+), 1738 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
index 524a049..ef7229c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
@@ -121,9 +121,4 @@ public class DefaultResourceCalculator extends ResourceCalculator {
       Resource smaller, Resource bigger) {
     return smaller.getMemorySize() <= bigger.getMemorySize();
   }
-
-  @Override
-  public boolean isAnyMajorResourceZero(Resource resource) {
-    return resource.getMemorySize() == 0f;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
index 9155ae3..1457c28 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
@@ -231,9 +231,4 @@ public class DominantResourceCalculator extends ResourceCalculator {
     return smaller.getMemorySize() <= bigger.getMemorySize()
         && smaller.getVirtualCores() <= bigger.getVirtualCores();
   }
-
-  @Override
-  public boolean isAnyMajorResourceZero(Resource resource) {
-    return resource.getMemorySize() == 0f || resource.getVirtualCores() == 0;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
index d219fe1..a2f85b3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
@@ -204,13 +204,4 @@ public abstract class ResourceCalculator {
    */
   public abstract boolean fitsIn(Resource cluster,
       Resource smaller, Resource bigger);
-
-  /**
-   * Check if resource has any major resource types (which are all NodeManagers
-   * included) a zero value.
-   *
-   * @param resource resource
-   * @return returns true if any resource is zero.
-   */
-  public abstract boolean isAnyMajorResourceZero(Resource resource);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
index 91a5297..fc46fa2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
@@ -345,9 +345,4 @@ public class Resources {
     return createResource(Math.max(lhs.getMemorySize(), rhs.getMemorySize()),
         Math.max(lhs.getVirtualCores(), rhs.getVirtualCores()));
   }
-
-  public static boolean isAnyMajorResourceZero(ResourceCalculator rc,
-      Resource resource) {
-    return rc.isAnyMajorResourceZero(resource);
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
index d6f3f6c..982b1f1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionContext.java
@@ -18,11 +18,9 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.IntraQueuePreemptionOrderPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 
@@ -65,7 +63,4 @@ interface CapacitySchedulerPreemptionContext {
   float getMinimumThresholdForIntraQueuePreemption();
 
   float getMaxAllowableLimitForIntraQueuePreemption();
-
-  @Unstable
-  IntraQueuePreemptionOrderPolicy getIntraQueuePreemptionOrderPolicy();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
index 0ae3ef0..abad2a1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/CapacitySchedulerPreemptionUtils.java
@@ -99,7 +99,7 @@ public class CapacitySchedulerPreemptionUtils {
           }
 
           deductPreemptableResourcePerApp(context, tq.totalPartitionResource,
-              tas, res);
+              tas, res, partition);
         }
       }
     }
@@ -108,10 +108,10 @@ public class CapacitySchedulerPreemptionUtils {
   private static void deductPreemptableResourcePerApp(
       CapacitySchedulerPreemptionContext context,
       Resource totalPartitionResource, Collection<TempAppPerPartition> tas,
-      Resource res) {
+      Resource res, String partition) {
     for (TempAppPerPartition ta : tas) {
       ta.deductActuallyToBePreempted(context.getResourceCalculator(),
-          totalPartitionResource, res);
+          totalPartitionResource, res, partition);
     }
   }
 
@@ -157,8 +157,7 @@ public class CapacitySchedulerPreemptionUtils {
         && Resources.greaterThan(rc, clusterResource, toObtainByPartition,
             Resources.none())
         && Resources.fitsIn(rc, clusterResource,
-            rmContainer.getAllocatedResource(), totalPreemptionAllowed)
-        && !Resources.isAnyMajorResourceZero(rc, toObtainByPartition)) {
+            rmContainer.getAllocatedResource(), totalPreemptionAllowed)) {
       Resources.subtractFrom(toObtainByPartition,
           rmContainer.getAllocatedResource());
       Resources.subtractFrom(totalPreemptionAllowed,

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
index 4bf6760..757f567 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
@@ -18,13 +18,11 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
-import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.List;
 import java.util.Map;
 import java.util.PriorityQueue;
 import java.util.Set;
@@ -35,11 +33,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPriorityComparator;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.IntraQueuePreemptionOrderPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -65,26 +60,6 @@ public class FifoIntraQueuePreemptionPlugin
   }
 
   @Override
-  public Collection<FiCaSchedulerApp> getPreemptableApps(String queueName,
-      String partition) {
-    TempQueuePerPartition tq = context.getQueueByPartition(queueName,
-        partition);
-
-    List<FiCaSchedulerApp> apps = new ArrayList<FiCaSchedulerApp>();
-    for (TempAppPerPartition tmpApp : tq.getApps()) {
-      // If a lower priority app was not selected to get preempted, mark such
-      // apps out from preemption candidate selection.
-      if (Resources.equals(tmpApp.getActuallyToBePreempted(),
-          Resources.none())) {
-        continue;
-      }
-
-      apps.add(tmpApp.app);
-    }
-    return apps;
-  }
-
-  @Override
   public Map<String, Resource> getResourceDemandFromAppsPerQueue(
       String queueName, String partition) {
 
@@ -114,7 +89,7 @@ public class FifoIntraQueuePreemptionPlugin
 
   @Override
   public void computeAppsIdealAllocation(Resource clusterResource,
-      TempQueuePerPartition tq,
+      Resource partitionBasedResource, TempQueuePerPartition tq,
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
       Resource totalPreemptedResourceAllowed,
       Resource queueReassignableResource, float maxAllowablePreemptLimit) {
@@ -137,15 +112,17 @@ public class FifoIntraQueuePreemptionPlugin
 
     // 3. Create all tempApps for internal calculation and return a list from
     // high priority to low priority order.
-    PriorityQueue<TempAppPerPartition> orderedByPriority = createTempAppForResCalculation(
-        tq, apps, clusterResource, perUserAMUsed);
+    TAPriorityComparator taComparator = new TAPriorityComparator();
+    PriorityQueue<TempAppPerPartition> orderedByPriority =
+        createTempAppForResCalculation(tq.partition, apps, taComparator);
 
     // 4. Calculate idealAssigned per app by checking based on queue's
     // unallocated resource.Also return apps arranged from lower priority to
     // higher priority.
-    TreeSet<TempAppPerPartition> orderedApps = calculateIdealAssignedResourcePerApp(
-        clusterResource, tq, selectedCandidates, queueReassignableResource,
-        orderedByPriority);
+    TreeSet<TempAppPerPartition> orderedApps =
+        calculateIdealAssignedResourcePerApp(clusterResource,
+            partitionBasedResource, tq, selectedCandidates,
+            queueReassignableResource, orderedByPriority, perUserAMUsed);
 
     // 5. A configurable limit that could define an ideal allowable preemption
     // limit. Based on current queue's capacity,defined how much % could become
@@ -168,7 +145,7 @@ public class FifoIntraQueuePreemptionPlugin
     // 7. From lowest priority app onwards, calculate toBePreempted resource
     // based on demand.
     calculateToBePreemptedResourcePerApp(clusterResource, orderedApps,
-        Resources.clone(preemptionLimit));
+        preemptionLimit);
 
     // Save all apps (low to high) to temp queue for further reference
     tq.addAllApps(orderedApps);
@@ -176,8 +153,7 @@ public class FifoIntraQueuePreemptionPlugin
     // 8. There are chances that we may preempt for the demand from same
     // priority level, such cases are to be validated out.
     validateOutSameAppPriorityFromDemand(clusterResource,
-        (TreeSet<TempAppPerPartition>) orderedApps, tq.getUsersPerPartition(),
-        context.getIntraQueuePreemptionOrderPolicy());
+        (TreeSet<TempAppPerPartition>) tq.getApps());
 
     if (LOG.isDebugEnabled()) {
       LOG.debug("Queue Name:" + tq.queueName + ", partition:" + tq.partition);
@@ -200,17 +176,17 @@ public class FifoIntraQueuePreemptionPlugin
 
       Resource preemtableFromApp = Resources.subtract(tmpApp.getUsed(),
           tmpApp.idealAssigned);
-      Resources.subtractFromNonNegative(preemtableFromApp, tmpApp.selected);
-      Resources.subtractFromNonNegative(preemtableFromApp, tmpApp.getAMUsed());
+      Resources.subtractFrom(preemtableFromApp, tmpApp.selected);
+      Resources.subtractFrom(preemtableFromApp, tmpApp.getAMUsed());
 
       // Calculate toBePreempted from apps as follows:
       // app.preemptable = min(max(app.used - app.selected - app.ideal, 0),
       // intra_q_preemptable)
       tmpApp.toBePreempted = Resources.min(rc, clusterResource, Resources
           .max(rc, clusterResource, preemtableFromApp, Resources.none()),
-          Resources.clone(preemptionLimit));
+          preemptionLimit);
 
-      preemptionLimit = Resources.subtractFromNonNegative(preemptionLimit,
+      preemptionLimit = Resources.subtract(preemptionLimit,
           tmpApp.toBePreempted);
     }
   }
@@ -245,24 +221,31 @@ public class FifoIntraQueuePreemptionPlugin
    * }
    *  
    * @param clusterResource Cluster Resource
+   * @param partitionBasedResource resource per partition
    * @param tq TempQueue
    * @param selectedCandidates Already Selected preemption candidates
    * @param queueReassignableResource Resource used in a queue
    * @param orderedByPriority List of running apps
+   * @param perUserAMUsed AM used resource
    * @return List of temp apps ordered from low to high priority
    */
   private TreeSet<TempAppPerPartition> calculateIdealAssignedResourcePerApp(
-      Resource clusterResource, TempQueuePerPartition tq,
+      Resource clusterResource, Resource partitionBasedResource,
+      TempQueuePerPartition tq,
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
       Resource queueReassignableResource,
-      PriorityQueue<TempAppPerPartition> orderedByPriority) {
+      PriorityQueue<TempAppPerPartition> orderedByPriority,
+      Map<String, Resource> perUserAMUsed) {
 
     Comparator<TempAppPerPartition> reverseComp = Collections
         .reverseOrder(new TAPriorityComparator());
     TreeSet<TempAppPerPartition> orderedApps = new TreeSet<>(reverseComp);
 
+    Map<String, Resource> userIdealAssignedMapping = new HashMap<>();
     String partition = tq.partition;
-    Map<String, TempUserPerPartition> usersPerPartition = tq.getUsersPerPartition();
+
+    Map<String, Resource> preCalculatedUserLimit =
+        new HashMap<String, Resource>();
 
     while (!orderedByPriority.isEmpty()) {
       // Remove app from the next highest remaining priority and process it to
@@ -272,19 +255,43 @@ public class FifoIntraQueuePreemptionPlugin
 
       // Once unallocated resource is 0, we can stop assigning ideal per app.
       if (Resources.lessThanOrEqual(rc, clusterResource,
-          queueReassignableResource, Resources.none())
-          || Resources.isAnyMajorResourceZero(rc, queueReassignableResource)) {
+          queueReassignableResource, Resources.none())) {
         continue;
       }
 
       String userName = tmpApp.app.getUser();
-      TempUserPerPartition tmpUser = usersPerPartition.get(userName);
-      Resource userLimitResource = tmpUser.getUserLimit();
-      Resource idealAssignedForUser = tmpUser.idealAssigned;
+      Resource userLimitResource = preCalculatedUserLimit.get(userName);
+
+      // Verify whether we already calculated headroom for this user.
+      if (userLimitResource == null) {
+        userLimitResource = Resources.clone(tq.leafQueue
+            .getUserLimitPerUser(userName, partitionBasedResource, partition));
+
+        Resource amUsed = perUserAMUsed.get(userName);
+        if (null == amUsed) {
+          amUsed = Resources.createResource(0, 0);
+        }
+
+        // Real AM used need not have to be considered for user-limit as well.
+        userLimitResource = Resources.subtract(userLimitResource, amUsed);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Userlimit for user '" + userName + "' is :"
+              + userLimitResource + ", and amUsed is:" + amUsed);
+        }
+
+        preCalculatedUserLimit.put(userName, userLimitResource);
+      }
+
+      Resource idealAssignedForUser = userIdealAssignedMapping.get(userName);
+
+      if (idealAssignedForUser == null) {
+        idealAssignedForUser = Resources.createResource(0, 0);
+        userIdealAssignedMapping.put(userName, idealAssignedForUser);
+      }
 
       // Calculate total selected container resources from current app.
-      getAlreadySelectedPreemptionCandidatesResource(selectedCandidates, tmpApp,
-          tmpUser, partition);
+      getAlreadySelectedPreemptionCandidatesResource(selectedCandidates,
+          tmpApp, partition);
 
       // For any app, used+pending will give its idealAssigned. However it will
       // be tightly linked to queue's unallocated quota. So lower priority apps
@@ -295,11 +302,10 @@ public class FifoIntraQueuePreemptionPlugin
 
       if (Resources.lessThan(rc, clusterResource, idealAssignedForUser,
           userLimitResource)) {
-        Resource idealAssigned = Resources.min(rc, clusterResource,
-            appIdealAssigned,
+        appIdealAssigned = Resources.min(rc, clusterResource, appIdealAssigned,
             Resources.subtract(userLimitResource, idealAssignedForUser));
         tmpApp.idealAssigned = Resources.clone(Resources.min(rc,
-            clusterResource, queueReassignableResource, idealAssigned));
+            clusterResource, queueReassignableResource, appIdealAssigned));
         Resources.addTo(idealAssignedForUser, tmpApp.idealAssigned);
       } else {
         continue;
@@ -314,8 +320,7 @@ public class FifoIntraQueuePreemptionPlugin
             Resources.subtract(tmpApp.idealAssigned, appUsedExcludedSelected));
       }
 
-      Resources.subtractFromNonNegative(queueReassignableResource,
-          tmpApp.idealAssigned);
+      Resources.subtractFrom(queueReassignableResource, tmpApp.idealAssigned);
     }
 
     return orderedApps;
@@ -327,8 +332,7 @@ public class FifoIntraQueuePreemptionPlugin
    */
   private void getAlreadySelectedPreemptionCandidatesResource(
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
-      TempAppPerPartition tmpApp, TempUserPerPartition tmpUser,
-      String partition) {
+      TempAppPerPartition tmpApp, String partition) {
     tmpApp.selected = Resources.createResource(0, 0);
     Set<RMContainer> containers = selectedCandidates
         .get(tmpApp.app.getApplicationAttemptId());
@@ -340,23 +344,16 @@ public class FifoIntraQueuePreemptionPlugin
     for (RMContainer cont : containers) {
       if (partition.equals(cont.getNodeLabelExpression())) {
         Resources.addTo(tmpApp.selected, cont.getAllocatedResource());
-        Resources.addTo(tmpUser.selected, cont.getAllocatedResource());
       }
     }
   }
 
   private PriorityQueue<TempAppPerPartition> createTempAppForResCalculation(
-      TempQueuePerPartition tq, Collection<FiCaSchedulerApp> apps,
-      Resource clusterResource,
-      Map<String, Resource> perUserAMUsed) {
-    TAPriorityComparator taComparator = new TAPriorityComparator();
+      String partition, Collection<FiCaSchedulerApp> apps,
+      TAPriorityComparator taComparator) {
     PriorityQueue<TempAppPerPartition> orderedByPriority = new PriorityQueue<>(
         100, taComparator);
 
-    String partition = tq.partition;
-    Map<String, TempUserPerPartition> usersPerPartition = tq
-        .getUsersPerPartition();
-
     // have an internal temp app structure to store intermediate data(priority)
     for (FiCaSchedulerApp app : apps) {
 
@@ -388,156 +385,56 @@ public class FifoIntraQueuePreemptionPlugin
       tmpApp.idealAssigned = Resources.createResource(0, 0);
 
       orderedByPriority.add(tmpApp);
-
-      // Create a TempUserPerPartition structure to hold more information
-      // regarding each user's entities such as UserLimit etc. This could
-      // be kept in a user to TempUserPerPartition map for further reference.
-      String userName = app.getUser();
-      if (!usersPerPartition.containsKey(userName)) {
-        ResourceUsage userResourceUsage = tq.leafQueue.getUser(userName)
-            .getResourceUsage();
-
-        TempUserPerPartition tmpUser = new TempUserPerPartition(
-            tq.leafQueue.getUser(userName), tq.queueName,
-            Resources.clone(userResourceUsage.getUsed(partition)),
-            Resources.clone(perUserAMUsed.get(userName)),
-            Resources.clone(userResourceUsage.getReserved(partition)),
-            Resources.none());
-
-        Resource userLimitResource = Resources.clone(
-            tq.leafQueue.getResourceLimitForAllUsers(userName, clusterResource,
-                partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
-
-        // Real AM used need not have to be considered for user-limit as well.
-        userLimitResource = Resources.subtract(userLimitResource,
-            tmpUser.amUsed);
-        tmpUser.setUserLimit(userLimitResource);
-
-        if (LOG.isDebugEnabled()) {
-          LOG.debug("TempUser:" + tmpUser);
-        }
-
-        tmpUser.idealAssigned = Resources.createResource(0, 0);
-        tq.addUserPerPartition(userName, tmpUser);
-      }
     }
     return orderedByPriority;
   }
 
   /*
    * Fifo+Priority based preemption policy need not have to preempt resources at
-   * same priority level. Such cases will be validated out. But if the demand is
-   * from an app of different user, force to preempt resources even if apps are
-   * at same priority.
+   * same priority level. Such cases will be validated out.
    */
   public void validateOutSameAppPriorityFromDemand(Resource cluster,
-      TreeSet<TempAppPerPartition> orderedApps,
-      Map<String, TempUserPerPartition> usersPerPartition,
-      IntraQueuePreemptionOrderPolicy intraQueuePreemptionOrder) {
+      TreeSet<TempAppPerPartition> appsOrderedfromLowerPriority) {
 
-    TempAppPerPartition[] apps = orderedApps
-        .toArray(new TempAppPerPartition[orderedApps.size()]);
+    TempAppPerPartition[] apps = appsOrderedfromLowerPriority
+        .toArray(new TempAppPerPartition[appsOrderedfromLowerPriority.size()]);
     if (apps.length <= 0) {
       return;
     }
 
-    for (int hPriority = apps.length - 1; hPriority >= 0; hPriority--) {
-
-      // Check whether high priority app with demand needs resource from other
-      // user.
-      if (Resources.greaterThan(rc, cluster,
-          apps[hPriority].getToBePreemptFromOther(), Resources.none())) {
-
-        // Given we have a demand from a high priority app, we can do a reverse
-        // scan from lower priority apps to select resources.
-        // Since idealAssigned of each app has considered user-limit, this logic
-        // will provide eventual consistency w.r.t user-limit as well.
-        for (int lPriority = 0; lPriority < apps.length; lPriority++) {
+    int lPriority = 0;
+    int hPriority = apps.length - 1;
+
+    while (lPriority < hPriority
+        && !apps[lPriority].equals(apps[hPriority])
+        && apps[lPriority].getPriority() < apps[hPriority].getPriority()) {
+      Resource toPreemptFromOther = apps[hPriority]
+          .getToBePreemptFromOther();
+      Resource actuallyToPreempt = apps[lPriority].getActuallyToBePreempted();
+      Resource delta = Resources.subtract(apps[lPriority].toBePreempted,
+          actuallyToPreempt);
+
+      if (Resources.greaterThan(rc, cluster, delta, Resources.none())) {
+        Resource toPreempt = Resources.min(rc, cluster,
+            toPreemptFromOther, delta);
+
+        apps[hPriority].setToBePreemptFromOther(
+            Resources.subtract(toPreemptFromOther, toPreempt));
+        apps[lPriority].setActuallyToBePreempted(
+            Resources.add(actuallyToPreempt, toPreempt));
+      }
 
-          // Check whether app with demand needs resource from other user.
-          if (Resources.greaterThan(rc, cluster, apps[lPriority].toBePreempted,
-              Resources.none())) {
+      if (Resources.lessThanOrEqual(rc, cluster,
+          apps[lPriority].toBePreempted,
+          apps[lPriority].getActuallyToBePreempted())) {
+        lPriority++;
+        continue;
+      }
 
-            // If apps are of same user, and priority is same, then skip.
-            if ((apps[hPriority].getUser().equals(apps[lPriority].getUser()))
-                && (apps[lPriority].getPriority() >= apps[hPriority]
-                    .getPriority())) {
-              continue;
-            }
-
-            if (Resources.lessThanOrEqual(rc, cluster,
-                apps[lPriority].toBePreempted,
-                apps[lPriority].getActuallyToBePreempted())
-                || Resources.equals(apps[hPriority].getToBePreemptFromOther(),
-                    Resources.none())) {
-              continue;
-            }
-
-            // Ideally if any application has a higher priority, then it can
-            // force to preempt any lower priority app from any user. However
-            // if admin enforces user-limit over priority, preemption module
-            // will not choose lower priority apps from usre's who are not yet
-            // met its user-limit.
-            TempUserPerPartition tmpUser = usersPerPartition
-                .get(apps[lPriority].getUser());
-            if ((!apps[hPriority].getUser().equals(apps[lPriority].getUser()))
-                && (!tmpUser.isUserLimitReached(rc, cluster))
-                && (intraQueuePreemptionOrder
-                    .equals(IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST))) {
-              continue;
-            }
-
-            Resource toPreemptFromOther = apps[hPriority]
-                .getToBePreemptFromOther();
-            Resource actuallyToPreempt = apps[lPriority]
-                .getActuallyToBePreempted();
-
-            // A lower priority app could offer more resource to preempt, if
-            // multiple higher priority/under served users needs resources.
-            // After one iteration, we need to ensure that actuallyToPreempt is
-            // subtracted from the resource to preempt.
-            Resource preemptableFromLowerPriorityApp = Resources
-                .subtract(apps[lPriority].toBePreempted, actuallyToPreempt);
-
-            // In case of user-limit preemption, when app's are from different
-            // user and of same priority, we will do user-limit preemption if
-            // there is a demand from under UL quota app.
-            // However this under UL quota app's demand may be more.
-            // Still we should ensure that we are not doing over preemption such
-            // that only a maximum of (user's used - UL quota) could be
-            // preempted.
-            if ((!apps[hPriority].getUser().equals(apps[lPriority].getUser()))
-                && (apps[lPriority].getPriority() == apps[hPriority]
-                    .getPriority())
-                && tmpUser.isUserLimitReached(rc, cluster)) {
-
-              Resource deltaULQuota = Resources
-                  .subtract(tmpUser.getUsedDeductAM(), tmpUser.selected);
-              Resources.subtractFrom(deltaULQuota, tmpUser.getUserLimit());
-
-              if (tmpUser.isPreemptionQuotaForULDeltaDone()) {
-                deltaULQuota = Resources.createResource(0, 0);
-              }
-
-              if (Resources.lessThan(rc, cluster, deltaULQuota,
-                  preemptableFromLowerPriorityApp)) {
-                tmpUser.updatePreemptionQuotaForULDeltaAsDone(true);
-                preemptableFromLowerPriorityApp = deltaULQuota;
-              }
-            }
-
-            if (Resources.greaterThan(rc, cluster,
-                preemptableFromLowerPriorityApp, Resources.none())) {
-              Resource toPreempt = Resources.min(rc, cluster,
-                  toPreemptFromOther, preemptableFromLowerPriorityApp);
-
-              apps[hPriority].setToBePreemptFromOther(
-                  Resources.subtract(toPreemptFromOther, toPreempt));
-              apps[lPriority].setActuallyToBePreempted(
-                  Resources.add(actuallyToPreempt, toPreempt));
-            }
-          }
-        }
+      if (Resources.equals(apps[hPriority].getToBePreemptFromOther(),
+          Resources.none())) {
+        hPriority--;
+        continue;
       }
     }
   }
@@ -557,40 +454,6 @@ public class FifoIntraQueuePreemptionPlugin
       Resources.addTo(userAMResource, app.getAMResource(partition));
       Resources.addTo(amUsed, app.getAMResource(partition));
     }
-
     return amUsed;
   }
-
-  @Override
-  public boolean skipContainerBasedOnIntraQueuePolicy(FiCaSchedulerApp app,
-      Resource clusterResource, Resource usedResource, RMContainer c) {
-    // Ensure below checks
-    // 1. This check must be done only when preemption order is USERLIMIT_FIRST
-    // 2. By selecting container "c", check whether this user's resource usage
-    // is going below its user-limit.
-    // 3. Used resource of user must be always greater than user-limit to
-    // skip some containers as per this check. If used resource is under user
-    // limit, then these containers of this user has to be preempted as demand
-    // might be due to high priority apps running in same user.
-    String partition = context.getScheduler()
-        .getSchedulerNode(c.getAllocatedNode()).getPartition();
-    TempQueuePerPartition tq = context.getQueueByPartition(app.getQueueName(),
-        partition);
-    TempUserPerPartition tmpUser = tq.getUsersPerPartition().get(app.getUser());
-
-    // Given user is not present, skip the check.
-    if (tmpUser == null) {
-      return false;
-    }
-
-    // For ideal resource computations, user-limit got saved by subtracting am
-    // used resource in TempUser. Hence it has to be added back here for
-    // complete check.
-    Resource userLimit = Resources.add(tmpUser.getUserLimit(), tmpUser.amUsed);
-
-    return Resources.lessThanOrEqual(rc, clusterResource,
-        Resources.subtract(usedResource, c.getAllocatedResource()), userLimit)
-        && context.getIntraQueuePreemptionOrderPolicy()
-            .equals(IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST);
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
index e2f311f..2890414 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
@@ -23,7 +23,6 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.IntraQueuePreemptionOrderPolicy;
 import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
@@ -32,13 +31,11 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Comparator;
-import java.util.HashMap;
+import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Map.Entry;
 import java.util.Set;
 
 /**
@@ -54,14 +51,14 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
         Comparator<TempAppPerPartition> {
 
     @Override
-    public int compare(TempAppPerPartition ta1, TempAppPerPartition ta2) {
-      Priority p1 = Priority.newInstance(ta1.getPriority());
-      Priority p2 = Priority.newInstance(ta2.getPriority());
+    public int compare(TempAppPerPartition tq1, TempAppPerPartition tq2) {
+      Priority p1 = Priority.newInstance(tq1.getPriority());
+      Priority p2 = Priority.newInstance(tq2.getPriority());
 
       if (!p1.equals(p2)) {
         return p1.compareTo(p2);
       }
-      return ta1.getApplicationId().compareTo(ta2.getApplicationId());
+      return tq1.getApplicationId().compareTo(tq2.getApplicationId());
     }
   }
 
@@ -124,60 +121,37 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
         Map<String, Resource> resToObtainByPartition = fifoPreemptionComputePlugin
             .getResourceDemandFromAppsPerQueue(queueName, partition);
 
-        // Default preemption iterator considers only FIFO+priority. For
-        // userlimit preemption, its possible that some lower priority apps
-        // needs from high priority app of another user. Hence use apps
-        // ordered by userlimit starvation as well.
-        Collection<FiCaSchedulerApp> apps = fifoPreemptionComputePlugin
-            .getPreemptableApps(queueName, partition);
-
-        // 6. Get user-limit to ensure that we do not preempt resources which
-        // will force user's resource to come under its UL.
-        Map<String, Resource> rollingResourceUsagePerUser = new HashMap<>();
-        initializeUsageAndUserLimitForCompute(clusterResource, partition,
-            leafQueue, rollingResourceUsagePerUser);
-
-        // 7. Based on the selected resource demand per partition, select
+        // 6. Based on the selected resource demand per partition, select
         // containers with known policy from inter-queue preemption.
         try {
           leafQueue.getReadLock().lock();
-          for (FiCaSchedulerApp app : apps) {
-            preemptFromLeastStarvedApp(leafQueue, app, selectedCandidates,
-                clusterResource, totalPreemptedResourceAllowed,
-                resToObtainByPartition, rollingResourceUsagePerUser);
+          Iterator<FiCaSchedulerApp> desc = leafQueue.getOrderingPolicy()
+              .getPreemptionIterator();
+          while (desc.hasNext()) {
+            FiCaSchedulerApp app = desc.next();
+            preemptFromLeastStarvedApp(selectedCandidates, clusterResource,
+                totalPreemptedResourceAllowed, resToObtainByPartition,
+                leafQueue, app);
           }
         } finally {
           leafQueue.getReadLock().unlock();
         }
       }
     }
-    return selectedCandidates;
-  }
 
-  private void initializeUsageAndUserLimitForCompute(Resource clusterResource,
-      String partition, LeafQueue leafQueue,
-      Map<String, Resource> rollingResourceUsagePerUser) {
-    for (String user : leafQueue.getAllUsers()) {
-      // Initialize used resource of a given user for rolling computation.
-      rollingResourceUsagePerUser.put(user, Resources.clone(
-          leafQueue.getUser(user).getResourceUsage().getUsed(partition)));
-      if (LOG.isDebugEnabled()) {
-        LOG.debug("Rolling resource usage for user:" + user + " is : "
-            + rollingResourceUsagePerUser.get(user));
-      }
-    }
+    return selectedCandidates;
   }
 
-  private void preemptFromLeastStarvedApp(LeafQueue leafQueue,
-      FiCaSchedulerApp app,
+  private void preemptFromLeastStarvedApp(
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
       Resource clusterResource, Resource totalPreemptedResourceAllowed,
-      Map<String, Resource> resToObtainByPartition,
-      Map<String, Resource> rollingResourceUsagePerUser) {
+      Map<String, Resource> resToObtainByPartition, LeafQueue leafQueue,
+      FiCaSchedulerApp app) {
 
     // ToDo: Reuse reservation selector here.
 
-    List<RMContainer> liveContainers = new ArrayList<>(app.getLiveContainers());
+    List<RMContainer> liveContainers = new ArrayList<>(
+        app.getLiveContainers());
     sortContainers(liveContainers);
 
     if (LOG.isDebugEnabled()) {
@@ -186,8 +160,6 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
               + totalPreemptedResourceAllowed);
     }
 
-    Resource rollingUsedResourcePerUser = rollingResourceUsagePerUser
-        .get(app.getUser());
     for (RMContainer c : liveContainers) {
 
       // if there are no demand, return.
@@ -212,34 +184,12 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
         continue;
       }
 
-      // If selected container brings down resource usage under its user's
-      // UserLimit (or equals to), we must skip such containers.
-      if (fifoPreemptionComputePlugin.skipContainerBasedOnIntraQueuePolicy(app,
-          clusterResource, rollingUsedResourcePerUser, c)) {
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(
-              "Skipping container: " + c.getContainerId() + " with resource:"
-                  + c.getAllocatedResource() + " as UserLimit for user:"
-                  + app.getUser() + " with resource usage: "
-                  + rollingUsedResourcePerUser + " is going under UL");
-        }
-        break;
-      }
-
       // Try to preempt this container
-      boolean ret = CapacitySchedulerPreemptionUtils
-          .tryPreemptContainerAndDeductResToObtain(rc, preemptionContext,
-              resToObtainByPartition, c, clusterResource, selectedCandidates,
-              totalPreemptedResourceAllowed);
-
-      // Subtract from respective user's resource usage once a container is
-      // selected for preemption.
-      if (ret && preemptionContext.getIntraQueuePreemptionOrderPolicy()
-          .equals(IntraQueuePreemptionOrderPolicy.USERLIMIT_FIRST)) {
-        Resources.subtractFrom(rollingUsedResourcePerUser,
-            c.getAllocatedResource());
-      }
+      CapacitySchedulerPreemptionUtils.tryPreemptContainerAndDeductResToObtain(
+          rc, preemptionContext, resToObtainByPartition, c, clusterResource,
+          selectedCandidates, totalPreemptedResourceAllowed);
     }
+
   }
 
   private void computeIntraQueuePreemptionDemand(Resource clusterResource,
@@ -255,7 +205,12 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
         continue;
       }
 
-      // 2. loop through all queues corresponding to a partition.
+      // 2. It's better to get the partition-based resource limit before
+      // starting the calculation.
+      Resource partitionBasedResource =
+          context.getPartitionResource(partition);
+
+      // 3. loop through all queues corresponding to a partition.
       for (String queueName : queueNames) {
         TempQueuePerPartition tq = context.getQueueByPartition(queueName,
             partition);
@@ -266,22 +221,23 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
           continue;
         }
 
-        // 3. Consider reassignableResource as (used - actuallyToBePreempted).
+        // 4. Consider reassignableResource as (used - actuallyToBePreempted).
         // This provides as upper limit to split apps quota in a queue.
         Resource queueReassignableResource = Resources.subtract(tq.getUsed(),
             tq.getActuallyToBePreempted());
 
-        // 4. Check queue's used capacity. Make sure that the used capacity is
+        // 5. Check queue's used capacity. Make sure that the used capacity is
         // above certain limit to consider for intra queue preemption.
         if (leafQueue.getQueueCapacities().getUsedCapacity(partition) < context
             .getMinimumThresholdForIntraQueuePreemption()) {
           continue;
         }
 
-        // 5. compute the allocation of all apps based on queue's unallocated
+        // 6. compute the allocation of all apps based on queue's unallocated
         // capacity
         fifoPreemptionComputePlugin.computeAppsIdealAllocation(clusterResource,
-            tq, selectedCandidates, totalPreemptedResourceAllowed,
+            partitionBasedResource, tq, selectedCandidates,
+            totalPreemptedResourceAllowed,
             queueReassignableResource,
             context.getMaxAllowableLimitForIntraQueuePreemption());
       }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java
index 56fd007..93ebe65 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueuePreemptionComputePlugin.java
@@ -18,14 +18,12 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
 
-import java.util.Collection;
 import java.util.Map;
 import java.util.Set;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 
 
 interface IntraQueuePreemptionComputePlugin {
@@ -34,14 +32,8 @@ interface IntraQueuePreemptionComputePlugin {
       String partition);
 
   void computeAppsIdealAllocation(Resource clusterResource,
-      TempQueuePerPartition tq,
+      Resource partitionBasedResource, TempQueuePerPartition tq,
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
       Resource totalPreemptedResourceAllowed, Resource queueTotalUnassigned,
       float maxAllowablePreemptLimit);
-
-  Collection<FiCaSchedulerApp> getPreemptableApps(String queueName,
-      String partition);
-
-  boolean skipContainerBasedOnIntraQueuePolicy(FiCaSchedulerApp app,
-      Resource clusterResource, Resource usedResource, RMContainer c);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index b171b04..1e684ea 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -22,7 +22,6 @@ import com.google.common.collect.ImmutableSet;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
@@ -81,16 +80,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
  */
 public class ProportionalCapacityPreemptionPolicy
     implements SchedulingEditPolicy, CapacitySchedulerPreemptionContext {
-
-  /**
-   * IntraQueuePreemptionOrder will be used to define various priority orders
-   * which could be configured by admin.
-   */
-  @Unstable
-  public enum IntraQueuePreemptionOrderPolicy {
-    PRIORITY_FIRST, USERLIMIT_FIRST;
-  }
-
   private static final Log LOG =
     LogFactory.getLog(ProportionalCapacityPreemptionPolicy.class);
 
@@ -107,7 +96,6 @@ public class ProportionalCapacityPreemptionPolicy
 
   private float maxAllowableLimitForIntraQueuePreemption;
   private float minimumThresholdForIntraQueuePreemption;
-  private IntraQueuePreemptionOrderPolicy intraQueuePreemptionOrderPolicy;
 
   // Pointer to other RM components
   private RMContext rmContext;
@@ -203,13 +191,6 @@ public class ProportionalCapacityPreemptionPolicy
         CapacitySchedulerConfiguration.
         DEFAULT_INTRAQUEUE_PREEMPTION_MINIMUM_THRESHOLD);
 
-    intraQueuePreemptionOrderPolicy = IntraQueuePreemptionOrderPolicy
-        .valueOf(csConfig
-            .get(
-                CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
-                CapacitySchedulerConfiguration.DEFAULT_INTRAQUEUE_PREEMPTION_ORDER_POLICY)
-            .toUpperCase());
-
     rc = scheduler.getResourceCalculator();
     nlm = scheduler.getRMContext().getNodeLabelManager();
 
@@ -262,6 +243,7 @@ public class ProportionalCapacityPreemptionPolicy
     }
   }
 
+  @SuppressWarnings("unchecked")
   private void preemptOrkillSelectedContainerAfterWait(
       Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
       long currentTime) {
@@ -674,9 +656,4 @@ public class ProportionalCapacityPreemptionPolicy
     }
     underServedQueues.add(queueName);
   }
-
-  @Override
-  public IntraQueuePreemptionOrderPolicy getIntraQueuePreemptionOrderPolicy() {
-    return intraQueuePreemptionOrderPolicy;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java
index cbc1028..fccd2a7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempAppPerPartition.java
@@ -59,17 +59,13 @@ public class TempAppPerPartition extends AbstractPreemptionEntity {
   @Override
   public String toString() {
     StringBuilder sb = new StringBuilder();
-    sb.append("NAME: " + getApplicationId())
-        .append("  PRIO: ").append(priority)
-        .append("  CUR: ").append(getUsed())
-        .append("  PEN: ").append(pending)
-        .append("  RESERVED: ").append(reserved)
-        .append("  IDEAL_ASSIGNED: ").append(idealAssigned)
-        .append("  PREEMPT_OTHER: ").append(getToBePreemptFromOther())
-        .append("  IDEAL_PREEMPT: ").append(toBePreempted)
-        .append("  ACTUAL_PREEMPT: ").append(getActuallyToBePreempted())
-        .append("  SELECTED: ").append(selected)
-        .append("\n");
+    sb.append(" NAME: " + getApplicationId()).append(" PRIO: ").append(priority)
+        .append(" CUR: ").append(getUsed()).append(" PEN: ").append(pending)
+        .append(" RESERVED: ").append(reserved).append(" IDEAL_ASSIGNED: ")
+        .append(idealAssigned).append(" PREEMPT_OTHER: ")
+        .append(getToBePreemptFromOther()).append(" IDEAL_PREEMPT: ")
+        .append(toBePreempted).append(" ACTUAL_PREEMPT: ")
+        .append(getActuallyToBePreempted()).append("\n");
 
     return sb.toString();
   }
@@ -95,12 +91,8 @@ public class TempAppPerPartition extends AbstractPreemptionEntity {
     return applicationId;
   }
 
-  public String getUser() {
-    return this.app.getUser();
-  }
-
   public void deductActuallyToBePreempted(ResourceCalculator resourceCalculator,
-      Resource cluster, Resource toBeDeduct) {
+      Resource cluster, Resource toBeDeduct, String partition) {
     if (Resources.greaterThan(resourceCalculator, cluster,
         getActuallyToBePreempted(), toBeDeduct)) {
       Resources.subtractFrom(getActuallyToBePreempted(), toBeDeduct);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
index 89452f9..7eab015 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
@@ -26,8 +26,6 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.LinkedHashMap;
-import java.util.Map;
 
 /**
  * Temporary data-structure tracking resource availability, pending resource
@@ -61,10 +59,6 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
   int relativePriority = 0;
   TempQueuePerPartition parent = null;
 
-  // This will hold a temp user data structure and will hold userlimit,
-  // idealAssigned, used etc.
-  Map<String, TempUserPerPartition> usersPerPartition = new LinkedHashMap<>();
-
   TempQueuePerPartition(String queueName, Resource current,
       boolean preemptionDisabled, String partition, Resource killable,
       float absCapacity, float absMaxCapacity, Resource totalPartitionResource,
@@ -295,12 +289,4 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
     return apps;
   }
 
-  public void addUserPerPartition(String userName,
-      TempUserPerPartition tmpUser) {
-    this.usersPerPartition.put(userName, tmpUser);
-  }
-
-  public Map<String, TempUserPerPartition> getUsersPerPartition() {
-    return usersPerPartition;
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java
deleted file mode 100644
index 245b5d4..0000000
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempUserPerPartition.java
+++ /dev/null
@@ -1,88 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
-
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue.User;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
-import org.apache.hadoop.yarn.util.resource.Resources;
-
-
-/**
- * Temporary data-structure tracking resource availability, pending resource
- * need, current utilization for an application.
- */
-public class TempUserPerPartition extends AbstractPreemptionEntity {
-
-  private final User user;
-  private Resource userLimit;
-  private boolean donePreemptionQuotaForULDelta = false;
-
-  TempUserPerPartition(User user, String queueName, Resource usedPerPartition,
-      Resource amUsedPerPartition, Resource reserved,
-      Resource pendingPerPartition) {
-    super(queueName, usedPerPartition, amUsedPerPartition, reserved,
-        pendingPerPartition);
-    this.user = user;
-  }
-
-  @Override
-  public String toString() {
-    StringBuilder sb = new StringBuilder();
-    sb.append(" NAME: " + getUserName()).append(" CUR: ").append(getUsed())
-        .append(" PEN: ").append(pending).append(" RESERVED: ").append(reserved)
-        .append(" AM_USED: ").append(amUsed).append(" USER_LIMIT: ")
-        .append(getUserLimit()).append(" IDEAL_ASSIGNED: ")
-        .append(idealAssigned).append(" USED_WO_AMUSED: ")
-        .append(getUsedDeductAM()).append(" IDEAL_PREEMPT: ")
-        .append(toBePreempted).append(" ACTUAL_PREEMPT: ")
-        .append(getActuallyToBePreempted()).append("\n");
-
-    return sb.toString();
-  }
-
-  public String getUserName() {
-    return user.getUserName();
-  }
-
-  public Resource getUserLimit() {
-    return userLimit;
-  }
-
-  public void setUserLimit(Resource userLimitResource) {
-    this.userLimit = userLimitResource;
-  }
-
-  public boolean isUserLimitReached(ResourceCalculator rc,
-      Resource clusterResource) {
-    if (Resources.greaterThan(rc, clusterResource, getUsedDeductAM(),
-        userLimit)) {
-      return true;
-    }
-    return false;
-  }
-
-  public boolean isPreemptionQuotaForULDeltaDone() {
-    return this.donePreemptionQuotaForULDelta;
-  }
-
-  public void updatePreemptionQuotaForULDeltaAsDone(boolean done) {
-    this.donePreemptionQuotaForULDelta = done;
-  }
-}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index 026dd82..9fb92ec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -1233,14 +1233,6 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   public static final float DEFAULT_INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT =
       0.2f;
 
-   /**
-   * For intra-queue preemption, enforce a preemption order such as
-   * "userlimit_first" or "priority_first".
-   */
-  public static final String INTRAQUEUE_PREEMPTION_ORDER_POLICY = PREEMPTION_CONFIG_PREFIX
-      + INTRA_QUEUE_PREEMPTION_CONFIG_PREFIX + "preemption-order-policy";
-  public static final String DEFAULT_INTRAQUEUE_PREEMPTION_ORDER_POLICY = "userlimit_first";
-
   /**
    * Maximum application for a queue to be used when application per queue is
    * not defined.To be consistent with previous version the default value is set

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 71225b8..2b1efd6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -486,7 +486,7 @@ public class LeafQueue extends AbstractCSQueue {
       writeLock.lock();
       User u = users.get(userName);
       if (null == u) {
-        u = new User(userName);
+        u = new User();
         users.put(userName, u);
       }
       return u;
@@ -1292,7 +1292,7 @@ public class LeafQueue extends AbstractCSQueue {
       String partition) {
     return getHeadroom(user, queueCurrentLimit, clusterResource,
         computeUserLimit(application.getUser(), clusterResource, user,
-            partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, true),
+            partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
         partition);
   }
 
@@ -1366,7 +1366,7 @@ public class LeafQueue extends AbstractCSQueue {
     // TODO, need consider headroom respect labels also
     Resource userLimit =
         computeUserLimit(application.getUser(), clusterResource, queueUser,
-            nodePartition, schedulingMode, true);
+            nodePartition, schedulingMode);
 
     setQueueResourceLimitsInfo(clusterResource);
 
@@ -1410,7 +1410,7 @@ public class LeafQueue extends AbstractCSQueue {
   @Lock(NoLock.class)
   private Resource computeUserLimit(String userName,
       Resource clusterResource, User user,
-      String nodePartition, SchedulingMode schedulingMode, boolean forActive) {
+      String nodePartition, SchedulingMode schedulingMode) {
     Resource partitionResource = labelManager.getResourceByLabel(nodePartition,
         clusterResource);
 
@@ -1462,21 +1462,16 @@ public class LeafQueue extends AbstractCSQueue {
     // queue's configured capacity * user-limit-factor.
     // Also, the queue's configured capacity should be higher than 
     // queue-hard-limit * ulMin
-
-    final int usersCount;
-    if (forActive) {
-      usersCount = activeUsersManager.getNumActiveUsers();
-    } else {
-      usersCount = users.size();
-    }
-
+    
+    final int activeUsers = activeUsersManager.getNumActiveUsers();
+    
     // User limit resource is determined by:
     // max{currentCapacity / #activeUsers, currentCapacity *
     // user-limit-percentage%)
     Resource userLimitResource = Resources.max(
         resourceCalculator, partitionResource,
         Resources.divideAndCeil(
-            resourceCalculator, currentCapacity, usersCount),
+            resourceCalculator, currentCapacity, activeUsers),
         Resources.divideAndCeil(
             resourceCalculator, 
             Resources.multiplyAndRoundDown(
@@ -1524,16 +1519,14 @@ public class LeafQueue extends AbstractCSQueue {
           " qconsumed: " + queueUsage.getUsed() +
           " consumedRatio: " + totalUserConsumedRatio +
           " currentCapacity: " + currentCapacity +
-          " activeUsers: " + usersCount +
+          " activeUsers: " + activeUsers +
           " clusterCapacity: " + clusterResource +
           " resourceByLabel: " + partitionResource +
           " usageratio: " + qUsageRatios.getUsageRatio(nodePartition) +
           " Partition: " + nodePartition
       );
     }
-    if (forActive) {
-      user.setUserResourceLimit(userLimitResource);
-    }
+    user.setUserResourceLimit(userLimitResource);
     return userLimitResource;
   }
   
@@ -1962,14 +1955,11 @@ public class LeafQueue extends AbstractCSQueue {
     volatile int activeApplications = 0;
     private UsageRatios userUsageRatios = new UsageRatios();
     private WriteLock writeLock;
-    String userName;
 
-    @VisibleForTesting
-    public User(String name) {
+    User() {
       ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
       // Nobody uses read-lock now, will add it when necessary
       writeLock = lock.writeLock();
-      this.userName = name;
     }
 
     public ResourceUsage getResourceUsage() {
@@ -2083,15 +2073,6 @@ public class LeafQueue extends AbstractCSQueue {
     public void setUserResourceLimit(Resource userResourceLimit) {
       this.userResourceLimit = userResourceLimit;
     }
-
-    public String getUserName() {
-      return this.userName;
-    }
-
-    @VisibleForTesting
-    public void setResourceUsage(ResourceUsage resourceUsage) {
-      this.userResourceUsage = resourceUsage;
-    }
   }
 
   @Override
@@ -2177,7 +2158,7 @@ public class LeafQueue extends AbstractCSQueue {
           User user = getUser(userName);
           Resource headroom = Resources.subtract(
               computeUserLimit(app.getUser(), clusterResources, user, partition,
-                  SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, true),
+                  SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
               user.getUsed(partition));
           // Make sure headroom is not negative.
           headroom = Resources.componentwiseMax(headroom, Resources.none());
@@ -2214,7 +2195,7 @@ public class LeafQueue extends AbstractCSQueue {
     User user = getUser(userName);
 
     return computeUserLimit(userName, resources, user, partition,
-        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, true);
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
   }
 
   @Override
@@ -2396,26 +2377,4 @@ public class LeafQueue extends AbstractCSQueue {
       writeLock.unlock();
     }
   }
-
-  /**
-   * Get all valid users in this queue.
-   * @return user list
-   */
-  public Set<String> getAllUsers() {
-    return this.users.keySet();
-  }
-
-  public Resource getResourceLimitForActiveUsers(String userName,
-      Resource clusterResource, String partition,
-      SchedulingMode schedulingMode) {
-    return computeUserLimit(userName, clusterResource, getUser(userName),
-        partition, schedulingMode, true);
-  }
-
-  public synchronized Resource getResourceLimitForAllUsers(String userName,
-      Resource clusterResource, String partition, SchedulingMode schedulingMode)
-  {
-    return computeUserLimit(userName, clusterResource, getUser(userName),
-        partition, schedulingMode, false);
-  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
index faac129..32b2c68 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
@@ -42,10 +42,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue.User;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -71,6 +69,7 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeSet;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -97,7 +96,6 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
   Clock mClock = null;
   CapacitySchedulerConfiguration conf = null;
   CapacityScheduler cs = null;
-  @SuppressWarnings("rawtypes")
   EventHandler<SchedulerEvent> mDisp = null;
   ProportionalCapacityPreemptionPolicy policy = null;
   Resource clusterResource = null;
@@ -249,7 +247,6 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
         if (containerId == 1) {
           when(rmc.isAMContainer()).thenReturn(true);
           when(app.getAMResource(label)).thenReturn(res);
-          when(app.getAppAMNodePartitionName()).thenReturn(label);
         }
 
         if (reserved) {
@@ -283,12 +280,6 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
         containerId++;
       }
 
-      // If app has 0 container, and it has only pending, still make sure to
-      // update label.
-      if (repeat == 0) {
-        when(app.getAppAMNodePartitionName()).thenReturn(label);
-      }
-
       // Some more app specific aggregated data can be better filled here.
       when(app.getPriority()).thenReturn(pri);
       when(app.getUser()).thenReturn(userName);
@@ -324,15 +315,10 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
   private void mockApplications(String appsConfig) {
     int id = 1;
     HashMap<String, HashSet<String>> userMap = new HashMap<String, HashSet<String>>();
-    HashMap<String, HashMap<String, HashMap<String, ResourceUsage>>> userResourceUsagePerLabel = new HashMap<>();
     LeafQueue queue = null;
-    int mulp = -1;
     for (String a : appsConfig.split(";")) {
       String[] strs = a.split("\t");
       String queueName = strs[0];
-      if (mulp <= 0 && strs.length > 2 && strs[2] != null) {
-        mulp = 100 / (new Integer(strs[2]).intValue());
-      }
 
       // get containers
       List<RMContainer> liveContainers = new ArrayList<RMContainer>();
@@ -352,7 +338,6 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
       when(app.getReservedContainers()).thenReturn(reservedContainers);
       when(app.getApplicationAttemptId()).thenReturn(appAttemptId);
       when(app.getApplicationId()).thenReturn(appId);
-      when(app.getQueueName()).thenReturn(queueName);
 
       // add to LeafQueue
       queue = (LeafQueue) nameToCSQueues.get(queueName);
@@ -366,70 +351,20 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
       }
 
       users.add(app.getUser());
-
-      String label = app.getAppAMNodePartitionName();
-
-      // Get label to queue
-      HashMap<String, HashMap<String, ResourceUsage>> userResourceUsagePerQueue = userResourceUsagePerLabel
-          .get(label);
-      if (null == userResourceUsagePerQueue) {
-        userResourceUsagePerQueue = new HashMap<>();
-        userResourceUsagePerLabel.put(label, userResourceUsagePerQueue);
-      }
-
-      // Get queue to user based resource map
-      HashMap<String, ResourceUsage> userResourceUsage = userResourceUsagePerQueue
-          .get(queueName);
-      if (null == userResourceUsage) {
-        userResourceUsage = new HashMap<>();
-        userResourceUsagePerQueue.put(queueName, userResourceUsage);
-      }
-
-      // Get user to its resource usage.
-      ResourceUsage usage = userResourceUsage.get(app.getUser());
-      if (null == usage) {
-        usage = new ResourceUsage();
-        userResourceUsage.put(app.getUser(), usage);
-      }
-
-      usage.incAMUsed(app.getAMResource(label));
-      usage.incUsed(app.getAppAttemptResourceUsage().getUsed(label));
       id++;
     }
 
-    for (String label : userResourceUsagePerLabel.keySet()) {
-      for (String queueName : userMap.keySet()) {
-        queue = (LeafQueue) nameToCSQueues.get(queueName);
-        // Currently we have user-limit test support only for default label.
-        Resource totResoucePerPartition = partitionToResource.get("");
-        Resource capacity = Resources.multiply(totResoucePerPartition,
-            queue.getQueueCapacities().getAbsoluteCapacity());
-        HashSet<String> users = userMap.get(queue.getQueueName());
-        when(queue.getAllUsers()).thenReturn(users);
-        Resource userLimit;
-        if (mulp > 0) {
-          userLimit = Resources.divideAndCeil(rc, capacity, mulp);
-        } else {
-          userLimit = Resources.divideAndCeil(rc, capacity,
-              users.size());
-        }
-        LOG.debug("Updating user-limit from mock: totResoucePerPartition="
-            + totResoucePerPartition + ", capacity=" + capacity
-            + ", users.size()=" + users.size() + ", userlimit= " + userLimit
-            + ",label= " + label + ",queueName= " + queueName);
-
-        HashMap<String, ResourceUsage> userResourceUsage = userResourceUsagePerLabel
-            .get(label).get(queueName);
-        for (String userName : users) {
-          User user = new User(userName);
-          if (userResourceUsage != null) {
-            user.setResourceUsage(userResourceUsage.get(userName));
-          }
-          when(queue.getUser(eq(userName))).thenReturn(user);
-          when(queue.getResourceLimitForAllUsers(eq(userName),
-              any(Resource.class), anyString(), any(SchedulingMode.class)))
-                  .thenReturn(userLimit);
-        }
+    for (String queueName : userMap.keySet()) {
+      queue = (LeafQueue) nameToCSQueues.get(queueName);
+      // Currently we have user-limit test support only for default label.
+      Resource totResoucePerPartition = partitionToResource.get("");
+      Resource capacity = Resources.multiply(totResoucePerPartition,
+          queue.getQueueCapacities().getAbsoluteCapacity());
+      HashSet<String> users = userMap.get(queue.getQueueName());
+      Resource userLimit = Resources.divideAndCeil(rc, capacity, users.size());
+      for (String user : users) {
+        when(queue.getUserLimitPerUser(eq(user), any(Resource.class),
+            anyString())).thenReturn(userLimit);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e6cdf770/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java
index 6c5aa67..bf83e1c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TestProportionalCapacityPreemptionPolicyIntraQueue.java
@@ -62,16 +62,12 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
      * Apps which are running at low priority (4) will preempt few of its
      * resources to meet the demand.
      */
-
-    conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
-        "priority_first");
-
     String labelsConfig = "=100,true;";
     String nodesConfig = // n1 has no label
         "n1= res=100";
     String queuesConfig =
         // guaranteed,max,used,pending,reserved
-        "root(=[100 100 79 120 0]);" + // root
+        "root(=[100 100 80 120 0]);" + // root
             "-a(=[11 100 11 50 0]);" + // a
             "-b(=[40 100 38 60 0]);" + // b
             "-c(=[20 100 10 10 0]);" + // c
@@ -308,8 +304,6 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
     conf.setFloat(CapacitySchedulerConfiguration.
         INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
         (float) 0.5);
-    conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
-        "priority_first");
 
     String labelsConfig = "=100,true;";
     String nodesConfig = // n1 has no label
@@ -363,8 +357,6 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
     // report "ideal" preempt as 10%. Ensure preemption happens only for 10%
     conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND,
         (float) 0.1);
-    conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
-        "priority_first");
 
     String labelsConfig = "=100,true;";
     String nodesConfig = // n1 has no label
@@ -419,8 +411,6 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
     conf.setFloat(CapacitySchedulerConfiguration.
         INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
         (float) 0.5);
-    conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
-        "priority_first");
 
     String labelsConfig = "=100,true;";
     String nodesConfig = // n1 has no label
@@ -428,7 +418,7 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
     String queuesConfig =
         // guaranteed,max,used,pending,reserved
         "root(=[100 100 95 170 0]);" + // root
-            "-a(=[60 100 70 35 0]);" + // a
+            "-a(=[60 100 70 50 0]);" + // a
             "-b(=[40 100 25 120 0])"; // b
 
     String appsConfig =
@@ -477,8 +467,6 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
     conf.setFloat(CapacitySchedulerConfiguration.
         INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
         (float) 0.5);
-    conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
-        "priority_first");
 
     String labelsConfig = "=100,true;";
     String nodesConfig = // n1 has no label
@@ -528,8 +516,6 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
      * cycle. Eventhough there are more demand and no other low priority
      * apps are present, still AM contaier need to soared.
      */
-    conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
-        "priority_first");
 
     String labelsConfig = "=100,true;";
     String nodesConfig = // n1 has no label
@@ -674,8 +660,6 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
     conf.setFloat(CapacitySchedulerConfiguration.
         INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
         (float) 0.5);
-    conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
-        "priority_first");
 
     String labelsConfig = "=100,true;" + // default partition
         "x=100,true"; // partition=x
@@ -736,8 +720,6 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
     conf.setFloat(CapacitySchedulerConfiguration.
         INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
         (float) 0.5);
-    conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
-        "priority_first");
 
     String labelsConfig = "=100,true;";
     String nodesConfig = // n1 has no label
@@ -858,10 +840,8 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
     policy.editSchedule();
 
     // Considering user-limit of 50% since only 2 users are there, only preempt
-    // 14 more (5 is already running) eventhough demand is for 30. Ideally we
-    // must preempt 15. But 15th container will bring user1's usage to 20 which
-    // is same as user-limit. Hence skip 15th container.
-    verify(mDisp, times(14)).handle(argThat(
+    // 15 more (5 is already running) eventhough demand is for 30.
+    verify(mDisp, times(15)).handle(argThat(
         new TestProportionalCapacityPreemptionPolicy.IsPreemptionRequestFor(
             getAppAttemptId(3))));
   }
@@ -889,8 +869,6 @@ public class TestProportionalCapacityPreemptionPolicyIntraQueue
     conf.setFloat(CapacitySchedulerConfiguration.
         INTRAQUEUE_PREEMPTION_MAX_ALLOWABLE_LIMIT,
         (float) 0.5);
-    conf.set(CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ORDER_POLICY,
-        "priority_first");
 
     String labelsConfig = "=100,true;" + // default partition
         "x=100,true"; // partition=x


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org