You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2017/07/12 15:07:16 UTC

[1/2] hadoop git commit: YARN-5889. Improve and refactor user-limit calculation in Capacity Scheduler. (Sunil G via wangda)

Repository: hadoop
Updated Branches:
  refs/heads/branch-2 e6cdf770c -> f2d440b3b


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
new file mode 100644
index 0000000..05503c6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
@@ -0,0 +1,982 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * {@link UsersManager} tracks users in the system and its respective data
+ * structures.
+ */
+@Private
+public class UsersManager implements AbstractUsersManager {
+
+  private static final Log LOG = LogFactory.getLog(UsersManager.class);
+
+  /*
+   * Collaborators handed in through the constructor.
+   */
+  private final LeafQueue lQueue;
+  private final RMNodeLabelsManager labelManager;
+  private final ResourceCalculator resourceCalculator;
+  private final CapacitySchedulerContext scheduler;
+
+  // User name -> User bookkeeping object.
+  private Map<String, User> users = new ConcurrentHashMap<>();
+
+  // Aggregated usage and membership for the active and non-active user groups.
+  private ResourceUsage totalResUsageForActiveUsers = new ResourceUsage();
+  private ResourceUsage totalResUsageForNonActiveUsers = new ResourceUsage();
+  private Set<String> activeUsersSet = new HashSet<>();
+  private Set<String> nonActiveUsersSet = new HashSet<>();
+
+  // Summation of consumed ratios for all users in queue
+  private UsageRatios qUsageRatios;
+
+  // Version stamp that is bumped whenever the user-limit has to be recomputed.
+  // The two maps remember, per partition and scheduling mode, the stamp that
+  // was current when the cached limit was last computed.
+  private AtomicLong latestVersionOfUsersState = new AtomicLong(0);
+  private Map<String, Map<SchedulingMode, Long>>
+      localVersionOfActiveUsersState = new HashMap<>();
+  private Map<String, Map<SchedulingMode, Long>>
+      localVersionOfAllUsersState = new HashMap<>();
+
+  // Configured user-limit (percentage) and user-limit-factor.
+  private volatile int userLimit;
+  private volatile float userLimitFactor;
+
+  private WriteLock writeLock;
+  private ReadLock readLock;
+
+  private final QueueMetrics metrics;
+
+  // Count of users having at least one activated application, and the
+  // activated applications of each such user.
+  private AtomicInteger activeUsers = new AtomicInteger(0);
+  private Map<String, Set<ApplicationId>> usersApplications = new HashMap<>();
+
+  // Pre-computed user-limits: partition -> scheduling mode -> limit.
+  Map<String, Map<SchedulingMode, Resource>> preComputedActiveUserLimit =
+      new ConcurrentHashMap<>();
+  Map<String, Map<SchedulingMode, Resource>> preComputedAllUserLimit =
+      new ConcurrentHashMap<>();
+
+  /**
+   * UsageRatios stores one used-resource ratio per node label. All access to
+   * the underlying map is guarded by this object's own read/write lock.
+   */
+  static private class UsageRatios {
+    private final Map<String, Float> usageRatios;
+    private final ReadLock readLock;
+    private final WriteLock writeLock;
+
+    public UsageRatios() {
+      ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+      readLock = lock.readLock();
+      writeLock = lock.writeLock();
+      usageRatios = new HashMap<String, Float>();
+    }
+
+    /**
+     * Add {@code delta} to the ratio stored for {@code label}; a label that
+     * has no entry yet starts from zero.
+     */
+    private void incUsageRatio(String label, float delta) {
+      // Acquire before entering try so that a failed lock() is never followed
+      // by an unlock() of a lock this thread does not hold.
+      writeLock.lock();
+      try {
+        Float current = usageRatios.get(label);
+        float usage = (current == null) ? 0f : current.floatValue();
+        usage += delta;
+        usageRatios.put(label, usage);
+      } finally {
+        writeLock.unlock();
+      }
+    }
+
+    /**
+     * @return the ratio stored for {@code label}, or 0 when none is stored
+     */
+    private float getUsageRatio(String label) {
+      readLock.lock();
+      try {
+        Float f = usageRatios.get(label);
+        if (null == f) {
+          return 0.0f;
+        }
+        return f;
+      } finally {
+        readLock.unlock();
+      }
+    }
+
+    /**
+     * Overwrite the ratio stored for {@code label} with {@code ratio}.
+     */
+    private void setUsageRatio(String label, float ratio) {
+      writeLock.lock();
+      try {
+        usageRatios.put(label, ratio);
+      } finally {
+        writeLock.unlock();
+      }
+    }
+  } /* End of UsageRatios class */
+
+  /**
+   * User class stores all user related resource usage, application details.
+   * Application counters are atomic; usage-ratio updates are serialized by a
+   * per-user write lock.
+   */
+  @VisibleForTesting
+  public static class User {
+    ResourceUsage userResourceUsage = new ResourceUsage();
+    String userName = null;
+    volatile Resource userResourceLimit = Resource.newInstance(0, 0);
+
+    // The counter references are never reassigned, so they are final; the
+    // AtomicInteger itself provides the cross-thread visibility.
+    private final AtomicInteger pendingApplications = new AtomicInteger(0);
+    private final AtomicInteger activeApplications = new AtomicInteger(0);
+
+    private final UsageRatios userUsageRatios = new UsageRatios();
+    private final WriteLock writeLock;
+
+    /**
+     * @param name
+     *          User Name
+     */
+    public User(String name) {
+      ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+      // Nobody uses read-lock now, will add it when necessary
+      writeLock = lock.writeLock();
+
+      this.userName = name;
+    }
+
+    public ResourceUsage getResourceUsage() {
+      return userResourceUsage;
+    }
+
+    /**
+     * Reset the stored usage ratio of the partition to zero and then
+     * recompute it, so the returned delta equals the freshly computed ratio.
+     *
+     * @param resourceCalculator
+     *          calculator used to compute the ratio
+     * @param resource
+     *          resource the user's usage is compared against
+     * @param nodePartition
+     *          partition name
+     * @return the newly computed usage ratio
+     */
+    public float setAndUpdateUsageRatio(ResourceCalculator resourceCalculator,
+        Resource resource, String nodePartition) {
+      // Acquire before entering try so that a failed lock() is never followed
+      // by an unlock() of a lock this thread does not hold.
+      writeLock.lock();
+      try {
+        userUsageRatios.setUsageRatio(nodePartition, 0);
+        return updateUsageRatio(resourceCalculator, resource, nodePartition);
+      } finally {
+        writeLock.unlock();
+      }
+    }
+
+    /**
+     * Recompute the usage ratio of the partition and store it.
+     *
+     * @param resourceCalculator
+     *          calculator used to compute the ratio
+     * @param resource
+     *          resource the user's usage is compared against
+     * @param nodePartition
+     *          partition name
+     * @return new ratio minus the previously stored ratio
+     */
+    public float updateUsageRatio(ResourceCalculator resourceCalculator,
+        Resource resource, String nodePartition) {
+      writeLock.lock();
+      try {
+        float newRatio = Resources.ratio(resourceCalculator,
+            getUsed(nodePartition), resource);
+        float delta = newRatio - userUsageRatios.getUsageRatio(nodePartition);
+        userUsageRatios.setUsageRatio(nodePartition, newRatio);
+        return delta;
+      } finally {
+        writeLock.unlock();
+      }
+    }
+
+    public Resource getUsed() {
+      return userResourceUsage.getUsed();
+    }
+
+    public Resource getAllUsed() {
+      return userResourceUsage.getAllUsed();
+    }
+
+    public Resource getUsed(String label) {
+      return userResourceUsage.getUsed(label);
+    }
+
+    public int getPendingApplications() {
+      return pendingApplications.get();
+    }
+
+    public int getActiveApplications() {
+      return activeApplications.get();
+    }
+
+    public Resource getConsumedAMResources() {
+      return userResourceUsage.getAMUsed();
+    }
+
+    public Resource getConsumedAMResources(String label) {
+      return userResourceUsage.getAMUsed(label);
+    }
+
+    /**
+     * @return pending plus active application count
+     */
+    public int getTotalApplications() {
+      return getPendingApplications() + getActiveApplications();
+    }
+
+    /** Count one more pending application. */
+    public void submitApplication() {
+      pendingApplications.incrementAndGet();
+    }
+
+    /** Move one application from the pending count to the active count. */
+    public void activateApplication() {
+      pendingApplications.decrementAndGet();
+      activeApplications.incrementAndGet();
+    }
+
+    /**
+     * Remove a finished application from the counters.
+     *
+     * @param wasActive
+     *          true to decrement the active count, false to decrement the
+     *          pending count
+     */
+    public void finishApplication(boolean wasActive) {
+      if (wasActive) {
+        activeApplications.decrementAndGet();
+      } else {
+        pendingApplications.decrementAndGet();
+      }
+    }
+
+    public Resource getUserResourceLimit() {
+      return userResourceLimit;
+    }
+
+    public void setUserResourceLimit(Resource userResourceLimit) {
+      this.userResourceLimit = userResourceLimit;
+    }
+  } /* End of User class */
+
+  /**
+   * Build a UsersManager bound to one leaf queue.
+   *
+   * @param metrics
+   *          Queue Metrics
+   * @param lQueue
+   *          Leaf Queue Object
+   * @param labelManager
+   *          Label Manager instance
+   * @param scheduler
+   *          Capacity Scheduler Context
+   * @param resourceCalculator
+   *          resource calculator used for all resource arithmetic
+   */
+  public UsersManager(QueueMetrics metrics, LeafQueue lQueue,
+      RMNodeLabelsManager labelManager, CapacitySchedulerContext scheduler,
+      ResourceCalculator resourceCalculator) {
+    this.metrics = metrics;
+    this.lQueue = lQueue;
+    this.labelManager = labelManager;
+    this.scheduler = scheduler;
+    this.resourceCalculator = resourceCalculator;
+    this.qUsageRatios = new UsageRatios();
+
+    // Read and write views of one and the same lock.
+    ReentrantReadWriteLock rwLock = new ReentrantReadWriteLock();
+    this.readLock = rwLock.readLock();
+    this.writeLock = rwLock.writeLock();
+  }
+
+  /**
+   * Read the user-limit currently configured on this manager.
+   *
+   * @return user limit
+   */
+  public int getUserLimit() {
+    return this.userLimit;
+  }
+
+  /**
+   * Store a new configured user-limit. The cached user-limits are not
+   * invalidated by this call.
+   *
+   * @param userLimit
+   *          user limit
+   */
+  public void setUserLimit(int userLimit) {
+    this.userLimit = userLimit;
+  }
+
+  /**
+   * Read the user-limit factor currently configured on this manager.
+   *
+   * @return user-limit factor
+   */
+  public float getUserLimitFactor() {
+    return this.userLimitFactor;
+  }
+
+  /**
+   * Store a new configured user-limit factor. The cached user-limits are not
+   * invalidated by this call.
+   *
+   * @param userLimitFactor
+   *          User Limit factor.
+   */
+  public void setUserLimitFactor(float userLimitFactor) {
+    this.userLimitFactor = userLimitFactor;
+  }
+
+  /*
+   * Queue-level usage ratio recorded for the given label (0 when none is
+   * recorded).
+   */
+  @VisibleForTesting
+  public float getUsageRatio(String label) {
+    float ratioForLabel = qUsageRatios.getUsageRatio(label);
+    return ratioForLabel;
+  }
+
+  /**
+   * Force UsersManager to recompute userlimit by advancing the version stamp
+   * that the cached user-limits are compared against.
+   */
+  public void userLimitNeedsRecompute() {
+    // Acquire before entering try so that a failed lock() is never followed
+    // by an unlock() of a lock this thread does not hold.
+    writeLock.lock();
+    try {
+      // If latestVersionOfUsersState became negative due to overflow, reset
+      // it to 0. The increment and the reset both happen under the write
+      // lock.
+      long value = latestVersionOfUsersState.incrementAndGet();
+      if (value < 0) {
+        latestVersionOfUsersState.set(0);
+      }
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /*
+   * Live (not copied) map of user name to User for this queue.
+   */
+  private Map<String, User> getUsers() {
+    return this.users;
+  }
+
+  /**
+   * Look up the user object registered under the given name.
+   *
+   * @param userName
+   *          User Name
+   * @return User object, or null when no user is registered under that name
+   */
+  public User getUser(String userName) {
+    User knownUser = users.get(userName);
+    return knownUser;
+  }
+
+  /**
+   * Remove user from the users map and from the active/non-active sets.
+   *
+   * @param userName
+   *          User Name
+   */
+  public void removeUser(String userName) {
+    // Acquire before entering try so that a failed lock() is never followed
+    // by an unlock() of a lock this thread does not hold.
+    writeLock.lock();
+    try {
+      this.users.remove(userName);
+
+      // Remove user from active/non-active list as well.
+      activeUsersSet.remove(userName);
+      nonActiveUsersSet.remove(userName);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /**
+   * Get and add user if absent. A newly created user is also placed in the
+   * non-active set.
+   *
+   * @param userName
+   *          User Name
+   * @return User object
+   */
+  public User getUserAndAddIfAbsent(String userName) {
+    // Acquire before entering try so that a failed lock() is never followed
+    // by an unlock() of a lock this thread does not hold.
+    writeLock.lock();
+    try {
+      User u = getUser(userName);
+      if (null == u) {
+        u = new User(userName);
+        addUser(userName, u);
+
+        // Add to nonActive list so that resourceUsage could be tracked.
+        // Set.add leaves the set unchanged when the name is already present.
+        nonActiveUsersSet.add(userName);
+      }
+      return u;
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /*
+   * Register the user object under the given name, replacing any previous
+   * entry for that name.
+   */
+  private void addUser(String userName, User user) {
+    users.put(userName, user);
+  }
+
+  /**
+   * Build a snapshot of every user currently tracked by this queue (not only
+   * the active ones). Used, AM-used and limit resources are cloned; the
+   * user's ResourceUsage object is passed through as-is.
+   *
+   * @return an ArrayList with one UserInfo object per tracked user
+   */
+  public ArrayList<UserInfo> getUsersInfo() {
+    // Acquire before entering try so that a failed lock() is never followed
+    // by an unlock() of a lock this thread does not hold.
+    readLock.lock();
+    try {
+      ArrayList<UserInfo> usersToReturn = new ArrayList<UserInfo>();
+      for (Map.Entry<String, User> entry : getUsers().entrySet()) {
+        User user = entry.getValue();
+        usersToReturn.add(
+            new UserInfo(entry.getKey(), Resources.clone(user.getAllUsed()),
+                user.getActiveApplications(), user.getPendingApplications(),
+                Resources.clone(user.getConsumedAMResources()),
+                Resources.clone(user.getUserResourceLimit()),
+                user.getResourceUsage()));
+      }
+      return usersToReturn;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /**
+   * Get computed user-limit for all ACTIVE users in this queue. If cached data
+   * is invalidated (its version stamp differs from the latest one) or is
+   * missing, this method recomputes the user-limit first.
+   *
+   * @param userName
+   *          Name of user who has submitted one/more app to given queue.
+   * @param clusterResource
+   *          total cluster resource
+   * @param nodePartition
+   *          partition name
+   * @param schedulingMode
+   *          scheduling mode
+   *          RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
+   * @return Computed User Limit
+   */
+  public Resource getComputedResourceLimitForActiveUsers(String userName,
+      Resource clusterResource, String nodePartition,
+      SchedulingMode schedulingMode) {
+
+    Resource computedLimit;
+
+    // Acquire before entering try so that a failed lock() is never followed
+    // by an unlock() of a lock this thread does not hold.
+    writeLock.lock();
+    try {
+      // Look the cache up only while holding the lock. A lookup done before
+      // locking can observe "no map yet" while another thread computes the
+      // limit and stamps the version, which would skip the recompute below
+      // and dereference null.
+      Map<SchedulingMode, Resource> userLimitPerSchedulingMode =
+          preComputedActiveUserLimit.get(nodePartition);
+
+      if (userLimitPerSchedulingMode == null
+          || isRecomputeNeeded(schedulingMode, nodePartition, true)) {
+        // recompute
+        userLimitPerSchedulingMode = reComputeUserLimits(userName,
+            nodePartition, clusterResource, schedulingMode, true);
+
+        // Remember the version this limit was computed for so that later
+        // calls can reuse it until the version changes.
+        setLocalVersionOfUsersState(nodePartition, schedulingMode, true);
+      }
+
+      computedLimit = userLimitPerSchedulingMode.get(schedulingMode);
+    } finally {
+      writeLock.unlock();
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("userLimit is fetched. userLimit = "
+          + computedLimit + ", schedulingMode="
+          + schedulingMode + ", partition=" + nodePartition);
+    }
+
+    return computedLimit;
+  }
+
+  /**
+   * Get computed user-limit for all users in this queue. If cached data is
+   * invalidated (its version stamp differs from the latest one) or is
+   * missing, this method recomputes the user-limit first.
+   *
+   * @param userName
+   *          Name of user who has submitted one/more app to given queue.
+   * @param clusterResource
+   *          total cluster resource
+   * @param nodePartition
+   *          partition name
+   * @param schedulingMode
+   *          scheduling mode
+   *          RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
+   * @return Computed User Limit
+   */
+  public Resource getComputedResourceLimitForAllUsers(String userName,
+      Resource clusterResource, String nodePartition,
+      SchedulingMode schedulingMode) {
+
+    Resource computedLimit;
+
+    // Acquire before entering try so that a failed lock() is never followed
+    // by an unlock() of a lock this thread does not hold.
+    writeLock.lock();
+    try {
+      // Look the cache up only while holding the lock. A lookup done before
+      // locking can observe "no map yet" while another thread computes the
+      // limit and stamps the version, which would skip the recompute below
+      // and dereference null.
+      Map<SchedulingMode, Resource> userLimitPerSchedulingMode =
+          preComputedAllUserLimit.get(nodePartition);
+
+      if (userLimitPerSchedulingMode == null
+          || isRecomputeNeeded(schedulingMode, nodePartition, false)) {
+        // recompute
+        userLimitPerSchedulingMode = reComputeUserLimits(userName,
+            nodePartition, clusterResource, schedulingMode, false);
+
+        // Remember the version this limit was computed for so that later
+        // calls can reuse it until the version changes.
+        setLocalVersionOfUsersState(nodePartition, schedulingMode, false);
+      }
+
+      computedLimit = userLimitPerSchedulingMode.get(schedulingMode);
+    } finally {
+      writeLock.unlock();
+    }
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("userLimit is fetched. userLimit = "
+          + computedLimit + ", schedulingMode="
+          + schedulingMode + ", partition=" + nodePartition);
+    }
+
+    return computedLimit;
+  }
+
+  /*
+   * A recompute is needed when the version recorded for this partition and
+   * scheduling mode differs from the latest version. A missing record reads
+   * as -1.
+   */
+  private boolean isRecomputeNeeded(SchedulingMode schedulingMode,
+      String nodePartition, boolean isActive) {
+    long cachedVersion = getLocalVersionOfUsersState(nodePartition,
+        schedulingMode, isActive);
+    long latestVersion = latestVersionOfUsersState.get();
+    return cachedVersion != latestVersion;
+  }
+
+  /*
+   * Record the latest version of users state for the given partition and
+   * scheduling mode, in the active-users or all-users map as selected by
+   * isActive. A later call to isRecomputeNeeded then reports the cache as
+   * valid until the latest version changes.
+   */
+  private void setLocalVersionOfUsersState(String nodePartition,
+      SchedulingMode schedulingMode, boolean isActive) {
+    // Acquire before entering try so that a failed lock() is never followed
+    // by an unlock() of a lock this thread does not hold.
+    writeLock.lock();
+    try {
+      Map<String, Map<SchedulingMode, Long>> localVersionOfUsersState =
+          (isActive)
+              ? localVersionOfActiveUsersState
+              : localVersionOfAllUsersState;
+
+      Map<SchedulingMode, Long> localVersion = localVersionOfUsersState
+          .get(nodePartition);
+      if (null == localVersion) {
+        localVersion = new HashMap<SchedulingMode, Long>();
+        localVersionOfUsersState.put(nodePartition, localVersion);
+      }
+
+      localVersion.put(schedulingMode, latestVersionOfUsersState.get());
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /*
+   * Get the version of users state recorded for the given partition and
+   * scheduling mode, from the active-users or all-users map as selected by
+   * isActive. Returns -1 when nothing has been recorded yet.
+   */
+  private long getLocalVersionOfUsersState(String nodePartition,
+      SchedulingMode schedulingMode, boolean isActive) {
+    // Acquire before entering try so that a failed lock() is never followed
+    // by an unlock() of a lock this thread does not hold.
+    readLock.lock();
+    try {
+      Map<String, Map<SchedulingMode, Long>> localVersionOfUsersState =
+          (isActive)
+              ? localVersionOfActiveUsersState
+              : localVersionOfAllUsersState;
+
+      // One lookup per level instead of containsKey followed by get.
+      Map<SchedulingMode, Long> localVersion = localVersionOfUsersState
+          .get(nodePartition);
+      if (null == localVersion) {
+        return -1;
+      }
+
+      Long version = localVersion.get(schedulingMode);
+      if (null == version) {
+        return -1;
+      }
+
+      return version.longValue();
+    } finally {
+      readLock.unlock();
+    }
+  }
+
+  /*
+   * Compute the user-limit for one partition and scheduling mode, store it in
+   * the active-users or all-users cache (selected by activeMode) and return
+   * the per-scheduling-mode map of that partition.
+   */
+  private Map<SchedulingMode, Resource> reComputeUserLimits(String userName,
+      String nodePartition, Resource clusterResource,
+      SchedulingMode schedulingMode, boolean activeMode) {
+
+    // Pick the cache that matches the kind of computation requested.
+    Map<String, Map<SchedulingMode, Resource>> limitCache;
+    if (activeMode) {
+      limitCache = preComputedActiveUserLimit;
+    } else {
+      limitCache = preComputedAllUserLimit;
+    }
+
+    // Create the per-partition entry on first use.
+    Map<SchedulingMode, Resource> limitsByMode = limitCache.get(nodePartition);
+    if (limitsByMode == null) {
+      limitsByMode = new ConcurrentHashMap<>();
+      limitCache.put(nodePartition, limitsByMode);
+    }
+
+    // Compute the limit and store it under the scheduling mode.
+    limitsByMode.put(schedulingMode, computeUserLimit(userName,
+        clusterResource, nodePartition, schedulingMode, activeMode));
+
+    return limitsByMode;
+  }
+
+  /*
+   * Compute the user-limit for the given partition and scheduling mode, and
+   * record it on the named user when that user is known to this queue.
+   *
+   * activeUser == true : divide the resources used by active users by the
+   * number of active users. activeUser == false: divide the current capacity
+   * by the number of all users. In both cases the result is floored by
+   * currentCapacity * user-limit% and capped by the mode-specific maximum.
+   */
+  private Resource computeUserLimit(String userName, Resource clusterResource,
+      String nodePartition, SchedulingMode schedulingMode, boolean activeUser) {
+    Resource partitionResource = labelManager.getResourceByLabel(nodePartition,
+        clusterResource);
+
+    /*
+     * What is our current capacity?
+     * * It is equal to the max(required, queue-capacity) if we're running
+     * below capacity. The 'max' ensures that jobs in queues with minuscule
+     * capacity (< 1 slot) make progress
+     * * If we're running over capacity, then its (usedResources + required)
+     * (which extra resources we are allocating)
+     */
+    Resource queueCapacity = Resources.multiplyAndNormalizeUp(
+        resourceCalculator, partitionResource,
+        lQueue.getQueueCapacities().getAbsoluteCapacity(nodePartition),
+        lQueue.getMinimumAllocation());
+
+    /*
+     * Assume we have required resource equals to minimumAllocation, this can
+     * make sure user limit can continuously increase till queueMaxResource
+     * reached.
+     */
+    Resource required = lQueue.getMinimumAllocation();
+
+    // Allow progress for queues with minuscule capacity
+    queueCapacity = Resources.max(resourceCalculator, partitionResource,
+        queueCapacity, required);
+
+    /*
+     * We want to base the userLimit calculation on max(queueCapacity,
+     * usedResources+required). However, we want usedResources to be based on
+     * the combined ratios of all the users in the queue so we use consumedRatio
+     * to calculate such. The calculation is dependent on how the
+     * resourceCalculator calculates the ratio between two Resources. DRF
+     * Example: If usedResources is greater than queueCapacity and users have
+     * the following [mem,cpu] usages: User1: [10%,20%] - Dominant resource is
+     * 20% User2: [30%,10%] - Dominant resource is 30% Then total consumedRatio
+     * is then 20+30=50%. Yes, this value can be larger than 100% but for the
+     * purposes of making sure all users are getting their fair share, it works.
+     */
+    Resource consumed = Resources.multiplyAndNormalizeUp(resourceCalculator,
+        partitionResource, getUsageRatio(nodePartition),
+        lQueue.getMinimumAllocation());
+    Resource currentCapacity = Resources.lessThan(resourceCalculator,
+        partitionResource, consumed, queueCapacity)
+            ? queueCapacity
+            : Resources.add(consumed, required);
+
+    // Default (active-user) inputs: resources used by active users, shared
+    // among the active users.
+    int usersCount = getNumActiveUsers();
+    Resource resourceUsed = totalResUsageForActiveUsers.getUsed(nodePartition);
+
+    // For non-activeUser calculation, consider all users count.
+    if (!activeUser) {
+      resourceUsed = currentCapacity;
+      usersCount = users.size();
+    }
+
+    /*
+     * User limit resource is determined by: max(resourceUsed / usersCount,
+     * currentCapacity * user-limit-percentage%)
+     */
+    Resource userLimitResource = Resources.max(resourceCalculator,
+        partitionResource,
+        Resources.divideAndCeil(resourceCalculator, resourceUsed,
+            usersCount),
+        Resources.divideAndCeil(resourceCalculator,
+            Resources.multiplyAndRoundDown(currentCapacity, getUserLimit()),
+            100));
+
+    // User limit is capped by maxUserLimit
+    // - maxUserLimit = queueCapacity * user-limit-factor
+    // (RESPECT_PARTITION_EXCLUSIVITY)
+    // - maxUserLimit = total-partition-resource (IGNORE_PARTITION_EXCLUSIVITY)
+    //
+    // In IGNORE_PARTITION_EXCLUSIVITY mode, if a queue cannot access a
+    // partition, its guaranteed resource on that partition is 0. And
+    // user-limit-factor computation is based on queue's guaranteed capacity. So
+    // we will not cap user-limit as well as used resource when doing
+    // IGNORE_PARTITION_EXCLUSIVITY allocation.
+    Resource maxUserLimit = Resources.none();
+    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
+      maxUserLimit = Resources.multiplyAndRoundDown(queueCapacity,
+          getUserLimitFactor());
+    } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
+      maxUserLimit = partitionResource;
+    }
+
+    // Cap final user limit with maxUserLimit
+    userLimitResource = Resources
+        .roundUp(resourceCalculator,
+            Resources.min(resourceCalculator, partitionResource,
+                userLimitResource, maxUserLimit),
+            lQueue.getMinimumAllocation());
+
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("User limit computation for " + userName + " in queue "
+          + lQueue.getQueueName() + " userLimitPercent=" + lQueue.getUserLimit()
+          + " userLimitFactor=" + lQueue.getUserLimitFactor() + " required: "
+          + required + " consumed: " + consumed + " user-limit-resource: "
+          + userLimitResource + " queueCapacity: " + queueCapacity
+          + " qconsumed: " + lQueue.getQueueResourceUsage().getUsed()
+          + " currentCapacity: " + currentCapacity + " activeUsers: "
+          + usersCount + " clusterCapacity: " + clusterResource
+          + " resourceByLabel: " + partitionResource + " usageratio: "
+          + getUsageRatio(nodePartition) + " Partition: " + nodePartition);
+    }
+
+    // getUser returns null for a name that is not (or no longer) registered;
+    // the computed limit is still returned in that case instead of failing
+    // with a NullPointerException.
+    User user = getUser(userName);
+    if (user != null) {
+      user.setUserResourceLimit(userLimitResource);
+    }
+    return userLimitResource;
+  }
+
+  /**
+   * Update new usage ratio. Every user's ratio for the partition is reset and
+   * recomputed against the partition's resource, and the sum of the
+   * recomputed ratios becomes the queue's usage ratio for that partition.
+   *
+   * @param partition
+   *          Node partition
+   * @param clusterResource
+   *          Cluster Resource
+   */
+  public void updateUsageRatio(String partition, Resource clusterResource) {
+    // Acquire before entering try so that a failed lock() is never followed
+    // by an unlock() of a lock this thread does not hold.
+    writeLock.lock();
+    try {
+      Resource resourceByLabel = labelManager.getResourceByLabel(partition,
+          clusterResource);
+      float consumed = 0;
+      for (User user : getUsers().values()) {
+        consumed += user.setAndUpdateUsageRatio(resourceCalculator,
+            resourceByLabel, partition);
+      }
+
+      qUsageRatios.setUsageRatio(partition, consumed);
+    } finally {
+      writeLock.unlock();
+    }
+  }
+
+  /*
+   * Add delta to the queue-level usage ratio of the given partition.
+   */
+  private void incQueueUsageRatio(String nodePartition, float delta) {
+    this.qUsageRatios.incUsageRatio(nodePartition, delta);
+  }
+
+  /*
+   * Record an activated application for the user. The first activated
+   * application of a user also makes the user active: the active-user count
+   * and metric are incremented, the user-limit cache is invalidated and the
+   * user's usage is moved to the active totals.
+   */
+  @Override
+  public void activateApplication(String user, ApplicationId applicationId) {
+    // Acquire before entering try so that a failed lock() is never followed
+    // by an unlock() of a lock this thread does not hold.
+    this.writeLock.lock();
+    try {
+      Set<ApplicationId> userApps = usersApplications.get(user);
+      if (userApps == null) {
+        userApps = new HashSet<ApplicationId>();
+        usersApplications.put(user, userApps);
+        activeUsers.incrementAndGet();
+        metrics.incrActiveUsers();
+
+        // A user is added to active list. Invalidate user-limit cache.
+        userLimitNeedsRecompute();
+        updateActiveUsersResourceUsage(user);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("User " + user + " added to activeUsers, currently: "
+              + activeUsers);
+        }
+      }
+      // Only count the application once in the metrics.
+      if (userApps.add(applicationId)) {
+        metrics.activateApp(user);
+      }
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  /*
+   * Remove an activated application of the user. When the user's last
+   * activated application is removed the user stops being active: the
+   * active-user count and metric are decremented, the user-limit cache is
+   * invalidated and the user's usage is moved to the non-active totals.
+   * Nothing happens for a user with no activated applications.
+   */
+  @Override
+  public void deactivateApplication(String user, ApplicationId applicationId) {
+    // Acquire before entering try so that a failed lock() is never followed
+    // by an unlock() of a lock this thread does not hold.
+    this.writeLock.lock();
+    try {
+      Set<ApplicationId> userApps = usersApplications.get(user);
+      if (userApps != null) {
+        if (userApps.remove(applicationId)) {
+          metrics.deactivateApp(user);
+        }
+        if (userApps.isEmpty()) {
+          usersApplications.remove(user);
+          activeUsers.decrementAndGet();
+          metrics.decrActiveUsers();
+
+          // A user is removed from active list. Invalidate user-limit cache.
+          userLimitNeedsRecompute();
+          updateNonActiveUsersResourceUsage(user);
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("User " + user + " removed from activeUsers, currently: "
+                + activeUsers);
+          }
+        }
+      }
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  /*
+   * Current value of the active-users counter.
+   */
+  @Override
+  public int getNumActiveUsers() {
+    int activeUserCount = activeUsers.get();
+    return activeUserCount;
+  }
+
+  /*
+   * Move a user from the non-active set to the active set and transfer the
+   * user's per-partition usage from the non-active totals to the active
+   * totals. Does nothing when the user is not in the non-active set.
+   */
+  private void updateActiveUsersResourceUsage(String userName) {
+    // Acquire before entering try so that a failed lock() is never followed
+    // by an unlock() of a lock this thread does not hold.
+    this.writeLock.lock();
+    try {
+      // For UT case: We might need to add the user to users list.
+      User user = getUserAndAddIfAbsent(userName);
+      ResourceUsage resourceUsage = user.getResourceUsage();
+      // If User is moved to active list, moved resource usage from non-active
+      // to active list.
+      if (nonActiveUsersSet.contains(userName)) {
+        nonActiveUsersSet.remove(userName);
+        activeUsersSet.add(userName);
+
+        // Update total resource usage of active and non-active after user
+        // is moved from non-active to active.
+        for (String partition : resourceUsage.getNodePartitionsSet()) {
+          totalResUsageForNonActiveUsers.decUsed(partition,
+              resourceUsage.getUsed(partition));
+          totalResUsageForActiveUsers.incUsed(partition,
+              resourceUsage.getUsed(partition));
+        }
+
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("User '" + userName
+              + "' has become active. Hence move user to active list."
+              + "Active users size = " + activeUsersSet.size()
+              + "Non-active users size = " + nonActiveUsersSet.size()
+              + "Total Resource usage for active users="
+              + totalResUsageForActiveUsers.getAllUsed() + "."
+              + "Total Resource usage for non-active users="
+              + totalResUsageForNonActiveUsers.getAllUsed());
+        }
+      }
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  /*
+   * Move a user from the active set to the non-active set and transfer the
+   * user's per-partition usage from the active totals to the non-active
+   * totals. Does nothing when the user is not in the active set.
+   */
+  private void updateNonActiveUsersResourceUsage(String userName) {
+    // Acquire before entering try so that a failed lock() is never followed
+    // by an unlock() of a lock this thread does not hold.
+    this.writeLock.lock();
+    try {
+      // For UT case: We might need to add the user to users list.
+      User user = getUserAndAddIfAbsent(userName);
+      ResourceUsage resourceUsage = user.getResourceUsage();
+      // If User is moved to non-active list, move resource usage from
+      // active to non-active list.
+      if (activeUsersSet.contains(userName)) {
+        activeUsersSet.remove(userName);
+        nonActiveUsersSet.add(userName);
+
+        // Update total resource usage of active and non-active after user is
+        // moved from active to non-active.
+        for (String partition : resourceUsage.getNodePartitionsSet()) {
+          totalResUsageForActiveUsers.decUsed(partition,
+              resourceUsage.getUsed(partition));
+          totalResUsageForNonActiveUsers.incUsed(partition,
+              resourceUsage.getUsed(partition));
+        }
+
+        // Log once after all partitions have been transferred, so the totals
+        // in the message are final and the message is not repeated per
+        // partition.
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("User '" + userName
+              + "' has become non-active.Hence move user to non-active list."
+              + "Active users size = " + activeUsersSet.size()
+              + "Non-active users size = " + nonActiveUsersSet.size()
+              + "Total Resource usage for active users="
+              + totalResUsageForActiveUsers.getAllUsed() + "."
+              + "Total Resource usage for non-active users="
+              + totalResUsageForNonActiveUsers.getAllUsed());
+        }
+      }
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  /**
+   * Select the aggregate usage tracker that matches the set the given user
+   * currently belongs to. The non-active set is consulted first and the
+   * active set only if the user is not found there. A user found in neither
+   * set is reported with a warning and treated as non-active.
+   *
+   * @param userName
+   *          Name of the user
+   * @return the active-users total when the user is only in the active set,
+   *         otherwise the non-active-users total
+   */
+  private ResourceUsage getTotalResourceUsagePerUser(String userName) {
+    final boolean listedAsNonActive = nonActiveUsersSet.contains(userName);
+
+    // The active set is only looked at when the non-active lookup missed.
+    if (!listedAsNonActive && activeUsersSet.contains(userName)) {
+      return totalResUsageForActiveUsers;
+    }
+
+    if (!listedAsNonActive) {
+      // Known to neither set: warn, then fall through to the non-active total.
+      LOG.warn("User '" + userName
+          + "' is not present in active/non-active. This is highly unlikely."
+          + "We can consider this user in non-active list in this case.");
+    }
+    return totalResUsageForNonActiveUsers;
+  }
+
+  /**
+   * During container allocate/release, ensure that all user specific data
+   * structures are updated.
+   * <p>
+   * Under the write lock this looks up the user (adding it if absent),
+   * applies the resource change to the user's usage and to the matching
+   * active/non-active total, flags the user-limit for recomputation and
+   * finally refreshes the user's and the queue's usage ratio for the given
+   * partition.
+   *
+   * @param userName
+   *          Name of the user
+   * @param resource
+   *          Resource to increment/decrement
+   * @param nodePartition
+   *          Node label
+   * @param isAllocate
+   *          Indicate whether to allocate or release resource
+   * @return user
+   */
+  public User updateUserResourceUsage(String userName, Resource resource,
+      String nodePartition, boolean isAllocate) {
+    // Acquire the lock before entering try, so that the finally block can
+    // never call unlock() on a lock that was not successfully acquired.
+    this.writeLock.lock();
+    try {
+      // TODO, should use getUser, use this method just to avoid UT failure
+      // which is caused by wrong invoking order, will fix UT separately
+      User user = getUserAndAddIfAbsent(userName);
+
+      // Apply the allocation/release to the user's own usage and to the
+      // active/non-active total the user currently counts towards.
+      updateResourceUsagePerUser(user, resource, nodePartition, isAllocate);
+
+      // Usage has changed. Invalidate user-limit.
+      userLimitNeedsRecompute();
+
+      // Update usage ratios
+      Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
+          scheduler.getClusterResource());
+      incQueueUsageRatio(nodePartition, user.updateUsageRatio(
+          resourceCalculator, resourceByLabel, nodePartition));
+
+      return user;
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
+
+  /**
+   * Apply an allocation or a release to both the user's own usage and the
+   * aggregate (active or non-active) usage the user currently counts
+   * towards, then emit a debug line with both aggregate totals.
+   *
+   * @param user
+   *          User whose usage is changed
+   * @param resource
+   *          Resource to increment/decrement
+   * @param nodePartition
+   *          Node label
+   * @param isAllocate
+   *          true to add the resource, false to subtract it
+   */
+  private void updateResourceUsagePerUser(User user, Resource resource,
+      String nodePartition, boolean isAllocate) {
+    // Resolve both trackers up front: the aggregate first, then the user's.
+    ResourceUsage aggregateUsage = getTotalResourceUsagePerUser(user.userName);
+    ResourceUsage ownUsage = user.getResourceUsage();
+
+    if (!isAllocate) {
+      // Release: subtract from the user, then from the aggregate.
+      ownUsage.decUsed(nodePartition, resource);
+      aggregateUsage.decUsed(nodePartition, resource);
+    } else {
+      // Allocation: add to the user, then to the aggregate.
+      ownUsage.incUsed(nodePartition, resource);
+      aggregateUsage.incUsed(nodePartition, resource);
+    }
+
+    if (!LOG.isDebugEnabled()) {
+      return;
+    }
+    LOG.debug(
+        "User resource is updated." + "Total Resource usage for active users="
+            + totalResUsageForActiveUsers.getAllUsed() + "."
+            + "Total Resource usage for non-active users="
+            + totalResUsageForNonActiveUsers.getAllUsed());
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 3635398..ee5de60 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -54,7 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFini
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
@@ -115,24 +115,24 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
       new ConcurrentHashMap<>();
 
   public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
-      String user, Queue queue, ActiveUsersManager activeUsersManager,
+      String user, Queue queue, AbstractUsersManager abstractUsersManager,
       RMContext rmContext) {
-    this(applicationAttemptId, user, queue, activeUsersManager, rmContext,
+    this(applicationAttemptId, user, queue, abstractUsersManager, rmContext,
         Priority.newInstance(0), false);
   }
 
   public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
-      String user, Queue queue, ActiveUsersManager activeUsersManager,
+      String user, Queue queue, AbstractUsersManager abstractUsersManager,
       RMContext rmContext, Priority appPriority, boolean isAttemptRecovering) {
-    this(applicationAttemptId, user, queue, activeUsersManager, rmContext,
+    this(applicationAttemptId, user, queue, abstractUsersManager, rmContext,
         appPriority, isAttemptRecovering, null);
   }
 
   public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
-      String user, Queue queue, ActiveUsersManager activeUsersManager,
+      String user, Queue queue, AbstractUsersManager abstractUsersManager,
       RMContext rmContext, Priority appPriority, boolean isAttemptRecovering,
       ActivitiesManager activitiesManager) {
-    super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
+    super(applicationAttemptId, user, queue, abstractUsersManager, rmContext);
 
     RMApp rmApp = rmContext.getRMApps().get(getApplicationId());
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index 0fad8be..855b8f7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -488,7 +488,7 @@ public class FSLeafQueue extends FSQueue {
   }
 
   @Override
-  public ActiveUsersManager getActiveUsersManager() {
+  public ActiveUsersManager getAbstractUsersManager() {
     return activeUsersManager;
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
index 3bc81ac..5b4e4dc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
@@ -281,7 +281,7 @@ public class FSParentQueue extends FSQueue {
   }
   
   @Override
-  public ActiveUsersManager getActiveUsersManager() {
+  public ActiveUsersManager getAbstractUsersManager() {
     // Should never be called since all applications are submitted to LeafQueues
     return null;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index b90063d..61b527a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -177,7 +177,7 @@ public class FifoScheduler extends
     }
     
     @Override
-    public ActiveUsersManager getActiveUsersManager() {
+    public ActiveUsersManager getAbstractUsersManager() {
       return activeUsersManager;
     }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
index e0ac56f..7dcdf58 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
@@ -63,7 +63,7 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
     maxApplications = q.getMaxApplications();
     maxApplicationsPerUser = q.getMaxApplicationsPerUser();
     userLimit = q.getUserLimit();
-    users = new UsersInfo(q.getUsers());
+    users = new UsersInfo(q.getUsersManager().getUsersInfo());
     userLimitFactor = q.getUserLimitFactor();
     AMResourceLimit = new ResourceInfo(q.getAMResourceLimit());
     usedAMResource = new ResourceInfo(q.getQueueResourceUsage().getAMUsed());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
index 32b2c68..dfab3b2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -362,9 +363,10 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
           queue.getQueueCapacities().getAbsoluteCapacity());
       HashSet<String> users = userMap.get(queue.getQueueName());
       Resource userLimit = Resources.divideAndCeil(rc, capacity, users.size());
-      for (String user : users) {
-        when(queue.getUserLimitPerUser(eq(user), any(Resource.class),
-            anyString())).thenReturn(userLimit);
+      for (String userName : users) {
+        when(queue.getResourceLimitForAllUsers(eq(userName),
+            any(Resource.class), anyString(), any(SchedulingMode.class)))
+                .thenReturn(userLimit);
       }
     }
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
index ca73b6a..fa16eff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
@@ -73,7 +73,7 @@ public class TestSchedulerApplicationAttempt {
     RMContext rmContext = mock(RMContext.class);
     when(rmContext.getEpoch()).thenReturn(3L);
     SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(appAttId,
-        user, oldQueue, oldQueue.getActiveUsersManager(), rmContext);
+        user, oldQueue, oldQueue.getAbstractUsersManager(), rmContext);
     oldMetrics.submitApp(user);
     
     // confirm that containerId is calculated based on epoch.
@@ -169,7 +169,7 @@ public class TestSchedulerApplicationAttempt {
     ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics);
     Queue queue = mock(Queue.class);
     when(queue.getMetrics()).thenReturn(metrics);
-    when(queue.getActiveUsersManager()).thenReturn(activeUsersManager);
+    when(queue.getAbstractUsersManager()).thenReturn(activeUsersManager);
     when(queue.getQueueInfo(false, false)).thenReturn(queueInfo);
     return queue;
   }
@@ -198,7 +198,7 @@ public class TestSchedulerApplicationAttempt {
     Queue queue = createQueue("test", null);
     SchedulerApplicationAttempt app =
         new SchedulerApplicationAttempt(appAttId, user, queue,
-            queue.getActiveUsersManager(), rmContext);
+            queue.getAbstractUsersManager(), rmContext);
 
     // Resource request
     Resource requestedResource = Resource.newInstance(1536, 2);
@@ -211,7 +211,7 @@ public class TestSchedulerApplicationAttempt {
 
     queue = createQueue("test2", null, 0.5f);
     app = new SchedulerApplicationAttempt(appAttId, user, queue,
-        queue.getActiveUsersManager(), rmContext);
+        queue.getAbstractUsersManager(), rmContext);
     app.attemptResourceUsage.incUsed(requestedResource);
     assertEquals(30.0f, app.getResourceUsageReport().getQueueUsagePercentage(),
         0.01f);
@@ -229,7 +229,7 @@ public class TestSchedulerApplicationAttempt {
 
     queue = createQueue("test3", null, 0.0f);
     app = new SchedulerApplicationAttempt(appAttId, user, queue,
-        queue.getActiveUsersManager(), rmContext);
+        queue.getAbstractUsersManager(), rmContext);
 
     // Resource request
     app.attemptResourceUsage.incUsed(requestedResource);
@@ -255,7 +255,7 @@ public class TestSchedulerApplicationAttempt {
     final String user = "user1";
     Queue queue = createQueue("test", null);
     SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(appAttId,
-        user, queue, queue.getActiveUsersManager(), rmContext);
+        user, queue, queue.getAbstractUsersManager(), rmContext);
 
     // Resource request
     Resource requestedResource = Resource.newInstance(1536, 2);
@@ -274,7 +274,7 @@ public class TestSchedulerApplicationAttempt {
     RMContext rmContext = mock(RMContext.class);
     when(rmContext.getEpoch()).thenReturn(3L);
     SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(
-        attemptId, "user", queue, queue.getActiveUsersManager(), rmContext);
+        attemptId, "user", queue, queue.getAbstractUsersManager(), rmContext);
     Priority priority = Priority.newInstance(1);
     SchedulerRequestKey schedulerKey = toSchedulerKey(priority);
     assertEquals(0, app.getSchedulingOpportunities(schedulerKey));

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index 37b0da8..e9b1f9d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -193,7 +193,7 @@ public class TestApplicationLimits {
         clusterResource));
     
     ActiveUsersManager activeUsersManager = mock(ActiveUsersManager.class);
-    when(queue.getActiveUsersManager()).thenReturn(activeUsersManager);
+    when(queue.getAbstractUsersManager()).thenReturn(activeUsersManager);
 
     assertEquals(Resource.newInstance(8 * GB, 1),
         queue.calculateAndGetAMResourceLimit());
@@ -634,7 +634,7 @@ public class TestApplicationLimits {
         TestUtils.getMockApplicationAttemptId(0, 0); 
     FiCaSchedulerApp app_0_0 = new FiCaSchedulerApp(
       appAttemptId_0_0, user_0, queue, 
-            queue.getActiveUsersManager(), spyRMContext);
+            queue.getAbstractUsersManager(), spyRMContext);
     queue.submitApplicationAttempt(app_0_0, user_0);
 
     List<ResourceRequest> app_0_0_requests = new ArrayList<ResourceRequest>();
@@ -646,7 +646,7 @@ public class TestApplicationLimits {
     // Schedule to compute 
     queue.assignContainers(clusterResource, node_0, new ResourceLimits(
         clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    Resource expectedHeadroom = Resources.createResource(10*16*GB, 1);
+    Resource expectedHeadroom = Resources.createResource(5*16*GB, 1);
     assertEquals(expectedHeadroom, app_0_0.getHeadroom());
 
     // Submit second application from user_0, check headroom
@@ -654,7 +654,7 @@ public class TestApplicationLimits {
         TestUtils.getMockApplicationAttemptId(1, 0); 
     FiCaSchedulerApp app_0_1 = new FiCaSchedulerApp(
       appAttemptId_0_1, user_0, queue, 
-            queue.getActiveUsersManager(), spyRMContext);
+            queue.getAbstractUsersManager(), spyRMContext);
     queue.submitApplicationAttempt(app_0_1, user_0);
     
     List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
@@ -674,7 +674,7 @@ public class TestApplicationLimits {
         TestUtils.getMockApplicationAttemptId(2, 0); 
     FiCaSchedulerApp app_1_0 = new FiCaSchedulerApp(
       appAttemptId_1_0, user_1, queue, 
-            queue.getActiveUsersManager(), spyRMContext);
+            queue.getAbstractUsersManager(), spyRMContext);
     queue.submitApplicationAttempt(app_1_0, user_1);
 
     List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();
@@ -693,6 +693,11 @@ public class TestApplicationLimits {
 
     // Now reduce cluster size and check for the smaller headroom
     clusterResource = Resources.createResource(90*16*GB);
+
+    // Any change in cluster resource needs to enforce user-limit recomputation.
+    // In existing code, LeafQueue#updateClusterResource handled this. However
+    // here that method was not used.
+    queue.getUsersManager().userLimitNeedsRecompute();
     queue.assignContainers(clusterResource, node_0, new ResourceLimits(
         clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute
     expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
index 547571e..0aac2ef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
@@ -659,7 +659,7 @@ public class TestApplicationLimitsByPartition {
     final ApplicationAttemptId appAttemptId_0_0 =
         TestUtils.getMockApplicationAttemptId(0, 0);
     FiCaSchedulerApp app_0_0 = new FiCaSchedulerApp(appAttemptId_0_0, user_0,
-        queue, queue.getActiveUsersManager(), spyRMContext);
+        queue, queue.getAbstractUsersManager(), spyRMContext);
     queue.submitApplicationAttempt(app_0_0, user_0);
 
     List<ResourceRequest> app_0_0_requests = new ArrayList<ResourceRequest>();
@@ -671,16 +671,16 @@ public class TestApplicationLimitsByPartition {
     queue.assignContainers(clusterResource, node_0,
         new ResourceLimits(clusterResource),
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-    //head room = queue capacity = 50 % 90% 160 GB
+    //head room = queue capacity = 50 % 90% 160 GB * 0.25 (UL)
     Resource expectedHeadroom =
-        Resources.createResource((int) (0.5 * 0.9 * 160) * GB, 1);
+        Resources.createResource((int) (0.5 * 0.9 * 160 * 0.25) * GB, 1);
     assertEquals(expectedHeadroom, app_0_0.getHeadroom());
 
     // Submit second application from user_0, check headroom
     final ApplicationAttemptId appAttemptId_0_1 =
         TestUtils.getMockApplicationAttemptId(1, 0);
     FiCaSchedulerApp app_0_1 = new FiCaSchedulerApp(appAttemptId_0_1, user_0,
-        queue, queue.getActiveUsersManager(), spyRMContext);
+        queue, queue.getAbstractUsersManager(), spyRMContext);
     queue.submitApplicationAttempt(app_0_1, user_0);
 
     List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
@@ -703,15 +703,16 @@ public class TestApplicationLimitsByPartition {
     assertEquals(expectedHeadroom, app_0_0.getHeadroom());// no change
     //head room for default label + head room for y partition
     //head room for y partition = 100% 50%(b queue capacity ) *  160 * GB
-    Resource expectedHeadroomWithReqInY =
-        Resources.add(Resources.createResource((int) (0.5 * 160) * GB, 1), expectedHeadroom);
+    Resource expectedHeadroomWithReqInY = Resources.add(
+        Resources.createResource((int) (0.25 * 0.5 * 160) * GB, 1),
+        expectedHeadroom);
     assertEquals(expectedHeadroomWithReqInY, app_0_1.getHeadroom());
 
     // Submit first application from user_1, check for new headroom
     final ApplicationAttemptId appAttemptId_1_0 =
         TestUtils.getMockApplicationAttemptId(2, 0);
     FiCaSchedulerApp app_1_0 = new FiCaSchedulerApp(appAttemptId_1_0, user_1,
-        queue, queue.getActiveUsersManager(), spyRMContext);
+        queue, queue.getAbstractUsersManager(), spyRMContext);
     queue.submitApplicationAttempt(app_1_0, user_1);
 
     List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();
@@ -730,12 +731,12 @@ public class TestApplicationLimitsByPartition {
         SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute
     //head room = queue capacity = (50 % 90% 160 GB)/2 (for 2 users)
     expectedHeadroom =
-        Resources.createResource((int) (0.5 * 0.9 * 160 * 0.5) * GB, 1);
+        Resources.createResource((int) (0.5 * 0.9 * 160 * 0.25) * GB, 1);
     //head room for default label + head room for y partition
     //head room for y partition = 100% 50%(b queue capacity ) *  160 * GB
-    expectedHeadroomWithReqInY =
-        Resources.add(Resources.createResource((int) (0.5 * 0.5 * 160) * GB, 1),
-            expectedHeadroom);
+    expectedHeadroomWithReqInY = Resources.add(
+        Resources.createResource((int) (0.25 * 0.5 * 160) * GB, 1),
+        expectedHeadroom);
     assertEquals(expectedHeadroom, app_0_0.getHeadroom());
     assertEquals(expectedHeadroomWithReqInY, app_0_1.getHeadroom());
     assertEquals(expectedHeadroomWithReqInY, app_1_0.getHeadroom());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
index 8c850de..b4ebd15 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
@@ -186,7 +186,7 @@ public class TestCapacitySchedulerNodeLabelUpdate {
       String userName, String partition, int memory) {
     CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
     LeafQueue queue = (LeafQueue) scheduler.getQueue(queueName);
-    LeafQueue.User user = queue.getUser(userName);
+    UsersManager.User user = queue.getUser(userName);
     Assert.assertEquals(memory,
         user.getResourceUsage().getUsed(partition).getMemorySize());
   }
@@ -243,7 +243,7 @@ public class TestCapacitySchedulerNodeLabelUpdate {
     LeafQueue queue =
         (LeafQueue) ((CapacityScheduler) rm.getResourceScheduler())
             .getQueue("a");
-    ArrayList<UserInfo> users = queue.getUsers();
+    ArrayList<UserInfo> users = queue.getUsersManager().getUsersInfo();
     for (UserInfo userInfo : users) {
       if (userInfo.getUsername().equals("user")) {
         ResourceInfo resourcesUsed = userInfo.getResourcesUsed();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index f572ea3..7ad4f7d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
@@ -77,10 +78,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
-
-
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue.User;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -523,19 +522,22 @@ public class TestLeafQueue {
     // Users
     final String user_0 = "user_0";
 
+    // Active Users Manager
+    AbstractUsersManager activeUserManager = a.getAbstractUsersManager();
+
     // Submit applications
     final ApplicationAttemptId appAttemptId_0 = 
         TestUtils.getMockApplicationAttemptId(0, 0); 
     FiCaSchedulerApp app_0 = 
         new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
-            mock(ActiveUsersManager.class), spyRMContext);
+            activeUserManager, spyRMContext);
     a.submitApplicationAttempt(app_0, user_0);
 
     final ApplicationAttemptId appAttemptId_1 = 
         TestUtils.getMockApplicationAttemptId(1, 0); 
     FiCaSchedulerApp app_1 = 
         new FiCaSchedulerApp(appAttemptId_1, user_0, a, 
-            mock(ActiveUsersManager.class), spyRMContext);
+            activeUserManager, spyRMContext);
     a.submitApplicationAttempt(app_1, user_0);  // same user
 
     
@@ -684,7 +686,7 @@ public class TestLeafQueue {
         TestUtils.getMockApplicationAttemptId(0, 0);
     FiCaSchedulerApp app0 =
         new FiCaSchedulerApp(appAttemptId0, user0, b,
-            b.getActiveUsersManager(), spyRMContext);
+            b.getAbstractUsersManager(), spyRMContext);
     b.submitApplicationAttempt(app0, user0);
 
     // Setup some nodes
@@ -748,14 +750,14 @@ public class TestLeafQueue {
         TestUtils.getMockApplicationAttemptId(0, 0);
     FiCaSchedulerApp app0 =
         new FiCaSchedulerApp(appAttemptId0, user0, b,
-            b.getActiveUsersManager(), spyRMContext);
+            b.getAbstractUsersManager(), spyRMContext);
     b.submitApplicationAttempt(app0, user0);
 
     final ApplicationAttemptId appAttemptId2 =
         TestUtils.getMockApplicationAttemptId(2, 0);
     FiCaSchedulerApp app2 =
         new FiCaSchedulerApp(appAttemptId2, user1, b,
-            b.getActiveUsersManager(), spyRMContext);
+            b.getAbstractUsersManager(), spyRMContext);
     b.submitApplicationAttempt(app2, user1);
 
     // Setup some nodes
@@ -776,6 +778,7 @@ public class TestLeafQueue {
     Resource clusterResource =
         Resources.createResource(numNodes * (8 * GB), numNodes * 100);
     when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+    when(csContext.getClusterResource()).thenReturn(clusterResource);
 
     // Setup resource-requests so that one application is memory dominant
     // and other application is vcores dominant
@@ -799,7 +802,7 @@ public class TestLeafQueue {
     User queueUser1 = b.getUser(user1);
 
     assertEquals("There should 2 active users!", 2, b
-        .getActiveUsersManager().getNumActiveUsers());
+        .getAbstractUsersManager().getNumActiveUsers());
     // Fill both Nodes as far as we can
     CSAssignment assign;
     do {
@@ -834,7 +837,7 @@ public class TestLeafQueue {
             / (numNodes * 100.0f)
             + queueUser1.getUsed().getMemorySize()
             / (numNodes * 8.0f * GB);
-    assertEquals(expectedRatio, b.getUsageRatio(""), 0.001);
+    assertEquals(expectedRatio, b.getUsersManager().getUsageRatio(""), 0.001);
     // Add another node and make sure consumedRatio is adjusted
     // accordingly.
     numNodes = 3;
@@ -848,7 +851,7 @@ public class TestLeafQueue {
             / (numNodes * 100.0f)
             + queueUser1.getUsed().getMemorySize()
             / (numNodes * 8.0f * GB);
-    assertEquals(expectedRatio, b.getUsageRatio(""), 0.001);
+    assertEquals(expectedRatio, b.getUsersManager().getUsageRatio(""), 0.001);
   }
 
   @Test
@@ -858,6 +861,9 @@ public class TestLeafQueue {
     //unset maxCapacity
     a.setMaxCapacity(1.0f);
 
+    when(csContext.getClusterResource())
+        .thenReturn(Resources.createResource(16 * GB, 32));
+
     // Users
     final String user_0 = "user_0";
     final String user_1 = "user_1";
@@ -867,14 +873,14 @@ public class TestLeafQueue {
         TestUtils.getMockApplicationAttemptId(0, 0); 
     FiCaSchedulerApp app_0 = 
         new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
-            a.getActiveUsersManager(), spyRMContext);
+            a.getAbstractUsersManager(), spyRMContext);
     a.submitApplicationAttempt(app_0, user_0);
 
     final ApplicationAttemptId appAttemptId_1 = 
         TestUtils.getMockApplicationAttemptId(1, 0); 
     FiCaSchedulerApp app_1 = 
         new FiCaSchedulerApp(appAttemptId_1, user_1, a, 
-            a.getActiveUsersManager(), spyRMContext);
+            a.getAbstractUsersManager(), spyRMContext);
     a.submitApplicationAttempt(app_1, user_1); // different user
 
     // Setup some nodes
@@ -913,7 +919,7 @@ public class TestLeafQueue {
     a.setUserLimitFactor(2);
     
     // There're two active users
-    assertEquals(2, a.getActiveUsersManager().getNumActiveUsers());
+    assertEquals(2, a.getAbstractUsersManager().getNumActiveUsers());
 
     // 1 container to user_0
     applyCSAssignment(clusterResource,
@@ -948,7 +954,7 @@ public class TestLeafQueue {
 
     // app_0 doesn't have outstanding resources, there's only one active user.
     assertEquals("There should only be 1 active user!", 
-        1, a.getActiveUsersManager().getNumActiveUsers());
+        1, a.getAbstractUsersManager().getNumActiveUsers());
 
   }
 
@@ -999,7 +1005,7 @@ public class TestLeafQueue {
               TestUtils.getMockApplicationAttemptId(0, 0);
     FiCaSchedulerApp app_0 =
         new FiCaSchedulerApp(appAttemptId_0, user_0, qb,
-            qb.getActiveUsersManager(), spyRMContext);
+            qb.getAbstractUsersManager(), spyRMContext);
     Map<ApplicationAttemptId, FiCaSchedulerApp> apps = new HashMap<>();
     apps.put(app_0.getApplicationAttemptId(), app_0);
     qb.submitApplicationAttempt(app_0, user_0);
@@ -1010,7 +1016,7 @@ public class TestLeafQueue {
             u0Priority, recordFactory)));
 
     assertEquals("There should only be 1 active user!",
-        1, qb.getActiveUsersManager().getNumActiveUsers());
+        1, qb.getAbstractUsersManager().getNumActiveUsers());
     //get headroom
     applyCSAssignment(clusterResource,
         qb.assignContainers(clusterResource, node_0,
@@ -1027,7 +1033,7 @@ public class TestLeafQueue {
         TestUtils.getMockApplicationAttemptId(2, 0);
     FiCaSchedulerApp app_2 =
         new FiCaSchedulerApp(appAttemptId_2, user_1, qb,
-            qb.getActiveUsersManager(), spyRMContext);
+            qb.getAbstractUsersManager(), spyRMContext);
     apps.put(app_2.getApplicationAttemptId(), app_2);
     Priority u1Priority = TestUtils.createMockPriority(2);
     SchedulerRequestKey u1SchedKey = toSchedulerKey(u1Priority);
@@ -1065,13 +1071,13 @@ public class TestLeafQueue {
         TestUtils.getMockApplicationAttemptId(1, 0);
     FiCaSchedulerApp app_1 =
         new FiCaSchedulerApp(appAttemptId_1, user_0, qb,
-            qb.getActiveUsersManager(), spyRMContext);
+            qb.getAbstractUsersManager(), spyRMContext);
     apps.put(app_1.getApplicationAttemptId(), app_1);
     final ApplicationAttemptId appAttemptId_3 =
         TestUtils.getMockApplicationAttemptId(3, 0);
     FiCaSchedulerApp app_3 =
         new FiCaSchedulerApp(appAttemptId_3, user_1, qb,
-            qb.getActiveUsersManager(), spyRMContext);
+            qb.getAbstractUsersManager(), spyRMContext);
     apps.put(app_3.getApplicationAttemptId(), app_3);
     app_1.updateResourceRequests(Collections.singletonList(
         TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
@@ -1100,7 +1106,7 @@ public class TestLeafQueue {
               TestUtils.getMockApplicationAttemptId(4, 0);
     FiCaSchedulerApp app_4 =
               new FiCaSchedulerApp(appAttemptId_4, user_0, qb,
-                      qb.getActiveUsersManager(), spyRMContext);
+                      qb.getAbstractUsersManager(), spyRMContext);
     apps.put(app_4.getApplicationAttemptId(), app_4);
     qb.submitApplicationAttempt(app_4, user_0);
     app_4.updateResourceRequests(Collections.singletonList(
@@ -1123,9 +1129,9 @@ public class TestLeafQueue {
     //testcase3 still active - 2+2+6=10
     assertEquals(10*GB, qb.getUsedResources().getMemorySize());
     //app4 is user 0
-    //maxqueue 16G, userlimit 13G, used 8G, headroom 5G
+    //maxqueue 16G, userlimit 7G, used 8G, headroom 0G
     //(8G used is 6G from this test case - app4, 2 from last test case, app_1)
-    assertEquals(5*GB, app_4.getHeadroom().getMemorySize());
+    assertEquals(0*GB, app_4.getHeadroom().getMemorySize());
   }
 
   @Test
@@ -1144,21 +1150,21 @@ public class TestLeafQueue {
         TestUtils.getMockApplicationAttemptId(0, 0);
     FiCaSchedulerApp app_0 =
         new FiCaSchedulerApp(appAttemptId_0, user_0, a,
-            a.getActiveUsersManager(), spyRMContext);
+            a.getAbstractUsersManager(), spyRMContext);
     a.submitApplicationAttempt(app_0, user_0);
 
     final ApplicationAttemptId appAttemptId_1 =
         TestUtils.getMockApplicationAttemptId(1, 0);
     FiCaSchedulerApp app_1 =
         new FiCaSchedulerApp(appAttemptId_1, user_0, a,
-            a.getActiveUsersManager(), spyRMContext);
+            a.getAbstractUsersManager(), spyRMContext);
     a.submitApplicationAttempt(app_1, user_0);  // same user
 
     final ApplicationAttemptId appAttemptId_2 =
         TestUtils.getMockApplicationAttemptId(2, 0);
     FiCaSchedulerApp app_2 =
         new FiCaSchedulerApp(appAttemptId_2, user_1, a,
-            a.getActiveUsersManager(), spyRMContext);
+            a.getAbstractUsersManager(), spyRMContext);
     a.submitApplicationAttempt(app_2, user_1);
 
     // Setup some nodes
@@ -1244,21 +1250,21 @@ public class TestLeafQueue {
         TestUtils.getMockApplicationAttemptId(0, 0); 
     FiCaSchedulerApp app_0 = 
         new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
-            a.getActiveUsersManager(), spyRMContext);
+            a.getAbstractUsersManager(), spyRMContext);
     a.submitApplicationAttempt(app_0, user_0);
 
     final ApplicationAttemptId appAttemptId_1 =
         TestUtils.getMockApplicationAttemptId(1, 0); 
     FiCaSchedulerApp app_1 = 
         new FiCaSchedulerApp(appAttemptId_1, user_0, a, 
-            a.getActiveUsersManager(), spyRMContext);
+            a.getAbstractUsersManager(), spyRMContext);
     a.submitApplicationAttempt(app_1, user_0);  // same user
 
     final ApplicationAttemptId appAttemptId_2 = 
         TestUtils.getMockApplicationAttemptId(2, 0); 
     FiCaSchedulerApp app_2 = 
         new FiCaSchedulerApp(appAttemptId_2, user_1, a, 
-            a.getActiveUsersManager(), spyRMContext);
+            a.getAbstractUsersManager(), spyRMContext);
     a.submitApplicationAttempt(app_2, user_1);
 
     // Setup some nodes
@@ -1298,7 +1304,7 @@ public class TestLeafQueue {
     // Now, only user_0 should be active since he is the only one with
     // outstanding requests
     assertEquals("There should only be 1 active user!", 
-        1, a.getActiveUsersManager().getNumActiveUsers());
+        1, a.getAbstractUsersManager().getNumActiveUsers());
 
     // 1 container to user_0
     applyCSAssignment(clusterResource,
@@ -1309,8 +1315,8 @@ public class TestLeafQueue {
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
     // TODO, fix headroom in the future patch
-    assertEquals(1*GB, app_0.getHeadroom().getMemorySize());
-      // User limit = 4G, 2 in use
+    assertEquals(0*GB, app_0.getHeadroom().getMemorySize());
+      // User limit = 2G, 2 in use
     assertEquals(0*GB, app_1.getHeadroom().getMemorySize());
       // the application is not yet active
 
@@ -1322,15 +1328,15 @@ public class TestLeafQueue {
     assertEquals(3*GB, a.getUsedResources().getMemorySize());
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
     assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
-    assertEquals(1*GB, app_0.getHeadroom().getMemorySize()); // 4G - 3G
-    assertEquals(1*GB, app_1.getHeadroom().getMemorySize()); // 4G - 3G
+    assertEquals(0*GB, app_0.getHeadroom().getMemorySize()); // no headroom
+    assertEquals(0*GB, app_1.getHeadroom().getMemorySize()); // no headroom
     
     // Submit requests for app_1 and set max-cap
     a.setMaxCapacity(.1f);
     app_2.updateResourceRequests(Collections.singletonList(
         TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
             priority, recordFactory)));
-    assertEquals(2, a.getActiveUsersManager().getNumActiveUsers());
+    assertEquals(2, a.getAbstractUsersManager().getNumActiveUsers());
 
     // No more to user_0 since he is already over user-limit
     // and no more containers to queue since it's already at max-cap
@@ -1349,7 +1355,7 @@ public class TestLeafQueue {
     app_1.updateResourceRequests(Collections.singletonList(     // unset
         TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true,
             priority, recordFactory)));
-    assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
+    assertEquals(1, a.getAbstractUsersManager().getNumActiveUsers());
     applyCSAssignment(clusterResource,
         a.assignContainers(clusterResource, node_1,
         new ResourceLimits(clusterResource),
@@ -1375,28 +1381,28 @@ public class TestLeafQueue {
         TestUtils.getMockApplicationAttemptId(0, 0); 
     FiCaSchedulerApp app_0 = 
         new FiCaSchedulerApp(appAttemptId_0, user_0, a, 
-            a.getActiveUsersManager(), spyRMContext);
+            a.getAbstractUsersManager(), spyRMContext);
     a.submitApplicationAttempt(app_0, user_0);
 
     final ApplicationAttemptId appAttemptId_1 = 
         TestUtils.getMockApplicationAttemptId(1, 0); 
     FiCaSchedulerApp app_1 = 
         new FiCaSchedulerApp(appAttemptId_1, user_0, a, 
-            a.getActiveUsersManager(), spyRMContext);
+            a.getAbstractUsersManager(), spyRMContext);
     a.submitApplicationAttempt(app_1, user_0);  // same user
 
     final ApplicationAttemptId appAttemptId_2 = 
         TestUtils.getMockApplicationAttemptId(2, 0); 
     FiCaSchedulerApp app_2 = 
         new FiCaSchedulerApp(appAttemptId_2, user_1, a, 
-            a.getActiveUsersManager(), spyRMContext);
+            a.getAbstractUsersManager(), spyRMContext);
     a.submitApplicationAttempt(app_2, user_1);
 
     final ApplicationAttemptId appAttemptId_3 = 
         TestUtils.getMockApplicationAttemptId(3, 0); 
     FiCaSchedulerApp app_3 = 
         new FiCaSchedulerApp(appAttemptId_3, user_2, a, 
-            a.getActiveUsersManager(), spyRMContext);
+            a.getAbstractUsersManager(), spyRMContext);
     a.submitApplicationAttempt(app_3, user_2);
 
     Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
@@ -1414,7 +1420,8 @@ public class TestLeafQueue {
     Resource clusterResource = 
         Resources.createResource(numNodes * (8*GB), numNodes * 16);
     when(csContext.getNumClusterNodes()).thenReturn(numNodes);
-    
+    when(csContext.getClusterResource()).thenReturn(clusterResource);
+
     // Setup resource-requests
     Priority priority = TestUtils.createMockPriority(1);
     app_0.updateResourceRequests(Collections.singletonList(
@@ -1741,6 +1748,7 @@ public class TestLeafQueue {
     
     when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
     when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
+    when(csContext.getClusterResource()).thenReturn(Resource.newInstance(8, 1));
 
     Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
         app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
@@ -3804,7 +3812,7 @@ public class TestLeafQueue {
     final String user = "user1";
     FiCaSchedulerApp app =
         new FiCaSchedulerApp(appAttId, user, queue,
-            queue.getActiveUsersManager(), rmContext);
+            queue.getAbstractUsersManager(), rmContext);
 
     // Resource request
     Resource requestedResource = Resource.newInstance(1536, 2);
@@ -3819,7 +3827,7 @@ public class TestLeafQueue {
     // child of root, its absolute capaicty is also 50%.
     queue = createQueue("test2", null, 0.5f, 0.5f);
     app = new FiCaSchedulerApp(appAttId, user, queue,
-        queue.getActiveUsersManager(), rmContext);
+        queue.getAbstractUsersManager(), rmContext);
     app.getAppAttemptResourceUsage().incUsed(requestedResource);
     // In "test2" queue, 1536 used is 30% of "test2" and 15% of the cluster.
     assertEquals(30.0f, app.getResourceUsageReport().getQueueUsagePercentage(),
@@ -3831,7 +3839,7 @@ public class TestLeafQueue {
     // Therefore, "test2.1" capacity is 50% and absolute capacity is 25%.
     AbstractCSQueue qChild = createQueue("test2.1", queue, 0.5f, 0.25f);
     app = new FiCaSchedulerApp(appAttId, user, qChild,
-        qChild.getActiveUsersManager(), rmContext);
+        qChild.getAbstractUsersManager(), rmContext);
     app.getAppAttemptResourceUsage().incUsed(requestedResource);
     // In "test2.1" queue, 1536 used is 60% of "test2.1" and 15% of the cluster.
     assertEquals(60.0f, app.getResourceUsageReport().getQueueUsagePercentage(),
@@ -3855,7 +3863,7 @@ public class TestLeafQueue {
     ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics);
     AbstractCSQueue queue = mock(AbstractCSQueue.class);
     when(queue.getMetrics()).thenReturn(metrics);
-    when(queue.getActiveUsersManager()).thenReturn(activeUsersManager);
+    when(queue.getAbstractUsersManager()).thenReturn(activeUsersManager);
     when(queue.getQueueInfo(false, false)).thenReturn(queueInfo);
     QueueCapacities qCaps = mock(QueueCapacities.class);
     when(qCaps.getAbsoluteCapacity((String) any())).thenReturn(absCap);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
index 32a9074..740ef33 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
@@ -1049,7 +1049,7 @@ public class TestNodeLabelContainerAllocation {
     RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b");
     MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
     
-    // Each application request 5 * 1GB container
+    // Each application requests 50 * 1GB containers
     am1.allocate("*", 1 * GB, 50, new ArrayList<ContainerId>());
     
     // NM1 do 50 heartbeats
@@ -1169,12 +1169,14 @@ public class TestNodeLabelContainerAllocation {
     csConf.setAccessibleNodeLabels(A, toSet("x"));
     csConf.setCapacityByLabel(A, "x", 50);
     csConf.setMaximumCapacityByLabel(A, "x", 50);
+    csConf.setUserLimit(A, 200);
 
     final String B = CapacitySchedulerConfiguration.ROOT + ".b";
     csConf.setCapacity(B, 50);
     csConf.setAccessibleNodeLabels(B, toSet("x"));
     csConf.setCapacityByLabel(B, "x", 50);
     csConf.setMaximumCapacityByLabel(B, "x", 50);
+    csConf.setUserLimit(B, 200);
 
     // set node -> label
     mgr.addToCluserNodeLabels(ImmutableSet.of(
@@ -1207,6 +1209,7 @@ public class TestNodeLabelContainerAllocation {
 
     SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
 
+    cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
     for (int i = 0; i < 50; i++) {
       cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
     }
@@ -1250,7 +1253,7 @@ public class TestNodeLabelContainerAllocation {
       cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
     }
   }
-  
+
   private void waitSchedulerNodeJoined(MockRM rm, int expectedNodeNum)
       throws InterruptedException {
     int totalWaitTick = 100; // wait 10 sec at most.


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org


[2/2] hadoop git commit: YARN-5889. Improve and refactor user-limit calculation in Capacity Scheduler. (Sunil G via wangda)

Posted by ep...@apache.org.
YARN-5889. Improve and refactor user-limit calculation in Capacity Scheduler. (Sunil G via wangda)

(cherry picked from commit 5fb723bb77722d41df6959eee23e1b0cfeb5584e)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f2d440b3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f2d440b3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f2d440b3

Branch: refs/heads/branch-2
Commit: f2d440b3b3a094d1230c02cfa652f2f33df83b8d
Parents: e6cdf77
Author: Wangda Tan <wa...@apache.org>
Authored: Thu Feb 9 10:23:50 2017 -0800
Committer: Eric Payne <ep...@apache.org>
Committed: Wed Jul 12 10:06:02 2017 -0500

----------------------------------------------------------------------
 .../FifoIntraQueuePreemptionPlugin.java         |   6 +-
 .../scheduler/AbstractUsersManager.java         |  54 +
 .../scheduler/ActiveUsersManager.java           |  23 +-
 .../scheduler/AppSchedulingInfo.java            |  16 +-
 .../server/resourcemanager/scheduler/Queue.java |   2 +-
 .../scheduler/SchedulerApplicationAttempt.java  |   4 +-
 .../scheduler/capacity/CSQueue.java             |   8 +-
 .../capacity/CapacityHeadroomProvider.java      |   4 +-
 .../scheduler/capacity/CapacityScheduler.java   |   2 +-
 .../scheduler/capacity/LeafQueue.java           | 587 +++--------
 .../scheduler/capacity/ParentQueue.java         |   2 +-
 .../scheduler/capacity/UsersManager.java        | 982 +++++++++++++++++++
 .../scheduler/common/fica/FiCaSchedulerApp.java |  14 +-
 .../scheduler/fair/FSLeafQueue.java             |   2 +-
 .../scheduler/fair/FSParentQueue.java           |   2 +-
 .../scheduler/fifo/FifoScheduler.java           |   2 +-
 .../dao/CapacitySchedulerLeafQueueInfo.java     |   2 +-
 ...alCapacityPreemptionPolicyMockFramework.java |   8 +-
 .../TestSchedulerApplicationAttempt.java        |  14 +-
 .../capacity/TestApplicationLimits.java         |  15 +-
 .../TestApplicationLimitsByPartition.java       |  23 +-
 .../TestCapacitySchedulerNodeLabelUpdate.java   |   4 +-
 .../scheduler/capacity/TestLeafQueue.java       |  98 +-
 .../TestNodeLabelContainerAllocation.java       |   7 +-
 24 files changed, 1303 insertions(+), 578 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
index 757f567..5f1af1e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
 import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPriorityComparator;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
 import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
@@ -264,8 +265,9 @@ public class FifoIntraQueuePreemptionPlugin
 
       // Verify whether we already calculated headroom for this user.
       if (userLimitResource == null) {
-        userLimitResource = Resources.clone(tq.leafQueue
-            .getUserLimitPerUser(userName, partitionBasedResource, partition));
+        userLimitResource = Resources.clone(
+            tq.leafQueue.getResourceLimitForAllUsers(userName, clusterResource,
+                partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
 
         Resource amUsed = perUserAMUsed.get(userName);
         if (null == amUsed) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractUsersManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractUsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractUsersManager.java
new file mode 100644
index 0000000..4db3584
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractUsersManager.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * {@link AbstractUsersManager} tracks users in the system.
+ */
+@Private
+public interface AbstractUsersManager {
+  /**
+   * An application has new outstanding requests.
+   *
+   * @param user
+   *          application user
+   * @param applicationId
+   *          activated application
+   */
+  void activateApplication(String user, ApplicationId applicationId);
+  /**
+   * An application has no more outstanding requests.
+   *
+   * @param user
+   *          application user
+   * @param applicationId
+   *          deactivated application
+   */
+  void deactivateApplication(String user, ApplicationId applicationId);
+
+  /**
+   * Get number of active users i.e. users with applications which have pending
+   * resource requests.
+   *
+   * @return number of active users
+   */
+  int getNumActiveUsers();
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java
index 36e6858..049f324 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java
@@ -36,8 +36,8 @@ import org.apache.hadoop.yarn.server.utils.Lock;
  * An active user is defined as someone with outstanding resource requests.
  */
 @Private
-public class ActiveUsersManager {
-  
+public class ActiveUsersManager implements AbstractUsersManager {
+
   private static final Log LOG = LogFactory.getLog(ActiveUsersManager.class);
   
   private final QueueMetrics metrics;
@@ -45,7 +45,7 @@ public class ActiveUsersManager {
   private int activeUsers = 0;
   private Map<String, Set<ApplicationId>> usersApplications = 
       new HashMap<String, Set<ApplicationId>>();
-  
+
   public ActiveUsersManager(QueueMetrics metrics) {
     this.metrics = metrics;
   }
@@ -57,6 +57,7 @@ public class ActiveUsersManager {
    * @param applicationId activated application
    */
   @Lock({Queue.class, SchedulerApplicationAttempt.class})
+  @Override
   synchronized public void activateApplication(
       String user, ApplicationId applicationId) {
     Set<ApplicationId> userApps = usersApplications.get(user);
@@ -65,8 +66,10 @@ public class ActiveUsersManager {
       usersApplications.put(user, userApps);
       ++activeUsers;
       metrics.incrActiveUsers();
-      LOG.debug("User " + user + " added to activeUsers, currently: " + 
-          activeUsers);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("User " + user + " added to activeUsers, currently: "
+            + activeUsers);
+      }
     }
     if (userApps.add(applicationId)) {
       metrics.activateApp(user);
@@ -80,6 +83,7 @@ public class ActiveUsersManager {
    * @param applicationId deactivated application
    */
   @Lock({Queue.class, SchedulerApplicationAttempt.class})
+  @Override
   synchronized public void deactivateApplication(
       String user, ApplicationId applicationId) {
     Set<ApplicationId> userApps = usersApplications.get(user);
@@ -91,18 +95,21 @@ public class ActiveUsersManager {
         usersApplications.remove(user);
         --activeUsers;
         metrics.decrActiveUsers();
-        LOG.debug("User " + user + " removed from activeUsers, currently: " + 
-            activeUsers);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("User " + user + " removed from activeUsers, currently: "
+              + activeUsers);
+        }
       }
     }
   }
-  
+
   /**
    * Get number of active users i.e. users with applications which have pending
    * resource requests.
    * @return number of active users
    */
   @Lock({Queue.class, SchedulerApplicationAttempt.class})
+  @Override
   synchronized public int getNumActiveUsers() {
     return activeUsers;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index f0b6e98..9532343 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -66,7 +66,7 @@ public class AppSchedulingInfo {
   private final String user;
 
   private Queue queue;
-  private ActiveUsersManager activeUsersManager;
+  private AbstractUsersManager abstractUsersManager;
   // whether accepted/allocated by scheduler
   private volatile boolean pending = true;
   private ResourceUsage appResourceUsage;
@@ -90,13 +90,13 @@ public class AppSchedulingInfo {
   public final ContainerUpdateContext updateContext;
 
   public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
-      String user, Queue queue, ActiveUsersManager activeUsersManager,
+      String user, Queue queue, AbstractUsersManager abstractUsersManager,
       long epoch, ResourceUsage appResourceUsage) {
     this.applicationAttemptId = appAttemptId;
     this.applicationId = appAttemptId.getApplicationId();
     this.queue = queue;
     this.user = user;
-    this.activeUsersManager = activeUsersManager;
+    this.abstractUsersManager = abstractUsersManager;
     this.containerIdCounter = new AtomicLong(
         epoch << ResourceManager.EPOCH_BIT_SHIFT);
     this.appResourceUsage = appResourceUsage;
@@ -253,7 +253,7 @@ public class AppSchedulingInfo {
       // Activate application. Metrics activation is done here.
       if (lastRequestContainers <= 0) {
         schedulerKeys.add(schedulerKey);
-        activeUsersManager.activateApplication(user, applicationId);
+        abstractUsersManager.activateApplication(user, applicationId);
       }
     }
 
@@ -453,7 +453,7 @@ public class AppSchedulingInfo {
 
   public void checkForDeactivation() {
     if (schedulerKeys.isEmpty()) {
-      activeUsersManager.deactivateApplication(user, applicationId);
+      abstractUsersManager.deactivateApplication(user, applicationId);
     }
   }
   
@@ -483,9 +483,9 @@ public class AppSchedulingInfo {
       }
       oldMetrics.moveAppFrom(this);
       newMetrics.moveAppTo(this);
-      activeUsersManager.deactivateApplication(user, applicationId);
-      activeUsersManager = newQueue.getActiveUsersManager();
-      activeUsersManager.activateApplication(user, applicationId);
+      abstractUsersManager.deactivateApplication(user, applicationId);
+      abstractUsersManager = newQueue.getAbstractUsersManager();
+      abstractUsersManager.activateApplication(user, applicationId);
       this.queue = newQueue;
     } finally {
       this.writeLock.unlock();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
index ada2a0b..d166e5f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
@@ -63,7 +63,7 @@ public interface Queue {
 
   boolean hasAccess(QueueACL acl, UserGroupInformation user);
   
-  public ActiveUsersManager getActiveUsersManager();
+  public AbstractUsersManager getAbstractUsersManager();
 
   /**
    * Recover the state of the queue for a given container.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index ecfea18..e1d714d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -200,13 +200,13 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
   private AtomicInteger unconfirmedAllocatedVcores = new AtomicInteger();
 
   public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId, 
-      String user, Queue queue, ActiveUsersManager activeUsersManager,
+      String user, Queue queue, AbstractUsersManager abstractUsersManager,
       RMContext rmContext) {
     Preconditions.checkNotNull(rmContext, "RMContext should not be null");
     this.rmContext = rmContext;
     this.appSchedulingInfo = 
         new AppSchedulingInfo(applicationAttemptId, user, queue,  
-            activeUsersManager, rmContext.getEpoch(), attemptResourceUsage);
+            abstractUsersManager, rmContext.getEpoch(), attemptResourceUsage);
     this.queue = queue;
     this.pendingRelease = Collections.newSetFromMap(
         new ConcurrentHashMap<ContainerId, Boolean>());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index b878e72..c6726ec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
 import org.apache.hadoop.yarn.security.PrivilegedEntity;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@@ -240,10 +240,10 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
       ResourceLimits resourceLimits);
   
   /**
-   * Get the {@link ActiveUsersManager} for the queue.
-   * @return the <code>ActiveUsersManager</code> for the queue
+   * Get the {@link AbstractUsersManager} for the queue.
+   * @return the <code>AbstractUsersManager</code> for the queue
    */
-  public ActiveUsersManager getActiveUsersManager();
+  public AbstractUsersManager getAbstractUsersManager();
   
   /**
    * Adds all applications in the queue and its subqueues to the given collection.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
index 5605f18..140a2ac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
@@ -26,12 +26,12 @@ import org.apache.hadoop.yarn.util.resource.Resources;
 
 public class CapacityHeadroomProvider {
   
-  LeafQueue.User user;
+  UsersManager.User user;
   LeafQueue queue;
   FiCaSchedulerApp application;
   LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo;
   
-  public CapacityHeadroomProvider(LeafQueue.User user, LeafQueue queue,
+  public CapacityHeadroomProvider(UsersManager.User user, LeafQueue queue,
       FiCaSchedulerApp application,
       LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo) {
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 8b1f8b4..54145dc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -745,7 +745,7 @@ public class CapacityScheduler extends
       CSQueue queue = (CSQueue) application.getQueue();
 
       FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId,
-          application.getUser(), queue, queue.getActiveUsersManager(),
+          application.getUser(), queue, queue.getAbstractUsersManager(),
           rmContext, application.getPriority(), isAttemptRecovering,
           activitiesManager);
       if (transferStateFromPreviousAttempt) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 2b1efd6..c34adbe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -51,7 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
 import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
@@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
@@ -76,7 +77,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPo
 import org.apache.hadoop.yarn.server.utils.Lock;
 import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
 import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.Resources;
 
 import java.io.IOException;
@@ -101,8 +101,6 @@ public class LeafQueue extends AbstractCSQueue {
   private static final Log LOG = LogFactory.getLog(LeafQueue.class);
 
   private float absoluteUsedCapacity = 0.0f;
-  private volatile int userLimit;
-  private volatile float userLimitFactor;
 
   protected int maxApplications;
   protected volatile int maxApplicationsPerUser;
@@ -122,14 +120,12 @@ public class LeafQueue extends AbstractCSQueue {
 
   private volatile float minimumAllocationFactor;
 
-  private Map<String, User> users = new ConcurrentHashMap<>();
-
   private final RecordFactory recordFactory = 
     RecordFactoryProvider.getRecordFactory(null);
 
   private CapacitySchedulerContext scheduler;
   
-  private final ActiveUsersManager activeUsersManager;
+  private final UsersManager usersManager;
 
   // cache last cluster resource to compute actual capacity
   private Resource lastClusterResource = Resources.none();
@@ -141,10 +137,6 @@ public class LeafQueue extends AbstractCSQueue {
 
   private volatile OrderingPolicy<FiCaSchedulerApp> orderingPolicy = null;
 
-  // Summation of consumed ratios for all users in queue
-  private float totalUserConsumedRatio = 0;
-  private UsageRatios qUsageRatios;
-
   // record all ignore partition exclusivityRMContainer, this will be used to do
   // preemption, key is the partition of the RMContainer allocated on
   private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers =
@@ -159,13 +151,12 @@ public class LeafQueue extends AbstractCSQueue {
     super(cs, queueName, parent, old);
     this.scheduler = cs;
 
-    this.activeUsersManager = new ActiveUsersManager(metrics);
+    this.usersManager = new UsersManager(metrics, this, labelManager, scheduler,
+        resourceCalculator);
 
     // One time initialization is enough since it is static ordering policy
     this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps();
 
-    qUsageRatios = new UsageRatios();
-
     if(LOG.isDebugEnabled()) {
       LOG.debug("LeafQueue:" + " name=" + queueName
         + ", fullname=" + getQueuePath());
@@ -197,8 +188,8 @@ public class LeafQueue extends AbstractCSQueue {
       setOrderingPolicy(
           conf.<FiCaSchedulerApp>getAppOrderingPolicy(getQueuePath()));
 
-      userLimit = conf.getUserLimit(getQueuePath());
-      userLimitFactor = conf.getUserLimitFactor(getQueuePath());
+      usersManager.setUserLimit(conf.getUserLimit(getQueuePath()));
+      usersManager.setUserLimitFactor(conf.getUserLimitFactor(getQueuePath()));
 
       maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath());
       if (maxApplications < 0) {
@@ -212,7 +203,8 @@ public class LeafQueue extends AbstractCSQueue {
         }
       }
       maxApplicationsPerUser = Math.min(maxApplications,
-          (int) (maxApplications * (userLimit / 100.0f) * userLimitFactor));
+          (int) (maxApplications * (usersManager.getUserLimit() / 100.0f)
+              * usersManager.getUserLimitFactor()));
 
       maxAMResourcePerQueuePercent =
           conf.getMaximumApplicationMasterResourcePerQueuePercent(
@@ -271,8 +263,9 @@ public class LeafQueue extends AbstractCSQueue {
               + queueCapacities.getAbsoluteMaximumCapacity()
               + " [= 1.0 maximumCapacity undefined, "
               + "(parentAbsoluteMaxCapacity * maximumCapacity) / 100 otherwise ]"
-              + "\n" + "userLimit = " + userLimit + " [= configuredUserLimit ]"
-              + "\n" + "userLimitFactor = " + userLimitFactor
+              + "\n" + "userLimit = " + usersManager.getUserLimit()
+              + " [= configuredUserLimit ]" + "\n" + "userLimitFactor = "
+              + usersManager.getUserLimitFactor()
               + " [= configuredUserLimitFactor ]" + "\n" + "maxApplications = "
               + maxApplications
               + " [= configuredMaximumSystemApplicationsPerQueue or"
@@ -336,9 +329,17 @@ public class LeafQueue extends AbstractCSQueue {
     return maxApplicationsPerUser;
   }
 
+  /**
+   * Get the {@link UsersManager} for the queue.
+   * @return UsersManager instance.
+   */
+  public UsersManager getUsersManager() {
+    return usersManager;
+  }
+
   @Override
-  public ActiveUsersManager getActiveUsersManager() {
-    return activeUsersManager;
+  public AbstractUsersManager getAbstractUsersManager() {
+    return usersManager;
   }
 
   @Override
@@ -352,7 +353,8 @@ public class LeafQueue extends AbstractCSQueue {
    */
   @VisibleForTesting
   void setUserLimit(int userLimit) {
-    this.userLimit = userLimit;
+    usersManager.setUserLimit(userLimit);
+    usersManager.userLimitNeedsRecompute();
   }
 
   /**
@@ -361,7 +363,8 @@ public class LeafQueue extends AbstractCSQueue {
    */
   @VisibleForTesting
   void setUserLimitFactor(float userLimitFactor) {
-    this.userLimitFactor = userLimitFactor;
+    usersManager.setUserLimitFactor(userLimitFactor);
+    usersManager.userLimitNeedsRecompute();
   }
 
   @Override
@@ -422,12 +425,12 @@ public class LeafQueue extends AbstractCSQueue {
 
   @Private
   public int getUserLimit() {
-    return userLimit;
+    return usersManager.getUserLimit();
   }
 
   @Private
   public float getUserLimitFactor() {
-    return userLimitFactor;
+    return usersManager.getUserLimitFactor();
   }
 
   @Override
@@ -477,44 +480,7 @@ public class LeafQueue extends AbstractCSQueue {
 
   @VisibleForTesting
   public User getUser(String userName) {
-    return users.get(userName);
-  }
-
-  // Get and add user if absent
-  private User getUserAndAddIfAbsent(String userName) {
-    try {
-      writeLock.lock();
-      User u = users.get(userName);
-      if (null == u) {
-        u = new User();
-        users.put(userName, u);
-      }
-      return u;
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  /**
-   * @return an ArrayList of UserInfo objects who are active in this queue
-   */
-  public ArrayList<UserInfo> getUsers() {
-    try {
-      readLock.lock();
-      ArrayList<UserInfo> usersToReturn = new ArrayList<UserInfo>();
-      for (Map.Entry<String, User> entry : users.entrySet()) {
-        User user = entry.getValue();
-        usersToReturn.add(
-            new UserInfo(entry.getKey(), Resources.clone(user.getAllUsed()),
-                user.getActiveApplications(), user.getPendingApplications(),
-                Resources.clone(user.getConsumedAMResources()),
-                Resources.clone(user.getUserResourceLimit()),
-                user.getResourceUsage()));
-      }
-      return usersToReturn;
-    } finally {
-      readLock.unlock();
-    }
+    return usersManager.getUser(userName);
   }
 
   @Private
@@ -575,7 +541,7 @@ public class LeafQueue extends AbstractCSQueue {
 
       // TODO, should use getUser, use this method just to avoid UT failure
       // which is caused by wrong invoking order, will fix UT separately
-      User user = getUserAndAddIfAbsent(userName);
+      User user = usersManager.getUserAndAddIfAbsent(userName);
 
       // Add the attempt to our data-structures
       addApplicationAttempt(application, user);
@@ -632,7 +598,7 @@ public class LeafQueue extends AbstractCSQueue {
       }
 
       // Check submission limits for the user on this queue
-      User user = getUserAndAddIfAbsent(userName);
+      User user = usersManager.getUserAndAddIfAbsent(userName);
       if (user.getTotalApplications() >= getMaxApplicationsPerUser()) {
         String msg = "Queue " + getQueuePath() + " already has " + user
             .getTotalApplications() + " applications from user " + userName
@@ -682,19 +648,21 @@ public class LeafQueue extends AbstractCSQueue {
        * the absolute queue capacity (per partition) instead of the max and is
        * modified by the userlimit and the userlimit factor as is the userlimit
        */
-      float effectiveUserLimit = Math.max(userLimit / 100.0f,
-          1.0f / Math.max(getActiveUsersManager().getNumActiveUsers(), 1));
+      float effectiveUserLimit = Math.max(usersManager.getUserLimit() / 100.0f,
+          1.0f / Math.max(getAbstractUsersManager().getNumActiveUsers(), 1));
 
-      Resource queuePartitionResource = Resources.multiplyAndNormalizeUp(
-          resourceCalculator,
-          labelManager.getResourceByLabel(nodePartition, lastClusterResource),
-          queueCapacities.getAbsoluteCapacity(nodePartition),
-          minimumAllocation);
+      Resource queuePartitionResource = Resources
+          .multiplyAndNormalizeUp(resourceCalculator,
+              labelManager.getResourceByLabel(nodePartition,
+                  lastClusterResource),
+              queueCapacities.getAbsoluteCapacity(nodePartition),
+              minimumAllocation);
 
       Resource userAMLimit = Resources.multiplyAndNormalizeUp(
           resourceCalculator, queuePartitionResource,
           queueCapacities.getMaxAMResourcePercentage(nodePartition)
-              * effectiveUserLimit * userLimitFactor, minimumAllocation);
+              * effectiveUserLimit * usersManager.getUserLimitFactor(),
+          minimumAllocation);
       return Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
           userAMLimit, getAMResourceLimitPerPartition(nodePartition)) ?
           userAMLimit :
@@ -910,7 +878,7 @@ public class LeafQueue extends AbstractCSQueue {
   @Override
   public void finishApplication(ApplicationId application, String user) {
     // Inform the activeUsersManager
-    activeUsersManager.deactivateApplication(user, application);
+    usersManager.deactivateApplication(user, application);
 
     appFinished();
 
@@ -932,7 +900,7 @@ public class LeafQueue extends AbstractCSQueue {
 
       // TODO, should use getUser, use this method just to avoid UT failure
       // which is caused by wrong invoking order, will fix UT separately
-      User user = getUserAndAddIfAbsent(userName);
+      User user = usersManager.getUserAndAddIfAbsent(userName);
 
       String partitionName = application.getAppAMNodePartitionName();
       boolean wasActive = orderingPolicy.removeSchedulableEntity(application);
@@ -950,7 +918,7 @@ public class LeafQueue extends AbstractCSQueue {
 
       user.finishApplication(wasActive);
       if (user.getTotalApplications() == 0) {
-        users.remove(application.getUser());
+        usersManager.removeUser(application.getUser());
       }
 
       // Check if we can activate more applications
@@ -1291,7 +1259,7 @@ public class LeafQueue extends AbstractCSQueue {
       Resource clusterResource, FiCaSchedulerApp application,
       String partition) {
     return getHeadroom(user, queueCurrentLimit, clusterResource,
-        computeUserLimit(application.getUser(), clusterResource, user,
+        getResourceLimitForActiveUsers(application.getUser(), clusterResource,
             partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
         partition);
   }
@@ -1365,7 +1333,7 @@ public class LeafQueue extends AbstractCSQueue {
     // Compute user limit respect requested labels,
     // TODO, need consider headroom respect labels also
     Resource userLimit =
-        computeUserLimit(application.getUser(), clusterResource, queueUser,
+        getResourceLimitForActiveUsers(application.getUser(), clusterResource,
             nodePartition, schedulingMode);
 
     setQueueResourceLimitsInfo(clusterResource);
@@ -1375,11 +1343,11 @@ public class LeafQueue extends AbstractCSQueue {
             clusterResource, userLimit, nodePartition);
     
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Headroom calculation for user " + user + ": " + 
-          " userLimit=" + userLimit + 
-          " queueMaxAvailRes=" + cachedResourceLimitsForHeadroom.getLimit() +
-          " consumed=" + queueUser.getUsed() + 
-          " headroom=" + headroom);
+      LOG.debug("Headroom calculation for user " + user + ": " + " userLimit="
+          + userLimit + " queueMaxAvailRes="
+          + cachedResourceLimitsForHeadroom.getLimit() + " consumed="
+          + queueUser.getUsed() + " headroom=" + headroom + " partition="
+          + nodePartition);
     }
     
     CapacityHeadroomProvider headroomProvider = new CapacityHeadroomProvider(
@@ -1407,129 +1375,46 @@ public class LeafQueue extends AbstractCSQueue {
     return rackLocalityFullReset;
   }
 
-  @Lock(NoLock.class)
-  private Resource computeUserLimit(String userName,
-      Resource clusterResource, User user,
-      String nodePartition, SchedulingMode schedulingMode) {
-    Resource partitionResource = labelManager.getResourceByLabel(nodePartition,
-        clusterResource);
-
-    // What is our current capacity? 
-    // * It is equal to the max(required, queue-capacity) if
-    //   we're running below capacity. The 'max' ensures that jobs in queues
-    //   with miniscule capacity (< 1 slot) make progress
-    // * If we're running over capacity, then its
-    //   (usedResources + required) (which extra resources we are allocating)
-    Resource queueCapacity =
-        Resources.multiplyAndNormalizeUp(resourceCalculator,
-            partitionResource,
-            queueCapacities.getAbsoluteCapacity(nodePartition),
-            minimumAllocation);
-
-    // Assume we have required resource equals to minimumAllocation, this can
-    // make sure user limit can continuously increase till queueMaxResource
-    // reached.
-    Resource required = minimumAllocation;
-
-    // Allow progress for queues with miniscule capacity
-    queueCapacity =
-        Resources.max(
-            resourceCalculator, partitionResource,
-            queueCapacity, 
-            required);
-
-
-    /* We want to base the userLimit calculation on
-     * max(queueCapacity, usedResources+required). However, we want
-     * usedResources to be based on the combined ratios of all the users in the
-     * queue so we use consumedRatio to calculate such.
-     * The calculation is dependent on how the resourceCalculator calculates the
-     * ratio between two Resources. DRF Example: If usedResources is
-     * greater than queueCapacity and users have the following [mem,cpu] usages:
-     * User1: [10%,20%] - Dominant resource is 20%
-     * User2: [30%,10%] - Dominant resource is 30%
-     * Then total consumedRatio is then 20+30=50%. Yes, this value can be
-     * larger than 100% but for the purposes of making sure all users are
-     * getting their fair share, it works.
-     */
-    Resource consumed = Resources.multiplyAndNormalizeUp(resourceCalculator,
-        partitionResource, qUsageRatios.getUsageRatio(nodePartition),
-        minimumAllocation);
-    Resource currentCapacity =
-        Resources.lessThan(resourceCalculator, partitionResource, consumed,
-            queueCapacity) ? queueCapacity : Resources.add(consumed, required);
-    // Never allow a single user to take more than the 
-    // queue's configured capacity * user-limit-factor.
-    // Also, the queue's configured capacity should be higher than 
-    // queue-hard-limit * ulMin
-    
-    final int activeUsers = activeUsersManager.getNumActiveUsers();
-    
-    // User limit resource is determined by:
-    // max{currentCapacity / #activeUsers, currentCapacity *
-    // user-limit-percentage%)
-    Resource userLimitResource = Resources.max(
-        resourceCalculator, partitionResource,
-        Resources.divideAndCeil(
-            resourceCalculator, currentCapacity, activeUsers),
-        Resources.divideAndCeil(
-            resourceCalculator, 
-            Resources.multiplyAndRoundDown(
-                currentCapacity, userLimit), 
-            100)
-        );
-    
-    // User limit is capped by maxUserLimit
-    // - maxUserLimit = queueCapacity * user-limit-factor (RESPECT_PARTITION_EXCLUSIVITY)
-    // - maxUserLimit = total-partition-resource (IGNORE_PARTITION_EXCLUSIVITY)
-    //
-    // In IGNORE_PARTITION_EXCLUSIVITY mode, if a queue cannot access a
-    // partition, its guaranteed resource on that partition is 0. And
-    // user-limit-factor computation is based on queue's guaranteed capacity. So
-    // we will not cap user-limit as well as used resource when doing
-    // IGNORE_PARTITION_EXCLUSIVITY allocation.
-    Resource maxUserLimit = Resources.none();
-    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
-      maxUserLimit =
-          Resources.multiplyAndRoundDown(queueCapacity, userLimitFactor);
-    } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
-      maxUserLimit = partitionResource;
-    }
-    
-    // Cap final user limit with maxUserLimit
-    userLimitResource =
-        Resources.roundUp(
-            resourceCalculator, 
-            Resources.min(
-                resourceCalculator, partitionResource,
-                  userLimitResource,
-                  maxUserLimit
-                ), 
-            minimumAllocation);
+  /**
+   * Get the user limit that usersManager has computed for active users.
+   * @param userName
+   *          Name of user who has submitted one/more app to given queue.
+   * @param clusterResource
+   *          total cluster resource
+   * @param nodePartition
+   *          partition name
+   * @param schedulingMode
+   *          scheduling mode
+   *          RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
+   * @return Computed User Limit
+   */
+  public Resource getResourceLimitForActiveUsers(String userName,
+      Resource clusterResource, String nodePartition,
+      SchedulingMode schedulingMode) {
+    return usersManager.getComputedResourceLimitForActiveUsers(userName,
+        clusterResource, nodePartition, schedulingMode);
+  }
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("User limit computation for " + userName +
-          " in queue " + getQueueName() +
-          " userLimitPercent=" + userLimit +
-          " userLimitFactor=" + userLimitFactor +
-          " required: " + required +
-          " consumed: " + consumed +
-          " user-limit-resource: " + userLimitResource +
-          " queueCapacity: " + queueCapacity +
-          " qconsumed: " + queueUsage.getUsed() +
-          " consumedRatio: " + totalUserConsumedRatio +
-          " currentCapacity: " + currentCapacity +
-          " activeUsers: " + activeUsers +
-          " clusterCapacity: " + clusterResource +
-          " resourceByLabel: " + partitionResource +
-          " usageratio: " + qUsageRatios.getUsageRatio(nodePartition) +
-          " Partition: " + nodePartition
-      );
-    }
-    user.setUserResourceLimit(userLimitResource);
-    return userLimitResource;
+  /**
+   * Get the user limit that usersManager has computed for all users.
+   * @param userName
+   *          Name of user who has submitted one/more app to given queue.
+   * @param clusterResource
+   *          total cluster resource
+   * @param nodePartition
+   *          partition name
+   * @param schedulingMode
+   *          scheduling mode
+   *          RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
+   * @return Computed User Limit
+   */
+  public Resource getResourceLimitForAllUsers(String userName,
+      Resource clusterResource, String nodePartition,
+      SchedulingMode schedulingMode) {
+    return usersManager.getComputedResourceLimitForAllUsers(userName,
+        clusterResource, nodePartition, schedulingMode);
   }
-  
+
   @Private
   protected boolean canAssignToUser(Resource clusterResource,
       String userName, Resource limit, FiCaSchedulerApp application,
@@ -1600,52 +1485,34 @@ public class LeafQueue extends AbstractCSQueue {
     }
   }
 
-  private float calculateUserUsageRatio(Resource clusterResource,
-      String nodePartition) {
-    try {
-      writeLock.lock();
-      Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
-          clusterResource);
-      float consumed = 0;
-      User user;
-      for (Map.Entry<String, User> entry : users.entrySet()) {
-        user = entry.getValue();
-        consumed += user.resetAndUpdateUsageRatio(resourceCalculator,
-            resourceByLabel, nodePartition);
-      }
-      return consumed;
-    } finally {
-      writeLock.unlock();
-    }
-  }
-
-  private void recalculateQueueUsageRatio(Resource clusterResource,
+  /**
+   * Recalculate QueueUsage Ratio.
+   *
+   * @param clusterResource
+   *          Total Cluster Resource
+   * @param nodePartition
+   *          Partition
+   */
+  public void recalculateQueueUsageRatio(Resource clusterResource,
       String nodePartition) {
     try {
       writeLock.lock();
-      ResourceUsage queueResourceUsage = this.getQueueResourceUsage();
+      ResourceUsage queueResourceUsage = getQueueResourceUsage();
 
       if (nodePartition == null) {
         for (String partition : Sets.union(
-            queueCapacities.getNodePartitionsSet(),
+            getQueueCapacities().getNodePartitionsSet(),
             queueResourceUsage.getNodePartitionsSet())) {
-          qUsageRatios.setUsageRatio(partition,
-              calculateUserUsageRatio(clusterResource, partition));
+          usersManager.updateUsageRatio(partition, clusterResource);
         }
-      } else{
-        qUsageRatios.setUsageRatio(nodePartition,
-            calculateUserUsageRatio(clusterResource, nodePartition));
+      } else {
+        usersManager.updateUsageRatio(nodePartition, clusterResource);
       }
     } finally {
       writeLock.unlock();
     }
   }
 
-  private void updateQueueUsageRatio(String nodePartition,
-      float delta) {
-    qUsageRatios.incUsageRatio(nodePartition, delta);
-  }
-
   @Override
   public void completedContainer(Resource clusterResource, 
       FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer, 
@@ -1708,8 +1575,6 @@ public class LeafQueue extends AbstractCSQueue {
     try {
       writeLock.lock();
       super.allocateResource(clusterResource, resource, nodePartition);
-      Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
-          clusterResource);
 
       // handle ignore exclusivity container
       if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
@@ -1728,16 +1593,9 @@ public class LeafQueue extends AbstractCSQueue {
       // Update user metrics
       String userName = application.getUser();
 
-      // TODO, should use getUser, use this method just to avoid UT failure
-      // which is caused by wrong invoking order, will fix UT separately
-      User user = getUserAndAddIfAbsent(userName);
-
-      user.assignContainer(resource, nodePartition);
-
-      // Update usage ratios
-      updateQueueUsageRatio(nodePartition,
-          user.updateUsageRatio(resourceCalculator, resourceByLabel,
-              nodePartition));
+      // Increment user's resource usage.
+      User user = usersManager.updateUserResourceUsage(userName, resource,
+          nodePartition, true);
 
       // Note this is a bit unconventional since it gets the object and modifies
       // it here, rather then using set routine
@@ -1746,9 +1604,10 @@ public class LeafQueue extends AbstractCSQueue {
           userName, application.getHeadroom());
 
       if (LOG.isDebugEnabled()) {
-        LOG.debug(getQueueName() + " user=" + userName + " used=" + queueUsage
-            .getUsed() + " numContainers=" + numContainers + " headroom = "
-            + application.getHeadroom() + " user-resources=" + user.getUsed());
+        LOG.debug(getQueueName() + " user=" + userName + " used="
+            + queueUsage.getUsed(nodePartition) + " numContainers="
+            + numContainers + " headroom = " + application.getHeadroom()
+            + " user-resources=" + user.getUsed());
       }
     } finally {
       writeLock.unlock();
@@ -1761,8 +1620,6 @@ public class LeafQueue extends AbstractCSQueue {
     try {
       writeLock.lock();
       super.releaseResource(clusterResource, resource, nodePartition);
-      Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
-          clusterResource);
 
       // handle ignore exclusivity container
       if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
@@ -1780,13 +1637,8 @@ public class LeafQueue extends AbstractCSQueue {
 
       // Update user metrics
       String userName = application.getUser();
-      User user = getUserAndAddIfAbsent(userName);
-      user.releaseContainer(resource, nodePartition);
-
-      // Update usage ratios
-      updateQueueUsageRatio(nodePartition,
-          user.updateUsageRatio(resourceCalculator, resourceByLabel,
-              nodePartition));
+      User user = usersManager.updateUserResourceUsage(userName, resource,
+          nodePartition, false);
 
       metrics.setAvailableResourcesToUser(nodePartition,
           userName, application.getHeadroom());
@@ -1846,6 +1698,10 @@ public class LeafQueue extends AbstractCSQueue {
       // activate the pending applications if possible
       activateApplications();
 
+      // In case of any resource change, tell usersManager that the computed
+      // user-limit needs to be recomputed.
+      usersManager.userLimitNeedsRecompute();
+
       // Update application properties
       for (FiCaSchedulerApp application : orderingPolicy
           .getSchedulableEntities()) {
@@ -1861,16 +1717,16 @@ public class LeafQueue extends AbstractCSQueue {
   @Override
   public void incUsedResource(String nodeLabel, Resource resourceToInc,
       SchedulerApplicationAttempt application) {
-    getUser(application.getUser()).getResourceUsage().incUsed(nodeLabel,
-        resourceToInc);
+    usersManager.updateUserResourceUsage(application.getUser(), resourceToInc,
+        nodeLabel, true);
     super.incUsedResource(nodeLabel, resourceToInc, application);
   }
 
   @Override
   public void decUsedResource(String nodeLabel, Resource resourceToDec,
       SchedulerApplicationAttempt application) {
-    getUser(application.getUser()).getResourceUsage().decUsed(nodeLabel,
-        resourceToDec);
+    usersManager.updateUserResourceUsage(application.getUser(), resourceToDec,
+        nodeLabel, false);
     super.decUsedResource(nodeLabel, resourceToDec, application);
   }
 
@@ -1890,191 +1746,6 @@ public class LeafQueue extends AbstractCSQueue {
     queueUsage.decAMUsed(nodeLabel, resourceToDec);
   }
 
-  /*
-   * Usage Ratio
-   */
-  static private class UsageRatios {
-    private Map<String, Float> usageRatios;
-    private ReadLock readLock;
-    private WriteLock writeLock;
-
-    public UsageRatios() {
-      ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-      readLock = lock.readLock();
-      writeLock = lock.writeLock();
-      usageRatios = new HashMap<String, Float>();
-    }
-
-    private void incUsageRatio(String label, float delta) {
-      try {
-        writeLock.lock();
-        Float fl = usageRatios.get(label);
-        if (null == fl) {
-          fl = new Float(0.0);
-        }
-        fl += delta;
-        usageRatios.put(label, new Float(fl));
-      } finally {
-        writeLock.unlock();
-      }
-    }
-
-    float getUsageRatio(String label) {
-      try {
-        readLock.lock();
-        Float f = usageRatios.get(label);
-        if (null == f) {
-          return 0.0f;
-        }
-        return f;
-      } finally {
-        readLock.unlock();
-      }
-    }
-
-    private void setUsageRatio(String label, float ratio) {
-      try {
-        writeLock.lock();
-        usageRatios.put(label, new Float(ratio));
-      } finally {
-        writeLock.unlock();
-      }
-    }
-  }
-
-  @VisibleForTesting
-  public float getUsageRatio(String label) {
-    return qUsageRatios.getUsageRatio(label);
-  }
-
-  @VisibleForTesting
-  public static class User {
-    ResourceUsage userResourceUsage = new ResourceUsage();
-    volatile Resource userResourceLimit = Resource.newInstance(0, 0);
-    volatile int pendingApplications = 0;
-    volatile int activeApplications = 0;
-    private UsageRatios userUsageRatios = new UsageRatios();
-    private WriteLock writeLock;
-
-    User() {
-      ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
-      // Nobody uses read-lock now, will add it when necessary
-      writeLock = lock.writeLock();
-    }
-
-    public ResourceUsage getResourceUsage() {
-      return userResourceUsage;
-    }
-    
-    public float resetAndUpdateUsageRatio(
-        ResourceCalculator resourceCalculator,
-        Resource resource, String nodePartition) {
-      try {
-        writeLock.lock();
-        userUsageRatios.setUsageRatio(nodePartition, 0);
-        return updateUsageRatio(resourceCalculator, resource, nodePartition);
-      } finally {
-        writeLock.unlock();
-      }
-    }
-
-    public float updateUsageRatio(
-        ResourceCalculator resourceCalculator,
-        Resource resource, String nodePartition) {
-      try {
-        writeLock.lock();
-        float delta;
-        float newRatio = Resources.ratio(resourceCalculator,
-            getUsed(nodePartition), resource);
-        delta = newRatio - userUsageRatios.getUsageRatio(nodePartition);
-        userUsageRatios.setUsageRatio(nodePartition, newRatio);
-        return delta;
-      } finally {
-        writeLock.unlock();
-      }
-    }
-
-    public Resource getUsed() {
-      return userResourceUsage.getUsed();
-    }
-
-    public Resource getAllUsed() {
-      return userResourceUsage.getAllUsed();
-    }
-
-    public Resource getUsed(String label) {
-      return userResourceUsage.getUsed(label);
-    }
-
-    public int getPendingApplications() {
-      return pendingApplications;
-    }
-
-    public int getActiveApplications() {
-      return activeApplications;
-    }
-    
-    public Resource getConsumedAMResources() {
-      return userResourceUsage.getAMUsed();
-    }
-
-    public Resource getConsumedAMResources(String label) {
-      return userResourceUsage.getAMUsed(label);
-    }
-
-    public int getTotalApplications() {
-      return getPendingApplications() + getActiveApplications();
-    }
-    
-    public void submitApplication() {
-      try {
-        writeLock.lock();
-        ++pendingApplications;
-      } finally {
-        writeLock.unlock();
-      }
-    }
-    
-    public void activateApplication() {
-      try {
-        writeLock.lock();
-        --pendingApplications;
-        ++activeApplications;
-      } finally {
-        writeLock.unlock();
-      }
-    }
-
-    public void finishApplication(boolean wasActive) {
-      try {
-        writeLock.lock();
-        if (wasActive) {
-          --activeApplications;
-        } else{
-          --pendingApplications;
-        }
-      } finally {
-        writeLock.unlock();
-      }
-    }
-
-    public void assignContainer(Resource resource, String nodePartition) {
-      userResourceUsage.incUsed(nodePartition, resource);
-    }
-
-    public void releaseContainer(Resource resource, String nodePartition) {
-      userResourceUsage.decUsed(nodePartition, resource);
-    }
-
-    public Resource getUserResourceLimit() {
-      return userResourceLimit;
-    }
-
-    public void setUserResourceLimit(Resource userResourceLimit) {
-      this.userResourceLimit = userResourceLimit;
-    }
-  }
-
   @Override
   public void recoverContainer(Resource clusterResource,
       SchedulerApplicationAttempt attempt, RMContainer rmContainer) {
@@ -2144,9 +1815,9 @@ public class LeafQueue extends AbstractCSQueue {
    *                                  excessive preemption.
    * @return Total pending resource considering user limit
    */
-
   public Resource getTotalPendingResourcesConsideringUserLimit(
-      Resource clusterResources, String partition, boolean deductReservedFromPending) {
+      Resource clusterResources, String partition,
+      boolean deductReservedFromPending) {
     try {
       readLock.lock();
       Map<String, Resource> userNameToHeadroom =
@@ -2157,8 +1828,8 @@ public class LeafQueue extends AbstractCSQueue {
         if (!userNameToHeadroom.containsKey(userName)) {
           User user = getUser(userName);
           Resource headroom = Resources.subtract(
-              computeUserLimit(app.getUser(), clusterResources, user, partition,
-                  SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
+              getResourceLimitForActiveUsers(app.getUser(), clusterResources,
+                  partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
               user.getUsed(partition));
           // Make sure headroom is not negative.
           headroom = Resources.componentwiseMax(headroom, Resources.none());
@@ -2188,16 +1859,6 @@ public class LeafQueue extends AbstractCSQueue {
 
   }
 
-  public synchronized Resource getUserLimitPerUser(String userName,
-      Resource resources, String partition) {
-
-    // Check user resource limit
-    User user = getUser(userName);
-
-    return computeUserLimit(userName, resources, user, partition,
-        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
-  }
-
   @Override
   public void collectSchedulerApplications(
       Collection<ApplicationAttemptId> apps) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 0206211..a9ccefc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -884,7 +884,7 @@ public class ParentQueue extends AbstractCSQueue {
   }
   
   @Override
-  public ActiveUsersManager getActiveUsersManager() {
+  public ActiveUsersManager getAbstractUsersManager() {
     // Should never be called since all applications are submitted to LeafQueues
     return null;
   }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org