You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2017/07/12 15:07:16 UTC
[1/2] hadoop git commit: YARN-5889. Improve and refactor user-limit
calculation in Capacity Scheduler. (Sunil G via wangda)
Repository: hadoop
Updated Branches:
refs/heads/branch-2 e6cdf770c -> f2d440b3b
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
new file mode 100644
index 0000000..05503c6
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
@@ -0,0 +1,982 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import com.google.common.annotations.VisibleForTesting;
+
/**
 * {@link UsersManager} tracks users in the system and its respective data
 * structures. It pre-computes per-partition/per-scheduling-mode user-limits
 * and invalidates them whenever the set of users or their usage changes.
 */
@Private
public class UsersManager implements AbstractUsersManager {

  private static final Log LOG = LogFactory.getLog(UsersManager.class);

  /*
   * Member declaration for UsersManager class.
   */
  private final LeafQueue lQueue;
  private final RMNodeLabelsManager labelManager;
  private final ResourceCalculator resourceCalculator;
  private final CapacitySchedulerContext scheduler;
  private Map<String, User> users = new ConcurrentHashMap<>();

  // Aggregate resource usage, split by whether the owning user is currently
  // active (has at least one active application) or not.
  private ResourceUsage totalResUsageForActiveUsers = new ResourceUsage();
  private ResourceUsage totalResUsageForNonActiveUsers = new ResourceUsage();
  private Set<String> activeUsersSet = new HashSet<String>();
  private Set<String> nonActiveUsersSet = new HashSet<String>();

  // Summation of consumed ratios for all users in queue
  private UsageRatios qUsageRatios;

  // To detect whether there is a change in user count for every user-limit
  // calculation. Bumped by userLimitNeedsRecompute(); compared against the
  // local per-partition versions below to decide whether cached user-limits
  // are still valid.
  private AtomicLong latestVersionOfUsersState = new AtomicLong(0);
  private Map<String, Map<SchedulingMode, Long>> localVersionOfActiveUsersState =
      new HashMap<String, Map<SchedulingMode, Long>>();
  private Map<String, Map<SchedulingMode, Long>> localVersionOfAllUsersState =
      new HashMap<String, Map<SchedulingMode, Long>>();

  // Configured queue user-limit (percentage) and user-limit factor.
  private volatile int userLimit;
  private volatile float userLimitFactor;

  private WriteLock writeLock;
  private ReadLock readLock;

  private final QueueMetrics metrics;
  private AtomicInteger activeUsers = new AtomicInteger(0);
  private Map<String, Set<ApplicationId>> usersApplications =
      new HashMap<String, Set<ApplicationId>>();

  // Pre-computed user-limits, keyed by partition and then scheduling mode.
  Map<String, Map<SchedulingMode, Resource>> preComputedActiveUserLimit = new ConcurrentHashMap<>();
  Map<String, Map<SchedulingMode, Resource>> preComputedAllUserLimit = new ConcurrentHashMap<>();
+
+ /**
+ * UsageRatios will store the total used resources ratio across all users of
+ * the queue.
+ */
+ static private class UsageRatios {
+ private Map<String, Float> usageRatios;
+ private ReadLock readLock;
+ private WriteLock writeLock;
+
+ public UsageRatios() {
+ ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ readLock = lock.readLock();
+ writeLock = lock.writeLock();
+ usageRatios = new HashMap<String, Float>();
+ }
+
+ private void incUsageRatio(String label, float delta) {
+ try {
+ writeLock.lock();
+ float usage = 0f;
+ if (usageRatios.containsKey(label)) {
+ usage = usageRatios.get(label);
+ }
+ usage += delta;
+ usageRatios.put(label, usage);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ private float getUsageRatio(String label) {
+ try {
+ readLock.lock();
+ Float f = usageRatios.get(label);
+ if (null == f) {
+ return 0.0f;
+ }
+ return f;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ private void setUsageRatio(String label, float ratio) {
+ try {
+ writeLock.lock();
+ usageRatios.put(label, ratio);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+ } /* End of UserRatios class */
+
+ /**
+ * User class stores all user related resource usage, application details.
+ */
+ @VisibleForTesting
+ public static class User {
+ ResourceUsage userResourceUsage = new ResourceUsage();
+ String userName = null;
+ volatile Resource userResourceLimit = Resource.newInstance(0, 0);
+ private volatile AtomicInteger pendingApplications = new AtomicInteger(0);
+ private volatile AtomicInteger activeApplications = new AtomicInteger(0);
+
+ private UsageRatios userUsageRatios = new UsageRatios();
+ private WriteLock writeLock;
+
+ public User(String name) {
+ ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ // Nobody uses read-lock now, will add it when necessary
+ writeLock = lock.writeLock();
+
+ this.userName = name;
+ }
+
+ public ResourceUsage getResourceUsage() {
+ return userResourceUsage;
+ }
+
+ public float setAndUpdateUsageRatio(ResourceCalculator resourceCalculator,
+ Resource resource, String nodePartition) {
+ try {
+ writeLock.lock();
+ userUsageRatios.setUsageRatio(nodePartition, 0);
+ return updateUsageRatio(resourceCalculator, resource, nodePartition);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public float updateUsageRatio(ResourceCalculator resourceCalculator,
+ Resource resource, String nodePartition) {
+ try {
+ writeLock.lock();
+ float delta;
+ float newRatio = Resources.ratio(resourceCalculator,
+ getUsed(nodePartition), resource);
+ delta = newRatio - userUsageRatios.getUsageRatio(nodePartition);
+ userUsageRatios.setUsageRatio(nodePartition, newRatio);
+ return delta;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ public Resource getUsed() {
+ return userResourceUsage.getUsed();
+ }
+
+ public Resource getAllUsed() {
+ return userResourceUsage.getAllUsed();
+ }
+
+ public Resource getUsed(String label) {
+ return userResourceUsage.getUsed(label);
+ }
+
+ public int getPendingApplications() {
+ return pendingApplications.get();
+ }
+
+ public int getActiveApplications() {
+ return activeApplications.get();
+ }
+
+ public Resource getConsumedAMResources() {
+ return userResourceUsage.getAMUsed();
+ }
+
+ public Resource getConsumedAMResources(String label) {
+ return userResourceUsage.getAMUsed(label);
+ }
+
+ public int getTotalApplications() {
+ return getPendingApplications() + getActiveApplications();
+ }
+
+ public void submitApplication() {
+ pendingApplications.incrementAndGet();
+ }
+
+ public void activateApplication() {
+ pendingApplications.decrementAndGet();
+ activeApplications.incrementAndGet();
+ }
+
+ public void finishApplication(boolean wasActive) {
+ if (wasActive) {
+ activeApplications.decrementAndGet();
+ } else {
+ pendingApplications.decrementAndGet();
+ }
+ }
+
+ public Resource getUserResourceLimit() {
+ return userResourceLimit;
+ }
+
+ public void setUserResourceLimit(Resource userResourceLimit) {
+ this.userResourceLimit = userResourceLimit;
+ }
+ } /* End of User class */
+
+ /**
+ * UsersManager Constructor.
+ *
+ * @param metrics
+ * Queue Metrics
+ * @param lQueue
+ * Leaf Queue Object
+ * @param labelManager
+ * Label Manager instance
+ * @param scheduler
+ * Capacity Scheduler Context
+ * @param resourceCalculator
+ * rc
+ */
+ public UsersManager(QueueMetrics metrics, LeafQueue lQueue,
+ RMNodeLabelsManager labelManager, CapacitySchedulerContext scheduler,
+ ResourceCalculator resourceCalculator) {
+ ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+ this.lQueue = lQueue;
+ this.scheduler = scheduler;
+ this.labelManager = labelManager;
+ this.resourceCalculator = resourceCalculator;
+ this.qUsageRatios = new UsageRatios();
+ this.metrics = metrics;
+
+ this.writeLock = lock.writeLock();
+ this.readLock = lock.readLock();
+ }
+
  /**
   * Get configured user-limit.
   * @return user limit
   */
  public int getUserLimit() {
    return userLimit;
  }

  /**
   * Set configured user-limit.
   * @param userLimit user limit
   */
  public void setUserLimit(int userLimit) {
    this.userLimit = userLimit;
  }

  /**
   * Get configured user-limit factor.
   * @return user-limit factor
   */
  public float getUserLimitFactor() {
    return userLimitFactor;
  }

  /**
   * Set configured user-limit factor.
   * @param userLimitFactor User Limit factor.
   */
  public void setUserLimitFactor(float userLimitFactor) {
    this.userLimitFactor = userLimitFactor;
  }

  /**
   * Get the queue-wide aggregated usage ratio for a node label.
   * @param label node label (partition)
   * @return summed usage ratio across all users of the queue
   */
  @VisibleForTesting
  public float getUsageRatio(String label) {
    return qUsageRatios.getUsageRatio(label);
  }
+
+ /**
+ * Force UsersManager to recompute userlimit.
+ */
+ public void userLimitNeedsRecompute() {
+
+ // If latestVersionOfUsersState is negative due to overflow, ideally we need
+ // to reset it. This method is invoked from UsersManager and LeafQueue and
+ // all is happening within write/readLock. Below logic can help to set 0.
+ try {
+ writeLock.lock();
+
+ long value = latestVersionOfUsersState.incrementAndGet();
+ if (value < 0) {
+ latestVersionOfUsersState.set(0);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
  /*
   * Get all users of queue. Returns the live (concurrent) map, not a copy.
   */
  private Map<String, User> getUsers() {
    return users;
  }
+
  /**
   * Get user object for given user name.
   *
   * @param userName
   *          User Name
   * @return User object, or {@code null} if the user is not known
   */
  public User getUser(String userName) {
    return users.get(userName);
  }
+
+ /**
+ * Remove user.
+ *
+ * @param userName
+ * User Name
+ */
+ public void removeUser(String userName) {
+ try {
+ writeLock.lock();
+ this.users.remove(userName);
+
+ // Remove user from active/non-active list as well.
+ activeUsersSet.remove(userName);
+ nonActiveUsersSet.remove(userName);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ /**
+ * Get and add user if absent.
+ *
+ * @param userName
+ * User Name
+ * @return User object
+ */
+ public User getUserAndAddIfAbsent(String userName) {
+ try {
+ writeLock.lock();
+ User u = getUser(userName);
+ if (null == u) {
+ u = new User(userName);
+ addUser(userName, u);
+
+ // Add to nonActive list so that resourceUsage could be tracked
+ if (!nonActiveUsersSet.contains(userName)) {
+ nonActiveUsersSet.add(userName);
+ }
+ }
+ return u;
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
  /*
   * Add a new user to the users map. Invoked from getUserAndAddIfAbsent,
   * which holds the write lock.
   */
  private void addUser(String userName, User user) {
    this.users.put(userName, user);
  }
+
+ /**
+ * @return an ArrayList of UserInfo objects who are active in this queue
+ */
+ public ArrayList<UserInfo> getUsersInfo() {
+ try {
+ readLock.lock();
+ ArrayList<UserInfo> usersToReturn = new ArrayList<UserInfo>();
+ for (Map.Entry<String, User> entry : getUsers().entrySet()) {
+ User user = entry.getValue();
+ usersToReturn.add(
+ new UserInfo(entry.getKey(), Resources.clone(user.getAllUsed()),
+ user.getActiveApplications(), user.getPendingApplications(),
+ Resources.clone(user.getConsumedAMResources()),
+ Resources.clone(user.getUserResourceLimit()),
+ user.getResourceUsage()));
+ }
+ return usersToReturn;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ /**
+ * Get computed user-limit for all ACTIVE users in this queue. If cached data
+ * is invalidated due to resource change, this method also enforce to
+ * recompute user-limit.
+ *
+ * @param userName
+ * Name of user who has submitted one/more app to given queue.
+ * @param clusterResource
+ * total cluster resource
+ * @param nodePartition
+ * partition name
+ * @param schedulingMode
+ * scheduling mode
+ * RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
+ * @return Computed User Limit
+ */
+ public Resource getComputedResourceLimitForActiveUsers(String userName,
+ Resource clusterResource, String nodePartition,
+ SchedulingMode schedulingMode) {
+
+ Map<SchedulingMode, Resource> userLimitPerSchedulingMode = preComputedActiveUserLimit
+ .get(nodePartition);
+
+ try {
+ writeLock.lock();
+ if (isRecomputeNeeded(schedulingMode, nodePartition, true)) {
+ // recompute
+ userLimitPerSchedulingMode = reComputeUserLimits(userName,
+ nodePartition, clusterResource, schedulingMode, true);
+
+ // update user count to cache so that we can avoid recompute if no major
+ // changes.
+ setLocalVersionOfUsersState(nodePartition, schedulingMode, true);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("userLimit is fetched. userLimit = "
+ + userLimitPerSchedulingMode.get(schedulingMode) + ", schedulingMode="
+ + schedulingMode + ", partition=" + nodePartition);
+ }
+
+ return userLimitPerSchedulingMode.get(schedulingMode);
+ }
+
+ /**
+ * Get computed user-limit for all users in this queue. If cached data is
+ * invalidated due to resource change, this method also enforce to recompute
+ * user-limit.
+ *
+ * @param userName
+ * Name of user who has submitted one/more app to given queue.
+ * @param clusterResource
+ * total cluster resource
+ * @param nodePartition
+ * partition name
+ * @param schedulingMode
+ * scheduling mode
+ * RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
+ * @return Computed User Limit
+ */
+ public Resource getComputedResourceLimitForAllUsers(String userName,
+ Resource clusterResource, String nodePartition,
+ SchedulingMode schedulingMode) {
+
+ Map<SchedulingMode, Resource> userLimitPerSchedulingMode = preComputedAllUserLimit
+ .get(nodePartition);
+
+ try {
+ writeLock.lock();
+ if (isRecomputeNeeded(schedulingMode, nodePartition, false)) {
+ // recompute
+ userLimitPerSchedulingMode = reComputeUserLimits(userName,
+ nodePartition, clusterResource, schedulingMode, false);
+
+ // update user count to cache so that we can avoid recompute if no major
+ // changes.
+ setLocalVersionOfUsersState(nodePartition, schedulingMode, false);
+ }
+ } finally {
+ writeLock.unlock();
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("userLimit is fetched. userLimit = "
+ + userLimitPerSchedulingMode.get(schedulingMode) + ", schedulingMode="
+ + schedulingMode + ", partition=" + nodePartition);
+ }
+
+ return userLimitPerSchedulingMode.get(schedulingMode);
+ }
+
  /*
   * Recompute user-limit under following conditions: 1. cached user-limit does
   * not exist in local map (local version is -1). 2. The global users-state
   * version no longer matches the locally cached version for this
   * partition/mode/isActive combination.
   */
  private boolean isRecomputeNeeded(SchedulingMode schedulingMode,
      String nodePartition, boolean isActive) {
    return (getLocalVersionOfUsersState(nodePartition, schedulingMode,
        isActive) != latestVersionOfUsersState.get());
  }
+
+ /*
+ * Set Local version of user count per label to invalidate cache if needed.
+ */
+ private void setLocalVersionOfUsersState(String nodePartition,
+ SchedulingMode schedulingMode, boolean isActive) {
+ try {
+ writeLock.lock();
+ Map<String, Map<SchedulingMode, Long>> localVersionOfUsersState = (isActive)
+ ? localVersionOfActiveUsersState
+ : localVersionOfAllUsersState;
+
+ Map<SchedulingMode, Long> localVersion = localVersionOfUsersState
+ .get(nodePartition);
+ if (null == localVersion) {
+ localVersion = new HashMap<SchedulingMode, Long>();
+ localVersionOfUsersState.put(nodePartition, localVersion);
+ }
+
+ localVersion.put(schedulingMode, latestVersionOfUsersState.get());
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ /*
+ * Get Local version of user count per label to invalidate cache if needed.
+ */
+ private long getLocalVersionOfUsersState(String nodePartition,
+ SchedulingMode schedulingMode, boolean isActive) {
+ try {
+ this.readLock.lock();
+ Map<String, Map<SchedulingMode, Long>> localVersionOfUsersState = (isActive)
+ ? localVersionOfActiveUsersState
+ : localVersionOfAllUsersState;
+
+ if (!localVersionOfUsersState.containsKey(nodePartition)) {
+ return -1;
+ }
+
+ Map<SchedulingMode, Long> localVersion = localVersionOfUsersState
+ .get(nodePartition);
+ if (!localVersion.containsKey(schedulingMode)) {
+ return -1;
+ }
+
+ return localVersion.get(schedulingMode);
+ } finally {
+ readLock.unlock();
+ }
+ }
+
+ private Map<SchedulingMode, Resource> reComputeUserLimits(String userName,
+ String nodePartition, Resource clusterResource,
+ SchedulingMode schedulingMode, boolean activeMode) {
+
+ // preselect stored map as per active user-limit or all user computation.
+ Map<String, Map<SchedulingMode, Resource>> computedMap = null;
+ computedMap = (activeMode)
+ ? preComputedActiveUserLimit
+ : preComputedAllUserLimit;
+
+ Map<SchedulingMode, Resource> userLimitPerSchedulingMode = computedMap
+ .get(nodePartition);
+
+ if (userLimitPerSchedulingMode == null) {
+ userLimitPerSchedulingMode = new ConcurrentHashMap<>();
+ computedMap.put(nodePartition, userLimitPerSchedulingMode);
+ }
+
+ // compute user-limit per scheduling mode.
+ Resource computedUserLimit = computeUserLimit(userName, clusterResource,
+ nodePartition, schedulingMode, activeMode);
+
+ // update in local storage
+ userLimitPerSchedulingMode.put(schedulingMode, computedUserLimit);
+
+ return userLimitPerSchedulingMode;
+ }
+
  /**
   * Compute the user-limit resource for {@code userName} on
   * {@code nodePartition}.
   *
   * @param userName user the limit is computed for; its cached
   *          userResourceLimit is updated as a side effect
   * @param clusterResource total cluster resource
   * @param nodePartition partition name
   * @param schedulingMode RESPECT/IGNORE partition exclusivity
   * @param activeUser when true, divide among active users only; otherwise
   *          among all users of the queue
   * @return computed user limit, rounded up to the queue's minimum allocation
   */
  private Resource computeUserLimit(String userName, Resource clusterResource,
      String nodePartition, SchedulingMode schedulingMode, boolean activeUser) {
    Resource partitionResource = labelManager.getResourceByLabel(nodePartition,
        clusterResource);

    /*
     * What is our current capacity?
     * * It is equal to the max(required, queue-capacity) if we're running
     * below capacity. The 'max' ensures that jobs in queues with miniscule
     * capacity (< 1 slot) make progress
     * * If we're running over capacity, then its (usedResources + required)
     * (which extra resources we are allocating)
     */
    Resource queueCapacity = Resources.multiplyAndNormalizeUp(
        resourceCalculator, partitionResource,
        lQueue.getQueueCapacities().getAbsoluteCapacity(nodePartition),
        lQueue.getMinimumAllocation());

    /*
     * Assume we have required resource equals to minimumAllocation, this can
     * make sure user limit can continuously increase till queueMaxResource
     * reached.
     */
    Resource required = lQueue.getMinimumAllocation();

    // Allow progress for queues with miniscule capacity
    queueCapacity = Resources.max(resourceCalculator, partitionResource,
        queueCapacity, required);

    /*
     * We want to base the userLimit calculation on max(queueCapacity,
     * usedResources+required). However, we want usedResources to be based on
     * the combined ratios of all the users in the queue so we use consumedRatio
     * to calculate such. The calculation is dependent on how the
     * resourceCalculator calculates the ratio between two Resources. DRF
     * Example: If usedResources is greater than queueCapacity and users have
     * the following [mem,cpu] usages: User1: [10%,20%] - Dominant resource is
     * 20% User2: [30%,10%] - Dominant resource is 30% Then total consumedRatio
     * is then 20+30=50%. Yes, this value can be larger than 100% but for the
     * purposes of making sure all users are getting their fair share, it works.
     */
    Resource consumed = Resources.multiplyAndNormalizeUp(resourceCalculator,
        partitionResource, getUsageRatio(nodePartition),
        lQueue.getMinimumAllocation());
    Resource currentCapacity = Resources.lessThan(resourceCalculator,
        partitionResource, consumed, queueCapacity)
        ? queueCapacity
        : Resources.add(consumed, required);

    /*
     * Never allow a single user to take more than the queue's configured
     * capacity * user-limit-factor. Also, the queue's configured capacity
     * should be higher than queue-hard-limit * ulMin
     */
    int usersCount = getNumActiveUsers();
    Resource resourceUsed = totalResUsageForActiveUsers.getUsed(nodePartition);

    // For non-activeUser calculation, consider all users count.
    if (!activeUser) {
      resourceUsed = currentCapacity;
      usersCount = users.size();
    }

    /*
     * User limit resource is determined by: max{currentCapacity / #activeUsers,
     * currentCapacity * user-limit-percentage%)
     */
    Resource userLimitResource = Resources.max(resourceCalculator,
        partitionResource,
        Resources.divideAndCeil(resourceCalculator, resourceUsed,
            usersCount),
        Resources.divideAndCeil(resourceCalculator,
            Resources.multiplyAndRoundDown(currentCapacity, getUserLimit()),
            100));

    // User limit is capped by maxUserLimit
    // - maxUserLimit = queueCapacity * user-limit-factor
    // (RESPECT_PARTITION_EXCLUSIVITY)
    // - maxUserLimit = total-partition-resource (IGNORE_PARTITION_EXCLUSIVITY)
    //
    // In IGNORE_PARTITION_EXCLUSIVITY mode, if a queue cannot access a
    // partition, its guaranteed resource on that partition is 0. And
    // user-limit-factor computation is based on queue's guaranteed capacity. So
    // we will not cap user-limit as well as used resource when doing
    // IGNORE_PARTITION_EXCLUSIVITY allocation.
    Resource maxUserLimit = Resources.none();
    if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
      maxUserLimit = Resources.multiplyAndRoundDown(queueCapacity,
          getUserLimitFactor());
    } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
      maxUserLimit = partitionResource;
    }

    // Cap final user limit with maxUserLimit, and round up to a multiple of
    // the minimum allocation.
    userLimitResource = Resources
        .roundUp(resourceCalculator,
            Resources.min(resourceCalculator, partitionResource,
                userLimitResource, maxUserLimit),
            lQueue.getMinimumAllocation());

    if (LOG.isDebugEnabled()) {
      LOG.debug("User limit computation for " + userName + " in queue "
          + lQueue.getQueueName() + " userLimitPercent=" + lQueue.getUserLimit()
          + " userLimitFactor=" + lQueue.getUserLimitFactor() + " required: "
          + required + " consumed: " + consumed + " user-limit-resource: "
          + userLimitResource + " queueCapacity: " + queueCapacity
          + " qconsumed: " + lQueue.getQueueResourceUsage().getUsed()
          + " currentCapacity: " + currentCapacity + " activeUsers: "
          + usersCount + " clusterCapacity: " + clusterResource
          + " resourceByLabel: " + partitionResource + " usageratio: "
          + getUsageRatio(nodePartition) + " Partition: " + nodePartition);
    }
    getUser(userName).setUserResourceLimit(userLimitResource);
    return userLimitResource;
  }
+
+ /**
+ * Update new usage ratio.
+ *
+ * @param partition
+ * Node partition
+ * @param clusterResource
+ * Cluster Resource
+ */
+ public void updateUsageRatio(String partition, Resource clusterResource) {
+ try {
+ writeLock.lock();
+ Resource resourceByLabel = labelManager.getResourceByLabel(partition,
+ clusterResource);
+ float consumed = 0;
+ User user;
+ for (Map.Entry<String, User> entry : getUsers().entrySet()) {
+ user = entry.getValue();
+ consumed += user.setAndUpdateUsageRatio(resourceCalculator,
+ resourceByLabel, partition);
+ }
+
+ qUsageRatios.setUsageRatio(partition, consumed);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
  /*
   * Increment Queue Usage Ratio: add the per-user delta to the queue-wide
   * aggregated ratio for the partition.
   */
  private void incQueueUsageRatio(String nodePartition, float delta) {
    qUsageRatios.incUsageRatio(nodePartition, delta);
  }
+
+ @Override
+ public void activateApplication(String user, ApplicationId applicationId) {
+ try {
+ this.writeLock.lock();
+
+ Set<ApplicationId> userApps = usersApplications.get(user);
+ if (userApps == null) {
+ userApps = new HashSet<ApplicationId>();
+ usersApplications.put(user, userApps);
+ activeUsers.incrementAndGet();
+ metrics.incrActiveUsers();
+
+ // A user is added to active list. Invalidate user-limit cache.
+ userLimitNeedsRecompute();
+ updateActiveUsersResourceUsage(user);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("User " + user + " added to activeUsers, currently: "
+ + activeUsers);
+ }
+ }
+ if (userApps.add(applicationId)) {
+ metrics.activateApp(user);
+ }
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ @Override
+ public void deactivateApplication(String user, ApplicationId applicationId) {
+ try {
+ this.writeLock.lock();
+
+ Set<ApplicationId> userApps = usersApplications.get(user);
+ if (userApps != null) {
+ if (userApps.remove(applicationId)) {
+ metrics.deactivateApp(user);
+ }
+ if (userApps.isEmpty()) {
+ usersApplications.remove(user);
+ activeUsers.decrementAndGet();
+ metrics.decrActiveUsers();
+
+ // A user is removed from active list. Invalidate user-limit cache.
+ userLimitNeedsRecompute();
+ updateNonActiveUsersResourceUsage(user);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("User " + user + " removed from activeUsers, currently: "
+ + activeUsers);
+ }
+ }
+ }
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
  @Override
  public int getNumActiveUsers() {
    // Maintained by activateApplication/deactivateApplication: a user is
    // counted while it owns at least one entry in usersApplications.
    return activeUsers.get();
  }
+
+ private void updateActiveUsersResourceUsage(String userName) {
+ try {
+ this.writeLock.lock();
+
+ // For UT case: We might need to add the user to users list.
+ User user = getUserAndAddIfAbsent(userName);
+ ResourceUsage resourceUsage = user.getResourceUsage();
+ // If User is moved to active list, moved resource usage from non-active
+ // to active list.
+ if (nonActiveUsersSet.contains(userName)) {
+ nonActiveUsersSet.remove(userName);
+ activeUsersSet.add(userName);
+
+ // Update total resource usage of active and non-active after user
+ // is moved from non-active to active.
+ for (String partition : resourceUsage.getNodePartitionsSet()) {
+ totalResUsageForNonActiveUsers.decUsed(partition,
+ resourceUsage.getUsed(partition));
+ totalResUsageForActiveUsers.incUsed(partition,
+ resourceUsage.getUsed(partition));
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("User '" + userName
+ + "' has become active. Hence move user to active list."
+ + "Active users size = " + activeUsersSet.size()
+ + "Non-active users size = " + nonActiveUsersSet.size()
+ + "Total Resource usage for active users="
+ + totalResUsageForActiveUsers.getAllUsed() + "."
+ + "Total Resource usage for non-active users="
+ + totalResUsageForNonActiveUsers.getAllUsed());
+ }
+ }
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ private void updateNonActiveUsersResourceUsage(String userName) {
+ try {
+ this.writeLock.lock();
+
+ // For UT case: We might need to add the user to users list.
+ User user = getUserAndAddIfAbsent(userName);
+ ResourceUsage resourceUsage = user.getResourceUsage();
+ // If User is moved to non-active list, moved resource usage from
+ // non-active to active list.
+ if (activeUsersSet.contains(userName)) {
+ activeUsersSet.remove(userName);
+ nonActiveUsersSet.add(userName);
+
+ // Update total resource usage of active and non-active after user is
+ // moved from active to non-active.
+ for (String partition : resourceUsage.getNodePartitionsSet()) {
+ totalResUsageForActiveUsers.decUsed(partition,
+ resourceUsage.getUsed(partition));
+ totalResUsageForNonActiveUsers.incUsed(partition,
+ resourceUsage.getUsed(partition));
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("User '" + userName
+ + "' has become non-active.Hence move user to non-active list."
+ + "Active users size = " + activeUsersSet.size()
+ + "Non-active users size = " + nonActiveUsersSet.size()
+ + "Total Resource usage for active users="
+ + totalResUsageForActiveUsers.getAllUsed() + "."
+ + "Total Resource usage for non-active users="
+ + totalResUsageForNonActiveUsers.getAllUsed());
+ }
+ }
+ }
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ private ResourceUsage getTotalResourceUsagePerUser(String userName) {
+ if (nonActiveUsersSet.contains(userName)) {
+ return totalResUsageForNonActiveUsers;
+ } else if (activeUsersSet.contains(userName)) {
+ return totalResUsageForActiveUsers;
+ } else {
+ LOG.warn("User '" + userName
+ + "' is not present in active/non-active. This is highly unlikely."
+ + "We can consider this user in non-active list in this case.");
+ return totalResUsageForNonActiveUsers;
+ }
+ }
+
+ /**
+ * During container allocate/release, ensure that all user specific data
+ * structures are updated.
+ *
+ * @param userName
+ * Name of the user
+ * @param resource
+ * Resource to increment/decrement
+ * @param nodePartition
+ * Node label
+ * @param isAllocate
+ * Indicate whether to allocate or release resource
+ * @return user
+ */
+ public User updateUserResourceUsage(String userName, Resource resource,
+ String nodePartition, boolean isAllocate) {
+ try {
+ this.writeLock.lock();
+
+ // TODO, should use getUser, use this method just to avoid UT failure
+ // which is caused by wrong invoking order, will fix UT separately
+ User user = getUserAndAddIfAbsent(userName);
+
+ // New container is allocated. Invalidate user-limit.
+ updateResourceUsagePerUser(user, resource, nodePartition, isAllocate);
+
+ userLimitNeedsRecompute();
+
+ // Update usage ratios
+ Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
+ scheduler.getClusterResource());
+ incQueueUsageRatio(nodePartition, user.updateUsageRatio(
+ resourceCalculator, resourceByLabel, nodePartition));
+
+ return user;
+ } finally {
+ this.writeLock.unlock();
+ }
+ }
+
+ private void updateResourceUsagePerUser(User user, Resource resource,
+ String nodePartition, boolean isAllocate) {
+ ResourceUsage totalResourceUsageForUsers = getTotalResourceUsagePerUser(
+ user.userName);
+
+ if (isAllocate) {
+ user.getResourceUsage().incUsed(nodePartition, resource);
+ totalResourceUsageForUsers.incUsed(nodePartition, resource);
+ } else {
+ user.getResourceUsage().decUsed(nodePartition, resource);
+ totalResourceUsageForUsers.decUsed(nodePartition, resource);
+ }
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "User resource is updated." + "Total Resource usage for active users="
+ + totalResUsageForActiveUsers.getAllUsed() + "."
+ + "Total Resource usage for non-active users="
+ + totalResUsageForNonActiveUsers.getAllUsed());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 3635398..ee5de60 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -54,7 +54,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFini
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
@@ -115,24 +115,24 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
new ConcurrentHashMap<>();
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
- String user, Queue queue, ActiveUsersManager activeUsersManager,
+ String user, Queue queue, AbstractUsersManager abstractUsersManager,
RMContext rmContext) {
- this(applicationAttemptId, user, queue, activeUsersManager, rmContext,
+ this(applicationAttemptId, user, queue, abstractUsersManager, rmContext,
Priority.newInstance(0), false);
}
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
- String user, Queue queue, ActiveUsersManager activeUsersManager,
+ String user, Queue queue, AbstractUsersManager abstractUsersManager,
RMContext rmContext, Priority appPriority, boolean isAttemptRecovering) {
- this(applicationAttemptId, user, queue, activeUsersManager, rmContext,
+ this(applicationAttemptId, user, queue, abstractUsersManager, rmContext,
appPriority, isAttemptRecovering, null);
}
public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId,
- String user, Queue queue, ActiveUsersManager activeUsersManager,
+ String user, Queue queue, AbstractUsersManager abstractUsersManager,
RMContext rmContext, Priority appPriority, boolean isAttemptRecovering,
ActivitiesManager activitiesManager) {
- super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
+ super(applicationAttemptId, user, queue, abstractUsersManager, rmContext);
RMApp rmApp = rmContext.getRMApps().get(getApplicationId());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
index 0fad8be..855b8f7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
@@ -488,7 +488,7 @@ public class FSLeafQueue extends FSQueue {
}
@Override
- public ActiveUsersManager getActiveUsersManager() {
+ public ActiveUsersManager getAbstractUsersManager() {
return activeUsersManager;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
index 3bc81ac..5b4e4dc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
@@ -281,7 +281,7 @@ public class FSParentQueue extends FSQueue {
}
@Override
- public ActiveUsersManager getActiveUsersManager() {
+ public ActiveUsersManager getAbstractUsersManager() {
// Should never be called since all applications are submitted to LeafQueues
return null;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
index b90063d..61b527a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
@@ -177,7 +177,7 @@ public class FifoScheduler extends
}
@Override
- public ActiveUsersManager getActiveUsersManager() {
+ public ActiveUsersManager getAbstractUsersManager() {
return activeUsersManager;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
index e0ac56f..7dcdf58 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
@@ -63,7 +63,7 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
maxApplications = q.getMaxApplications();
maxApplicationsPerUser = q.getMaxApplicationsPerUser();
userLimit = q.getUserLimit();
- users = new UsersInfo(q.getUsers());
+ users = new UsersInfo(q.getUsersManager().getUsersInfo());
userLimitFactor = q.getUserLimitFactor();
AMResourceLimit = new ResourceInfo(q.getAMResourceLimit());
usedAMResource = new ResourceInfo(q.getQueueResourceUsage().getAMUsed());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
index 32b2c68..dfab3b2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicyMockFramework.java
@@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -362,9 +363,10 @@ public class ProportionalCapacityPreemptionPolicyMockFramework {
queue.getQueueCapacities().getAbsoluteCapacity());
HashSet<String> users = userMap.get(queue.getQueueName());
Resource userLimit = Resources.divideAndCeil(rc, capacity, users.size());
- for (String user : users) {
- when(queue.getUserLimitPerUser(eq(user), any(Resource.class),
- anyString())).thenReturn(userLimit);
+ for (String userName : users) {
+ when(queue.getResourceLimitForAllUsers(eq(userName),
+ any(Resource.class), anyString(), any(SchedulingMode.class)))
+ .thenReturn(userLimit);
}
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
index ca73b6a..fa16eff 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java
@@ -73,7 +73,7 @@ public class TestSchedulerApplicationAttempt {
RMContext rmContext = mock(RMContext.class);
when(rmContext.getEpoch()).thenReturn(3L);
SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(appAttId,
- user, oldQueue, oldQueue.getActiveUsersManager(), rmContext);
+ user, oldQueue, oldQueue.getAbstractUsersManager(), rmContext);
oldMetrics.submitApp(user);
// confirm that containerId is calculated based on epoch.
@@ -169,7 +169,7 @@ public class TestSchedulerApplicationAttempt {
ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics);
Queue queue = mock(Queue.class);
when(queue.getMetrics()).thenReturn(metrics);
- when(queue.getActiveUsersManager()).thenReturn(activeUsersManager);
+ when(queue.getAbstractUsersManager()).thenReturn(activeUsersManager);
when(queue.getQueueInfo(false, false)).thenReturn(queueInfo);
return queue;
}
@@ -198,7 +198,7 @@ public class TestSchedulerApplicationAttempt {
Queue queue = createQueue("test", null);
SchedulerApplicationAttempt app =
new SchedulerApplicationAttempt(appAttId, user, queue,
- queue.getActiveUsersManager(), rmContext);
+ queue.getAbstractUsersManager(), rmContext);
// Resource request
Resource requestedResource = Resource.newInstance(1536, 2);
@@ -211,7 +211,7 @@ public class TestSchedulerApplicationAttempt {
queue = createQueue("test2", null, 0.5f);
app = new SchedulerApplicationAttempt(appAttId, user, queue,
- queue.getActiveUsersManager(), rmContext);
+ queue.getAbstractUsersManager(), rmContext);
app.attemptResourceUsage.incUsed(requestedResource);
assertEquals(30.0f, app.getResourceUsageReport().getQueueUsagePercentage(),
0.01f);
@@ -229,7 +229,7 @@ public class TestSchedulerApplicationAttempt {
queue = createQueue("test3", null, 0.0f);
app = new SchedulerApplicationAttempt(appAttId, user, queue,
- queue.getActiveUsersManager(), rmContext);
+ queue.getAbstractUsersManager(), rmContext);
// Resource request
app.attemptResourceUsage.incUsed(requestedResource);
@@ -255,7 +255,7 @@ public class TestSchedulerApplicationAttempt {
final String user = "user1";
Queue queue = createQueue("test", null);
SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(appAttId,
- user, queue, queue.getActiveUsersManager(), rmContext);
+ user, queue, queue.getAbstractUsersManager(), rmContext);
// Resource request
Resource requestedResource = Resource.newInstance(1536, 2);
@@ -274,7 +274,7 @@ public class TestSchedulerApplicationAttempt {
RMContext rmContext = mock(RMContext.class);
when(rmContext.getEpoch()).thenReturn(3L);
SchedulerApplicationAttempt app = new SchedulerApplicationAttempt(
- attemptId, "user", queue, queue.getActiveUsersManager(), rmContext);
+ attemptId, "user", queue, queue.getAbstractUsersManager(), rmContext);
Priority priority = Priority.newInstance(1);
SchedulerRequestKey schedulerKey = toSchedulerKey(priority);
assertEquals(0, app.getSchedulingOpportunities(schedulerKey));
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index 37b0da8..e9b1f9d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -193,7 +193,7 @@ public class TestApplicationLimits {
clusterResource));
ActiveUsersManager activeUsersManager = mock(ActiveUsersManager.class);
- when(queue.getActiveUsersManager()).thenReturn(activeUsersManager);
+ when(queue.getAbstractUsersManager()).thenReturn(activeUsersManager);
assertEquals(Resource.newInstance(8 * GB, 1),
queue.calculateAndGetAMResourceLimit());
@@ -634,7 +634,7 @@ public class TestApplicationLimits {
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0_0 = new FiCaSchedulerApp(
appAttemptId_0_0, user_0, queue,
- queue.getActiveUsersManager(), spyRMContext);
+ queue.getAbstractUsersManager(), spyRMContext);
queue.submitApplicationAttempt(app_0_0, user_0);
List<ResourceRequest> app_0_0_requests = new ArrayList<ResourceRequest>();
@@ -646,7 +646,7 @@ public class TestApplicationLimits {
// Schedule to compute
queue.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
- Resource expectedHeadroom = Resources.createResource(10*16*GB, 1);
+ Resource expectedHeadroom = Resources.createResource(5*16*GB, 1);
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
// Submit second application from user_0, check headroom
@@ -654,7 +654,7 @@ public class TestApplicationLimits {
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_0_1 = new FiCaSchedulerApp(
appAttemptId_0_1, user_0, queue,
- queue.getActiveUsersManager(), spyRMContext);
+ queue.getAbstractUsersManager(), spyRMContext);
queue.submitApplicationAttempt(app_0_1, user_0);
List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
@@ -674,7 +674,7 @@ public class TestApplicationLimits {
TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_1_0 = new FiCaSchedulerApp(
appAttemptId_1_0, user_1, queue,
- queue.getActiveUsersManager(), spyRMContext);
+ queue.getAbstractUsersManager(), spyRMContext);
queue.submitApplicationAttempt(app_1_0, user_1);
List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();
@@ -693,6 +693,11 @@ public class TestApplicationLimits {
// Now reduce cluster size and check for the smaller headroom
clusterResource = Resources.createResource(90*16*GB);
+
+ // Any change in cluster resource needs to enforce user-limit recomputation.
+ // In existing code, LeafQueue#updateClusterResource handled this. However
+ // here that method was not used.
+ queue.getUsersManager().userLimitNeedsRecompute();
queue.assignContainers(clusterResource, node_0, new ResourceLimits(
clusterResource), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute
expectedHeadroom = Resources.createResource(9*16*GB / 2, 1); // changes
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
index 547571e..0aac2ef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
@@ -659,7 +659,7 @@ public class TestApplicationLimitsByPartition {
final ApplicationAttemptId appAttemptId_0_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0_0 = new FiCaSchedulerApp(appAttemptId_0_0, user_0,
- queue, queue.getActiveUsersManager(), spyRMContext);
+ queue, queue.getAbstractUsersManager(), spyRMContext);
queue.submitApplicationAttempt(app_0_0, user_0);
List<ResourceRequest> app_0_0_requests = new ArrayList<ResourceRequest>();
@@ -671,16 +671,16 @@ public class TestApplicationLimitsByPartition {
queue.assignContainers(clusterResource, node_0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
- //head room = queue capacity = 50 % 90% 160 GB
+ //head room = queue capacity = 50 % 90% 160 GB * 0.25 (UL)
Resource expectedHeadroom =
- Resources.createResource((int) (0.5 * 0.9 * 160) * GB, 1);
+ Resources.createResource((int) (0.5 * 0.9 * 160 * 0.25) * GB, 1);
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
// Submit second application from user_0, check headroom
final ApplicationAttemptId appAttemptId_0_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_0_1 = new FiCaSchedulerApp(appAttemptId_0_1, user_0,
- queue, queue.getActiveUsersManager(), spyRMContext);
+ queue, queue.getAbstractUsersManager(), spyRMContext);
queue.submitApplicationAttempt(app_0_1, user_0);
List<ResourceRequest> app_0_1_requests = new ArrayList<ResourceRequest>();
@@ -703,15 +703,16 @@ public class TestApplicationLimitsByPartition {
assertEquals(expectedHeadroom, app_0_0.getHeadroom());// no change
//head room for default label + head room for y partition
//head room for y partition = 100% 50%(b queue capacity ) * 160 * GB
- Resource expectedHeadroomWithReqInY =
- Resources.add(Resources.createResource((int) (0.5 * 160) * GB, 1), expectedHeadroom);
+ Resource expectedHeadroomWithReqInY = Resources.add(
+ Resources.createResource((int) (0.25 * 0.5 * 160) * GB, 1),
+ expectedHeadroom);
assertEquals(expectedHeadroomWithReqInY, app_0_1.getHeadroom());
// Submit first application from user_1, check for new headroom
final ApplicationAttemptId appAttemptId_1_0 =
TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_1_0 = new FiCaSchedulerApp(appAttemptId_1_0, user_1,
- queue, queue.getActiveUsersManager(), spyRMContext);
+ queue, queue.getAbstractUsersManager(), spyRMContext);
queue.submitApplicationAttempt(app_1_0, user_1);
List<ResourceRequest> app_1_0_requests = new ArrayList<ResourceRequest>();
@@ -730,12 +731,12 @@ public class TestApplicationLimitsByPartition {
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); // Schedule to compute
//head room = queue capacity = (50 % 90% 160 GB)/2 (for 2 users)
expectedHeadroom =
- Resources.createResource((int) (0.5 * 0.9 * 160 * 0.5) * GB, 1);
+ Resources.createResource((int) (0.5 * 0.9 * 160 * 0.25) * GB, 1);
//head room for default label + head room for y partition
//head room for y partition = 100% 50%(b queue capacity ) * 160 * GB
- expectedHeadroomWithReqInY =
- Resources.add(Resources.createResource((int) (0.5 * 0.5 * 160) * GB, 1),
- expectedHeadroom);
+ expectedHeadroomWithReqInY = Resources.add(
+ Resources.createResource((int) (0.25 * 0.5 * 160) * GB, 1),
+ expectedHeadroom);
assertEquals(expectedHeadroom, app_0_0.getHeadroom());
assertEquals(expectedHeadroomWithReqInY, app_0_1.getHeadroom());
assertEquals(expectedHeadroomWithReqInY, app_1_0.getHeadroom());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
index 8c850de..b4ebd15 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNodeLabelUpdate.java
@@ -186,7 +186,7 @@ public class TestCapacitySchedulerNodeLabelUpdate {
String userName, String partition, int memory) {
CapacityScheduler scheduler = (CapacityScheduler) rm.getResourceScheduler();
LeafQueue queue = (LeafQueue) scheduler.getQueue(queueName);
- LeafQueue.User user = queue.getUser(userName);
+ UsersManager.User user = queue.getUser(userName);
Assert.assertEquals(memory,
user.getResourceUsage().getUsed(partition).getMemorySize());
}
@@ -243,7 +243,7 @@ public class TestCapacitySchedulerNodeLabelUpdate {
LeafQueue queue =
(LeafQueue) ((CapacityScheduler) rm.getResourceScheduler())
.getQueue("a");
- ArrayList<UserInfo> users = queue.getUsers();
+ ArrayList<UserInfo> users = queue.getUsersManager().getUsersInfo();
for (UserInfo userInfo : users) {
if (userInfo.getUsername().equals("user")) {
ResourceInfo resourcesUsed = userInfo.getResourcesUsed();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index f572ea3..7ad4f7d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -69,6 +69,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
@@ -77,10 +78,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueStateManager
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
-
-
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue.User;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -523,19 +522,22 @@ public class TestLeafQueue {
// Users
final String user_0 = "user_0";
+ // Active Users Manager
+ AbstractUsersManager activeUserManager = a.getAbstractUsersManager();
+
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
- mock(ActiveUsersManager.class), spyRMContext);
+ activeUserManager, spyRMContext);
a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a,
- mock(ActiveUsersManager.class), spyRMContext);
+ activeUserManager, spyRMContext);
a.submitApplicationAttempt(app_1, user_0); // same user
@@ -684,7 +686,7 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app0 =
new FiCaSchedulerApp(appAttemptId0, user0, b,
- b.getActiveUsersManager(), spyRMContext);
+ b.getAbstractUsersManager(), spyRMContext);
b.submitApplicationAttempt(app0, user0);
// Setup some nodes
@@ -748,14 +750,14 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app0 =
new FiCaSchedulerApp(appAttemptId0, user0, b,
- b.getActiveUsersManager(), spyRMContext);
+ b.getAbstractUsersManager(), spyRMContext);
b.submitApplicationAttempt(app0, user0);
final ApplicationAttemptId appAttemptId2 =
TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app2 =
new FiCaSchedulerApp(appAttemptId2, user1, b,
- b.getActiveUsersManager(), spyRMContext);
+ b.getAbstractUsersManager(), spyRMContext);
b.submitApplicationAttempt(app2, user1);
// Setup some nodes
@@ -776,6 +778,7 @@ public class TestLeafQueue {
Resource clusterResource =
Resources.createResource(numNodes * (8 * GB), numNodes * 100);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+ when(csContext.getClusterResource()).thenReturn(clusterResource);
// Setup resource-requests so that one application is memory dominant
// and other application is vcores dominant
@@ -799,7 +802,7 @@ public class TestLeafQueue {
User queueUser1 = b.getUser(user1);
assertEquals("There should 2 active users!", 2, b
- .getActiveUsersManager().getNumActiveUsers());
+ .getAbstractUsersManager().getNumActiveUsers());
// Fill both Nodes as far as we can
CSAssignment assign;
do {
@@ -834,7 +837,7 @@ public class TestLeafQueue {
/ (numNodes * 100.0f)
+ queueUser1.getUsed().getMemorySize()
/ (numNodes * 8.0f * GB);
- assertEquals(expectedRatio, b.getUsageRatio(""), 0.001);
+ assertEquals(expectedRatio, b.getUsersManager().getUsageRatio(""), 0.001);
// Add another node and make sure consumedRatio is adjusted
// accordingly.
numNodes = 3;
@@ -848,7 +851,7 @@ public class TestLeafQueue {
/ (numNodes * 100.0f)
+ queueUser1.getUsed().getMemorySize()
/ (numNodes * 8.0f * GB);
- assertEquals(expectedRatio, b.getUsageRatio(""), 0.001);
+ assertEquals(expectedRatio, b.getUsersManager().getUsageRatio(""), 0.001);
}
@Test
@@ -858,6 +861,9 @@ public class TestLeafQueue {
//unset maxCapacity
a.setMaxCapacity(1.0f);
+ when(csContext.getClusterResource())
+ .thenReturn(Resources.createResource(16 * GB, 32));
+
// Users
final String user_0 = "user_0";
final String user_1 = "user_1";
@@ -867,14 +873,14 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
- a.getActiveUsersManager(), spyRMContext);
+ a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_1, a,
- a.getActiveUsersManager(), spyRMContext);
+ a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_1, user_1); // different user
// Setup some nodes
@@ -913,7 +919,7 @@ public class TestLeafQueue {
a.setUserLimitFactor(2);
// There're two active users
- assertEquals(2, a.getActiveUsersManager().getNumActiveUsers());
+ assertEquals(2, a.getAbstractUsersManager().getNumActiveUsers());
// 1 container to user_0
applyCSAssignment(clusterResource,
@@ -948,7 +954,7 @@ public class TestLeafQueue {
// app_0 doesn't have outstanding resources, there's only one active user.
assertEquals("There should only be 1 active user!",
- 1, a.getActiveUsersManager().getNumActiveUsers());
+ 1, a.getAbstractUsersManager().getNumActiveUsers());
}
@@ -999,7 +1005,7 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, qb,
- qb.getActiveUsersManager(), spyRMContext);
+ qb.getAbstractUsersManager(), spyRMContext);
Map<ApplicationAttemptId, FiCaSchedulerApp> apps = new HashMap<>();
apps.put(app_0.getApplicationAttemptId(), app_0);
qb.submitApplicationAttempt(app_0, user_0);
@@ -1010,7 +1016,7 @@ public class TestLeafQueue {
u0Priority, recordFactory)));
assertEquals("There should only be 1 active user!",
- 1, qb.getActiveUsersManager().getNumActiveUsers());
+ 1, qb.getAbstractUsersManager().getNumActiveUsers());
//get headroom
applyCSAssignment(clusterResource,
qb.assignContainers(clusterResource, node_0,
@@ -1027,7 +1033,7 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_1, qb,
- qb.getActiveUsersManager(), spyRMContext);
+ qb.getAbstractUsersManager(), spyRMContext);
apps.put(app_2.getApplicationAttemptId(), app_2);
Priority u1Priority = TestUtils.createMockPriority(2);
SchedulerRequestKey u1SchedKey = toSchedulerKey(u1Priority);
@@ -1065,13 +1071,13 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, qb,
- qb.getActiveUsersManager(), spyRMContext);
+ qb.getAbstractUsersManager(), spyRMContext);
apps.put(app_1.getApplicationAttemptId(), app_1);
final ApplicationAttemptId appAttemptId_3 =
TestUtils.getMockApplicationAttemptId(3, 0);
FiCaSchedulerApp app_3 =
new FiCaSchedulerApp(appAttemptId_3, user_1, qb,
- qb.getActiveUsersManager(), spyRMContext);
+ qb.getAbstractUsersManager(), spyRMContext);
apps.put(app_3.getApplicationAttemptId(), app_3);
app_1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
@@ -1100,7 +1106,7 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(4, 0);
FiCaSchedulerApp app_4 =
new FiCaSchedulerApp(appAttemptId_4, user_0, qb,
- qb.getActiveUsersManager(), spyRMContext);
+ qb.getAbstractUsersManager(), spyRMContext);
apps.put(app_4.getApplicationAttemptId(), app_4);
qb.submitApplicationAttempt(app_4, user_0);
app_4.updateResourceRequests(Collections.singletonList(
@@ -1123,9 +1129,9 @@ public class TestLeafQueue {
//testcase3 still active - 2+2+6=10
assertEquals(10*GB, qb.getUsedResources().getMemorySize());
//app4 is user 0
- //maxqueue 16G, userlimit 13G, used 8G, headroom 5G
+ //maxqueue 16G, userlimit 7G, used 8G, headroom 0G
//(8G used is 6G from this test case - app4, 2 from last test case, app_1)
- assertEquals(5*GB, app_4.getHeadroom().getMemorySize());
+ assertEquals(0*GB, app_4.getHeadroom().getMemorySize());
}
@Test
@@ -1144,21 +1150,21 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
- a.getActiveUsersManager(), spyRMContext);
+ a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a,
- a.getActiveUsersManager(), spyRMContext);
+ a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_1, user_0); // same user
final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_1, a,
- a.getActiveUsersManager(), spyRMContext);
+ a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_2, user_1);
// Setup some nodes
@@ -1244,21 +1250,21 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
- a.getActiveUsersManager(), spyRMContext);
+ a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a,
- a.getActiveUsersManager(), spyRMContext);
+ a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_1, user_0); // same user
final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_1, a,
- a.getActiveUsersManager(), spyRMContext);
+ a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_2, user_1);
// Setup some nodes
@@ -1298,7 +1304,7 @@ public class TestLeafQueue {
// Now, only user_0 should be active since he is the only one with
// outstanding requests
assertEquals("There should only be 1 active user!",
- 1, a.getActiveUsersManager().getNumActiveUsers());
+ 1, a.getAbstractUsersManager().getNumActiveUsers());
// 1 container to user_0
applyCSAssignment(clusterResource,
@@ -1309,8 +1315,8 @@ public class TestLeafQueue {
assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
// TODO, fix headroom in the future patch
- assertEquals(1*GB, app_0.getHeadroom().getMemorySize());
- // User limit = 4G, 2 in use
+ assertEquals(0*GB, app_0.getHeadroom().getMemorySize());
+ // User limit = 2G, 2 in use
assertEquals(0*GB, app_1.getHeadroom().getMemorySize());
// the application is not yet active
@@ -1322,15 +1328,15 @@ public class TestLeafQueue {
assertEquals(3*GB, a.getUsedResources().getMemorySize());
assertEquals(2*GB, app_0.getCurrentConsumption().getMemorySize());
assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
- assertEquals(1*GB, app_0.getHeadroom().getMemorySize()); // 4G - 3G
- assertEquals(1*GB, app_1.getHeadroom().getMemorySize()); // 4G - 3G
+ assertEquals(0*GB, app_0.getHeadroom().getMemorySize()); // 4G - 3G
+ assertEquals(0*GB, app_1.getHeadroom().getMemorySize()); // 4G - 3G
// Submit requests for app_1 and set max-cap
a.setMaxCapacity(.1f);
app_2.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 1, true,
priority, recordFactory)));
- assertEquals(2, a.getActiveUsersManager().getNumActiveUsers());
+ assertEquals(2, a.getAbstractUsersManager().getNumActiveUsers());
// No more to user_0 since he is already over user-limit
// and no more containers to queue since it's already at max-cap
@@ -1349,7 +1355,7 @@ public class TestLeafQueue {
app_1.updateResourceRequests(Collections.singletonList( // unset
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 0, true,
priority, recordFactory)));
- assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
+ assertEquals(1, a.getAbstractUsersManager().getNumActiveUsers());
applyCSAssignment(clusterResource,
a.assignContainers(clusterResource, node_1,
new ResourceLimits(clusterResource),
@@ -1375,28 +1381,28 @@ public class TestLeafQueue {
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_0, a,
- a.getActiveUsersManager(), spyRMContext);
+ a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_0, user_0);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_0, a,
- a.getActiveUsersManager(), spyRMContext);
+ a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_1, user_0); // same user
final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_1, a,
- a.getActiveUsersManager(), spyRMContext);
+ a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_2, user_1);
final ApplicationAttemptId appAttemptId_3 =
TestUtils.getMockApplicationAttemptId(3, 0);
FiCaSchedulerApp app_3 =
new FiCaSchedulerApp(appAttemptId_3, user_2, a,
- a.getActiveUsersManager(), spyRMContext);
+ a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app_3, user_2);
Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
@@ -1414,7 +1420,8 @@ public class TestLeafQueue {
Resource clusterResource =
Resources.createResource(numNodes * (8*GB), numNodes * 16);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
-
+ when(csContext.getClusterResource()).thenReturn(clusterResource);
+
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app_0.updateResourceRequests(Collections.singletonList(
@@ -1741,6 +1748,7 @@ public class TestLeafQueue {
when(csContext.getNode(node_0.getNodeID())).thenReturn(node_0);
when(csContext.getNode(node_1.getNodeID())).thenReturn(node_1);
+ when(csContext.getClusterResource()).thenReturn(Resource.newInstance(8, 1));
Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
@@ -3804,7 +3812,7 @@ public class TestLeafQueue {
final String user = "user1";
FiCaSchedulerApp app =
new FiCaSchedulerApp(appAttId, user, queue,
- queue.getActiveUsersManager(), rmContext);
+ queue.getAbstractUsersManager(), rmContext);
// Resource request
Resource requestedResource = Resource.newInstance(1536, 2);
@@ -3819,7 +3827,7 @@ public class TestLeafQueue {
// child of root, its absolute capacity is also 50%.
queue = createQueue("test2", null, 0.5f, 0.5f);
app = new FiCaSchedulerApp(appAttId, user, queue,
- queue.getActiveUsersManager(), rmContext);
+ queue.getAbstractUsersManager(), rmContext);
app.getAppAttemptResourceUsage().incUsed(requestedResource);
// In "test2" queue, 1536 used is 30% of "test2" and 15% of the cluster.
assertEquals(30.0f, app.getResourceUsageReport().getQueueUsagePercentage(),
@@ -3831,7 +3839,7 @@ public class TestLeafQueue {
// Therefore, "test2.1" capacity is 50% and absolute capacity is 25%.
AbstractCSQueue qChild = createQueue("test2.1", queue, 0.5f, 0.25f);
app = new FiCaSchedulerApp(appAttId, user, qChild,
- qChild.getActiveUsersManager(), rmContext);
+ qChild.getAbstractUsersManager(), rmContext);
app.getAppAttemptResourceUsage().incUsed(requestedResource);
// In "test2.1" queue, 1536 used is 60% of "test2.1" and 15% of the cluster.
assertEquals(60.0f, app.getResourceUsageReport().getQueueUsagePercentage(),
@@ -3855,7 +3863,7 @@ public class TestLeafQueue {
ActiveUsersManager activeUsersManager = new ActiveUsersManager(metrics);
AbstractCSQueue queue = mock(AbstractCSQueue.class);
when(queue.getMetrics()).thenReturn(metrics);
- when(queue.getActiveUsersManager()).thenReturn(activeUsersManager);
+ when(queue.getAbstractUsersManager()).thenReturn(activeUsersManager);
when(queue.getQueueInfo(false, false)).thenReturn(queueInfo);
QueueCapacities qCaps = mock(QueueCapacities.class);
when(qCaps.getAbsoluteCapacity((String) any())).thenReturn(absCap);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
index 32a9074..740ef33 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java
@@ -1049,7 +1049,7 @@ public class TestNodeLabelContainerAllocation {
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "b");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm2);
- // Each application request 5 * 1GB container
+ // Each application request 50 * 1GB container
am1.allocate("*", 1 * GB, 50, new ArrayList<ContainerId>());
// NM1 do 50 heartbeats
@@ -1169,12 +1169,14 @@ public class TestNodeLabelContainerAllocation {
csConf.setAccessibleNodeLabels(A, toSet("x"));
csConf.setCapacityByLabel(A, "x", 50);
csConf.setMaximumCapacityByLabel(A, "x", 50);
+ csConf.setUserLimit(A, 200);
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
csConf.setCapacity(B, 50);
csConf.setAccessibleNodeLabels(B, toSet("x"));
csConf.setCapacityByLabel(B, "x", 50);
csConf.setMaximumCapacityByLabel(B, "x", 50);
+ csConf.setUserLimit(B, 200);
// set node -> label
mgr.addToCluserNodeLabels(ImmutableSet.of(
@@ -1207,6 +1209,7 @@ public class TestNodeLabelContainerAllocation {
SchedulerNode schedulerNode1 = cs.getSchedulerNode(nm1.getNodeId());
+ cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
for (int i = 0; i < 50; i++) {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
@@ -1250,7 +1253,7 @@ public class TestNodeLabelContainerAllocation {
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
}
}
-
+
private void waitSchedulerNodeJoined(MockRM rm, int expectedNodeNum)
throws InterruptedException {
int totalWaitTick = 100; // wait 10 sec at most.
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org
[2/2] hadoop git commit: YARN-5889. Improve and refactor user-limit
calculation in Capacity Scheduler. (Sunil G via wangda)
Posted by ep...@apache.org.
YARN-5889. Improve and refactor user-limit calculation in Capacity Scheduler. (Sunil G via wangda)
(cherry picked from commit 5fb723bb77722d41df6959eee23e1b0cfeb5584e)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/f2d440b3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/f2d440b3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/f2d440b3
Branch: refs/heads/branch-2
Commit: f2d440b3b3a094d1230c02cfa652f2f33df83b8d
Parents: e6cdf77
Author: Wangda Tan <wa...@apache.org>
Authored: Thu Feb 9 10:23:50 2017 -0800
Committer: Eric Payne <ep...@apache.org>
Committed: Wed Jul 12 10:06:02 2017 -0500
----------------------------------------------------------------------
.../FifoIntraQueuePreemptionPlugin.java | 6 +-
.../scheduler/AbstractUsersManager.java | 54 +
.../scheduler/ActiveUsersManager.java | 23 +-
.../scheduler/AppSchedulingInfo.java | 16 +-
.../server/resourcemanager/scheduler/Queue.java | 2 +-
.../scheduler/SchedulerApplicationAttempt.java | 4 +-
.../scheduler/capacity/CSQueue.java | 8 +-
.../capacity/CapacityHeadroomProvider.java | 4 +-
.../scheduler/capacity/CapacityScheduler.java | 2 +-
.../scheduler/capacity/LeafQueue.java | 587 +++--------
.../scheduler/capacity/ParentQueue.java | 2 +-
.../scheduler/capacity/UsersManager.java | 982 +++++++++++++++++++
.../scheduler/common/fica/FiCaSchedulerApp.java | 14 +-
.../scheduler/fair/FSLeafQueue.java | 2 +-
.../scheduler/fair/FSParentQueue.java | 2 +-
.../scheduler/fifo/FifoScheduler.java | 2 +-
.../dao/CapacitySchedulerLeafQueueInfo.java | 2 +-
...alCapacityPreemptionPolicyMockFramework.java | 8 +-
.../TestSchedulerApplicationAttempt.java | 14 +-
.../capacity/TestApplicationLimits.java | 15 +-
.../TestApplicationLimitsByPartition.java | 23 +-
.../TestCapacitySchedulerNodeLabelUpdate.java | 4 +-
.../scheduler/capacity/TestLeafQueue.java | 98 +-
.../TestNodeLabelContainerAllocation.java | 7 +-
24 files changed, 1303 insertions(+), 578 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
index 757f567..5f1af1e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueueCandidatesSelector.TAPriorityComparator;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -264,8 +265,9 @@ public class FifoIntraQueuePreemptionPlugin
// Verify whether we already calculated headroom for this user.
if (userLimitResource == null) {
- userLimitResource = Resources.clone(tq.leafQueue
- .getUserLimitPerUser(userName, partitionBasedResource, partition));
+ userLimitResource = Resources.clone(
+ tq.leafQueue.getResourceLimitForAllUsers(userName, clusterResource,
+ partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
Resource amUsed = perUserAMUsed.get(userName);
if (null == amUsed) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractUsersManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractUsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractUsersManager.java
new file mode 100644
index 0000000..4db3584
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractUsersManager.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+
+/**
+ * {@link AbstractUsersManager} tracks users in the system.
+ */
+@Private
+public interface AbstractUsersManager {
+ /**
+ * An application has new outstanding requests.
+ *
+ * @param user
+ * application user
+ * @param applicationId
+ * activated application
+ */
+ void activateApplication(String user, ApplicationId applicationId);
+ /**
+ * An application has no more outstanding requests.
+ *
+ * @param user
+ * application user
+ * @param applicationId
+ * deactivated application
+ */
+ void deactivateApplication(String user, ApplicationId applicationId);
+
+ /**
+ * Get number of active users i.e. users with applications which have pending
+ * resource requests.
+ *
+ * @return number of active users
+ */
+ int getNumActiveUsers();
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java
index 36e6858..049f324 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ActiveUsersManager.java
@@ -36,8 +36,8 @@ import org.apache.hadoop.yarn.server.utils.Lock;
* An active user is defined as someone with outstanding resource requests.
*/
@Private
-public class ActiveUsersManager {
-
+public class ActiveUsersManager implements AbstractUsersManager {
+
private static final Log LOG = LogFactory.getLog(ActiveUsersManager.class);
private final QueueMetrics metrics;
@@ -45,7 +45,7 @@ public class ActiveUsersManager {
private int activeUsers = 0;
private Map<String, Set<ApplicationId>> usersApplications =
new HashMap<String, Set<ApplicationId>>();
-
+
public ActiveUsersManager(QueueMetrics metrics) {
this.metrics = metrics;
}
@@ -57,6 +57,7 @@ public class ActiveUsersManager {
* @param applicationId activated application
*/
@Lock({Queue.class, SchedulerApplicationAttempt.class})
+ @Override
synchronized public void activateApplication(
String user, ApplicationId applicationId) {
Set<ApplicationId> userApps = usersApplications.get(user);
@@ -65,8 +66,10 @@ public class ActiveUsersManager {
usersApplications.put(user, userApps);
++activeUsers;
metrics.incrActiveUsers();
- LOG.debug("User " + user + " added to activeUsers, currently: " +
- activeUsers);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("User " + user + " added to activeUsers, currently: "
+ + activeUsers);
+ }
}
if (userApps.add(applicationId)) {
metrics.activateApp(user);
@@ -80,6 +83,7 @@ public class ActiveUsersManager {
* @param applicationId deactivated application
*/
@Lock({Queue.class, SchedulerApplicationAttempt.class})
+ @Override
synchronized public void deactivateApplication(
String user, ApplicationId applicationId) {
Set<ApplicationId> userApps = usersApplications.get(user);
@@ -91,18 +95,21 @@ public class ActiveUsersManager {
usersApplications.remove(user);
--activeUsers;
metrics.decrActiveUsers();
- LOG.debug("User " + user + " removed from activeUsers, currently: " +
- activeUsers);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("User " + user + " removed from activeUsers, currently: "
+ + activeUsers);
+ }
}
}
}
-
+
/**
* Get number of active users i.e. users with applications which have pending
* resource requests.
* @return number of active users
*/
@Lock({Queue.class, SchedulerApplicationAttempt.class})
+ @Override
synchronized public int getNumActiveUsers() {
return activeUsers;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index f0b6e98..9532343 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -66,7 +66,7 @@ public class AppSchedulingInfo {
private final String user;
private Queue queue;
- private ActiveUsersManager activeUsersManager;
+ private AbstractUsersManager abstractUsersManager;
// whether accepted/allocated by scheduler
private volatile boolean pending = true;
private ResourceUsage appResourceUsage;
@@ -90,13 +90,13 @@ public class AppSchedulingInfo {
public final ContainerUpdateContext updateContext;
public AppSchedulingInfo(ApplicationAttemptId appAttemptId,
- String user, Queue queue, ActiveUsersManager activeUsersManager,
+ String user, Queue queue, AbstractUsersManager abstractUsersManager,
long epoch, ResourceUsage appResourceUsage) {
this.applicationAttemptId = appAttemptId;
this.applicationId = appAttemptId.getApplicationId();
this.queue = queue;
this.user = user;
- this.activeUsersManager = activeUsersManager;
+ this.abstractUsersManager = abstractUsersManager;
this.containerIdCounter = new AtomicLong(
epoch << ResourceManager.EPOCH_BIT_SHIFT);
this.appResourceUsage = appResourceUsage;
@@ -253,7 +253,7 @@ public class AppSchedulingInfo {
// Activate application. Metrics activation is done here.
if (lastRequestContainers <= 0) {
schedulerKeys.add(schedulerKey);
- activeUsersManager.activateApplication(user, applicationId);
+ abstractUsersManager.activateApplication(user, applicationId);
}
}
@@ -453,7 +453,7 @@ public class AppSchedulingInfo {
public void checkForDeactivation() {
if (schedulerKeys.isEmpty()) {
- activeUsersManager.deactivateApplication(user, applicationId);
+ abstractUsersManager.deactivateApplication(user, applicationId);
}
}
@@ -483,9 +483,9 @@ public class AppSchedulingInfo {
}
oldMetrics.moveAppFrom(this);
newMetrics.moveAppTo(this);
- activeUsersManager.deactivateApplication(user, applicationId);
- activeUsersManager = newQueue.getActiveUsersManager();
- activeUsersManager.activateApplication(user, applicationId);
+ abstractUsersManager.deactivateApplication(user, applicationId);
+ abstractUsersManager = newQueue.getAbstractUsersManager();
+ abstractUsersManager.activateApplication(user, applicationId);
this.queue = newQueue;
} finally {
this.writeLock.unlock();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
index ada2a0b..d166e5f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Queue.java
@@ -63,7 +63,7 @@ public interface Queue {
boolean hasAccess(QueueACL acl, UserGroupInformation user);
- public ActiveUsersManager getActiveUsersManager();
+ public AbstractUsersManager getAbstractUsersManager();
/**
* Recover the state of the queue for a given container.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index ecfea18..e1d714d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -200,13 +200,13 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
private AtomicInteger unconfirmedAllocatedVcores = new AtomicInteger();
public SchedulerApplicationAttempt(ApplicationAttemptId applicationAttemptId,
- String user, Queue queue, ActiveUsersManager activeUsersManager,
+ String user, Queue queue, AbstractUsersManager abstractUsersManager,
RMContext rmContext) {
Preconditions.checkNotNull(rmContext, "RMContext should not be null");
this.rmContext = rmContext;
this.appSchedulingInfo =
new AppSchedulingInfo(applicationAttemptId, user, queue,
- activeUsersManager, rmContext.getEpoch(), attemptResourceUsage);
+ abstractUsersManager, rmContext.getEpoch(), attemptResourceUsage);
this.queue = queue;
this.pendingRelease = Collections.newSetFromMap(
new ConcurrentHashMap<ContainerId, Boolean>());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index b878e72..c6726ec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.security.PrivilegedEntity;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@@ -240,10 +240,10 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
ResourceLimits resourceLimits);
/**
- * Get the {@link ActiveUsersManager} for the queue.
- * @return the <code>ActiveUsersManager</code> for the queue
+ * Get the {@link AbstractUsersManager} for the queue.
+ * @return the <code>AbstractUsersManager</code> for the queue
*/
- public ActiveUsersManager getActiveUsersManager();
+ public AbstractUsersManager getAbstractUsersManager();
/**
* Adds all applications in the queue and its subqueues to the given collection.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
index 5605f18..140a2ac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
@@ -26,12 +26,12 @@ import org.apache.hadoop.yarn.util.resource.Resources;
public class CapacityHeadroomProvider {
- LeafQueue.User user;
+ UsersManager.User user;
LeafQueue queue;
FiCaSchedulerApp application;
LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo;
- public CapacityHeadroomProvider(LeafQueue.User user, LeafQueue queue,
+ public CapacityHeadroomProvider(UsersManager.User user, LeafQueue queue,
FiCaSchedulerApp application,
LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 8b1f8b4..54145dc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -745,7 +745,7 @@ public class CapacityScheduler extends
CSQueue queue = (CSQueue) application.getQueue();
FiCaSchedulerApp attempt = new FiCaSchedulerApp(applicationAttemptId,
- application.getUser(), queue, queue.getActiveUsersManager(),
+ application.getUser(), queue, queue.getAbstractUsersManager(),
rmContext, application.getPriority(), isAttemptRecovering,
activitiesManager);
if (transferStateFromPreviousAttempt) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 2b1efd6..c34adbe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -51,7 +51,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsMana
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
@@ -63,6 +63,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
@@ -76,7 +77,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPo
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.io.IOException;
@@ -101,8 +101,6 @@ public class LeafQueue extends AbstractCSQueue {
private static final Log LOG = LogFactory.getLog(LeafQueue.class);
private float absoluteUsedCapacity = 0.0f;
- private volatile int userLimit;
- private volatile float userLimitFactor;
protected int maxApplications;
protected volatile int maxApplicationsPerUser;
@@ -122,14 +120,12 @@ public class LeafQueue extends AbstractCSQueue {
private volatile float minimumAllocationFactor;
- private Map<String, User> users = new ConcurrentHashMap<>();
-
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
private CapacitySchedulerContext scheduler;
- private final ActiveUsersManager activeUsersManager;
+ private final UsersManager usersManager;
// cache last cluster resource to compute actual capacity
private Resource lastClusterResource = Resources.none();
@@ -141,10 +137,6 @@ public class LeafQueue extends AbstractCSQueue {
private volatile OrderingPolicy<FiCaSchedulerApp> orderingPolicy = null;
- // Summation of consumed ratios for all users in queue
- private float totalUserConsumedRatio = 0;
- private UsageRatios qUsageRatios;
-
// record all ignore partition exclusivityRMContainer, this will be used to do
// preemption, key is the partition of the RMContainer allocated on
private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers =
@@ -159,13 +151,12 @@ public class LeafQueue extends AbstractCSQueue {
super(cs, queueName, parent, old);
this.scheduler = cs;
- this.activeUsersManager = new ActiveUsersManager(metrics);
+ this.usersManager = new UsersManager(metrics, this, labelManager, scheduler,
+ resourceCalculator);
// One time initialization is enough since it is static ordering policy
this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps();
- qUsageRatios = new UsageRatios();
-
if(LOG.isDebugEnabled()) {
LOG.debug("LeafQueue:" + " name=" + queueName
+ ", fullname=" + getQueuePath());
@@ -197,8 +188,8 @@ public class LeafQueue extends AbstractCSQueue {
setOrderingPolicy(
conf.<FiCaSchedulerApp>getAppOrderingPolicy(getQueuePath()));
- userLimit = conf.getUserLimit(getQueuePath());
- userLimitFactor = conf.getUserLimitFactor(getQueuePath());
+ usersManager.setUserLimit(conf.getUserLimit(getQueuePath()));
+ usersManager.setUserLimitFactor(conf.getUserLimitFactor(getQueuePath()));
maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath());
if (maxApplications < 0) {
@@ -212,7 +203,8 @@ public class LeafQueue extends AbstractCSQueue {
}
}
maxApplicationsPerUser = Math.min(maxApplications,
- (int) (maxApplications * (userLimit / 100.0f) * userLimitFactor));
+ (int) (maxApplications * (usersManager.getUserLimit() / 100.0f)
+ * usersManager.getUserLimitFactor()));
maxAMResourcePerQueuePercent =
conf.getMaximumApplicationMasterResourcePerQueuePercent(
@@ -271,8 +263,9 @@ public class LeafQueue extends AbstractCSQueue {
+ queueCapacities.getAbsoluteMaximumCapacity()
+ " [= 1.0 maximumCapacity undefined, "
+ "(parentAbsoluteMaxCapacity * maximumCapacity) / 100 otherwise ]"
- + "\n" + "userLimit = " + userLimit + " [= configuredUserLimit ]"
- + "\n" + "userLimitFactor = " + userLimitFactor
+ + "\n" + "userLimit = " + usersManager.getUserLimit()
+ + " [= configuredUserLimit ]" + "\n" + "userLimitFactor = "
+ + usersManager.getUserLimitFactor()
+ " [= configuredUserLimitFactor ]" + "\n" + "maxApplications = "
+ maxApplications
+ " [= configuredMaximumSystemApplicationsPerQueue or"
@@ -336,9 +329,17 @@ public class LeafQueue extends AbstractCSQueue {
return maxApplicationsPerUser;
}
+ /**
+ *
+ * @return UsersManager instance.
+ */
+ public UsersManager getUsersManager() {
+ return usersManager;
+ }
+
@Override
- public ActiveUsersManager getActiveUsersManager() {
- return activeUsersManager;
+ public AbstractUsersManager getAbstractUsersManager() {
+ return usersManager;
}
@Override
@@ -352,7 +353,8 @@ public class LeafQueue extends AbstractCSQueue {
*/
@VisibleForTesting
void setUserLimit(int userLimit) {
- this.userLimit = userLimit;
+ usersManager.setUserLimit(userLimit);
+ usersManager.userLimitNeedsRecompute();
}
/**
@@ -361,7 +363,8 @@ public class LeafQueue extends AbstractCSQueue {
*/
@VisibleForTesting
void setUserLimitFactor(float userLimitFactor) {
- this.userLimitFactor = userLimitFactor;
+ usersManager.setUserLimitFactor(userLimitFactor);
+ usersManager.userLimitNeedsRecompute();
}
@Override
@@ -422,12 +425,12 @@ public class LeafQueue extends AbstractCSQueue {
@Private
public int getUserLimit() {
- return userLimit;
+ return usersManager.getUserLimit();
}
@Private
public float getUserLimitFactor() {
- return userLimitFactor;
+ return usersManager.getUserLimitFactor();
}
@Override
@@ -477,44 +480,7 @@ public class LeafQueue extends AbstractCSQueue {
@VisibleForTesting
public User getUser(String userName) {
- return users.get(userName);
- }
-
- // Get and add user if absent
- private User getUserAndAddIfAbsent(String userName) {
- try {
- writeLock.lock();
- User u = users.get(userName);
- if (null == u) {
- u = new User();
- users.put(userName, u);
- }
- return u;
- } finally {
- writeLock.unlock();
- }
- }
-
- /**
- * @return an ArrayList of UserInfo objects who are active in this queue
- */
- public ArrayList<UserInfo> getUsers() {
- try {
- readLock.lock();
- ArrayList<UserInfo> usersToReturn = new ArrayList<UserInfo>();
- for (Map.Entry<String, User> entry : users.entrySet()) {
- User user = entry.getValue();
- usersToReturn.add(
- new UserInfo(entry.getKey(), Resources.clone(user.getAllUsed()),
- user.getActiveApplications(), user.getPendingApplications(),
- Resources.clone(user.getConsumedAMResources()),
- Resources.clone(user.getUserResourceLimit()),
- user.getResourceUsage()));
- }
- return usersToReturn;
- } finally {
- readLock.unlock();
- }
+ return usersManager.getUser(userName);
}
@Private
@@ -575,7 +541,7 @@ public class LeafQueue extends AbstractCSQueue {
// TODO, should use getUser, use this method just to avoid UT failure
// which is caused by wrong invoking order, will fix UT separately
- User user = getUserAndAddIfAbsent(userName);
+ User user = usersManager.getUserAndAddIfAbsent(userName);
// Add the attempt to our data-structures
addApplicationAttempt(application, user);
@@ -632,7 +598,7 @@ public class LeafQueue extends AbstractCSQueue {
}
// Check submission limits for the user on this queue
- User user = getUserAndAddIfAbsent(userName);
+ User user = usersManager.getUserAndAddIfAbsent(userName);
if (user.getTotalApplications() >= getMaxApplicationsPerUser()) {
String msg = "Queue " + getQueuePath() + " already has " + user
.getTotalApplications() + " applications from user " + userName
@@ -682,19 +648,21 @@ public class LeafQueue extends AbstractCSQueue {
* the absolute queue capacity (per partition) instead of the max and is
* modified by the userlimit and the userlimit factor as is the userlimit
*/
- float effectiveUserLimit = Math.max(userLimit / 100.0f,
- 1.0f / Math.max(getActiveUsersManager().getNumActiveUsers(), 1));
+ float effectiveUserLimit = Math.max(usersManager.getUserLimit() / 100.0f,
+ 1.0f / Math.max(getAbstractUsersManager().getNumActiveUsers(), 1));
- Resource queuePartitionResource = Resources.multiplyAndNormalizeUp(
- resourceCalculator,
- labelManager.getResourceByLabel(nodePartition, lastClusterResource),
- queueCapacities.getAbsoluteCapacity(nodePartition),
- minimumAllocation);
+ Resource queuePartitionResource = Resources
+ .multiplyAndNormalizeUp(resourceCalculator,
+ labelManager.getResourceByLabel(nodePartition,
+ lastClusterResource),
+ queueCapacities.getAbsoluteCapacity(nodePartition),
+ minimumAllocation);
Resource userAMLimit = Resources.multiplyAndNormalizeUp(
resourceCalculator, queuePartitionResource,
queueCapacities.getMaxAMResourcePercentage(nodePartition)
- * effectiveUserLimit * userLimitFactor, minimumAllocation);
+ * effectiveUserLimit * usersManager.getUserLimitFactor(),
+ minimumAllocation);
return Resources.lessThanOrEqual(resourceCalculator, lastClusterResource,
userAMLimit, getAMResourceLimitPerPartition(nodePartition)) ?
userAMLimit :
@@ -910,7 +878,7 @@ public class LeafQueue extends AbstractCSQueue {
@Override
public void finishApplication(ApplicationId application, String user) {
// Inform the activeUsersManager
- activeUsersManager.deactivateApplication(user, application);
+ usersManager.deactivateApplication(user, application);
appFinished();
@@ -932,7 +900,7 @@ public class LeafQueue extends AbstractCSQueue {
// TODO, should use getUser, use this method just to avoid UT failure
// which is caused by wrong invoking order, will fix UT separately
- User user = getUserAndAddIfAbsent(userName);
+ User user = usersManager.getUserAndAddIfAbsent(userName);
String partitionName = application.getAppAMNodePartitionName();
boolean wasActive = orderingPolicy.removeSchedulableEntity(application);
@@ -950,7 +918,7 @@ public class LeafQueue extends AbstractCSQueue {
user.finishApplication(wasActive);
if (user.getTotalApplications() == 0) {
- users.remove(application.getUser());
+ usersManager.removeUser(application.getUser());
}
// Check if we can activate more applications
@@ -1291,7 +1259,7 @@ public class LeafQueue extends AbstractCSQueue {
Resource clusterResource, FiCaSchedulerApp application,
String partition) {
return getHeadroom(user, queueCurrentLimit, clusterResource,
- computeUserLimit(application.getUser(), clusterResource, user,
+ getResourceLimitForActiveUsers(application.getUser(), clusterResource,
partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
partition);
}
@@ -1365,7 +1333,7 @@ public class LeafQueue extends AbstractCSQueue {
// Compute user limit respect requested labels,
// TODO, need consider headroom respect labels also
Resource userLimit =
- computeUserLimit(application.getUser(), clusterResource, queueUser,
+ getResourceLimitForActiveUsers(application.getUser(), clusterResource,
nodePartition, schedulingMode);
setQueueResourceLimitsInfo(clusterResource);
@@ -1375,11 +1343,11 @@ public class LeafQueue extends AbstractCSQueue {
clusterResource, userLimit, nodePartition);
if (LOG.isDebugEnabled()) {
- LOG.debug("Headroom calculation for user " + user + ": " +
- " userLimit=" + userLimit +
- " queueMaxAvailRes=" + cachedResourceLimitsForHeadroom.getLimit() +
- " consumed=" + queueUser.getUsed() +
- " headroom=" + headroom);
+ LOG.debug("Headroom calculation for user " + user + ": " + " userLimit="
+ + userLimit + " queueMaxAvailRes="
+ + cachedResourceLimitsForHeadroom.getLimit() + " consumed="
+ + queueUser.getUsed() + " headroom=" + headroom + " partition="
+ + nodePartition);
}
CapacityHeadroomProvider headroomProvider = new CapacityHeadroomProvider(
@@ -1407,129 +1375,46 @@ public class LeafQueue extends AbstractCSQueue {
return rackLocalityFullReset;
}
- @Lock(NoLock.class)
- private Resource computeUserLimit(String userName,
- Resource clusterResource, User user,
- String nodePartition, SchedulingMode schedulingMode) {
- Resource partitionResource = labelManager.getResourceByLabel(nodePartition,
- clusterResource);
-
- // What is our current capacity?
- // * It is equal to the max(required, queue-capacity) if
- // we're running below capacity. The 'max' ensures that jobs in queues
- // with miniscule capacity (< 1 slot) make progress
- // * If we're running over capacity, then its
- // (usedResources + required) (which extra resources we are allocating)
- Resource queueCapacity =
- Resources.multiplyAndNormalizeUp(resourceCalculator,
- partitionResource,
- queueCapacities.getAbsoluteCapacity(nodePartition),
- minimumAllocation);
-
- // Assume we have required resource equals to minimumAllocation, this can
- // make sure user limit can continuously increase till queueMaxResource
- // reached.
- Resource required = minimumAllocation;
-
- // Allow progress for queues with miniscule capacity
- queueCapacity =
- Resources.max(
- resourceCalculator, partitionResource,
- queueCapacity,
- required);
-
-
- /* We want to base the userLimit calculation on
- * max(queueCapacity, usedResources+required). However, we want
- * usedResources to be based on the combined ratios of all the users in the
- * queue so we use consumedRatio to calculate such.
- * The calculation is dependent on how the resourceCalculator calculates the
- * ratio between two Resources. DRF Example: If usedResources is
- * greater than queueCapacity and users have the following [mem,cpu] usages:
- * User1: [10%,20%] - Dominant resource is 20%
- * User2: [30%,10%] - Dominant resource is 30%
- * Then total consumedRatio is then 20+30=50%. Yes, this value can be
- * larger than 100% but for the purposes of making sure all users are
- * getting their fair share, it works.
- */
- Resource consumed = Resources.multiplyAndNormalizeUp(resourceCalculator,
- partitionResource, qUsageRatios.getUsageRatio(nodePartition),
- minimumAllocation);
- Resource currentCapacity =
- Resources.lessThan(resourceCalculator, partitionResource, consumed,
- queueCapacity) ? queueCapacity : Resources.add(consumed, required);
- // Never allow a single user to take more than the
- // queue's configured capacity * user-limit-factor.
- // Also, the queue's configured capacity should be higher than
- // queue-hard-limit * ulMin
-
- final int activeUsers = activeUsersManager.getNumActiveUsers();
-
- // User limit resource is determined by:
- // max{currentCapacity / #activeUsers, currentCapacity *
- // user-limit-percentage%)
- Resource userLimitResource = Resources.max(
- resourceCalculator, partitionResource,
- Resources.divideAndCeil(
- resourceCalculator, currentCapacity, activeUsers),
- Resources.divideAndCeil(
- resourceCalculator,
- Resources.multiplyAndRoundDown(
- currentCapacity, userLimit),
- 100)
- );
-
- // User limit is capped by maxUserLimit
- // - maxUserLimit = queueCapacity * user-limit-factor (RESPECT_PARTITION_EXCLUSIVITY)
- // - maxUserLimit = total-partition-resource (IGNORE_PARTITION_EXCLUSIVITY)
- //
- // In IGNORE_PARTITION_EXCLUSIVITY mode, if a queue cannot access a
- // partition, its guaranteed resource on that partition is 0. And
- // user-limit-factor computation is based on queue's guaranteed capacity. So
- // we will not cap user-limit as well as used resource when doing
- // IGNORE_PARTITION_EXCLUSIVITY allocation.
- Resource maxUserLimit = Resources.none();
- if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
- maxUserLimit =
- Resources.multiplyAndRoundDown(queueCapacity, userLimitFactor);
- } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
- maxUserLimit = partitionResource;
- }
-
- // Cap final user limit with maxUserLimit
- userLimitResource =
- Resources.roundUp(
- resourceCalculator,
- Resources.min(
- resourceCalculator, partitionResource,
- userLimitResource,
- maxUserLimit
- ),
- minimumAllocation);
+ /**
+ *
+ * @param userName
+ * Name of user who has submitted one/more app to given queue.
+ * @param clusterResource
+ * total cluster resource
+ * @param nodePartition
+ * partition name
+ * @param schedulingMode
+ * scheduling mode
+ * RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
+ * @return Computed User Limit
+ */
+ public Resource getResourceLimitForActiveUsers(String userName,
+ Resource clusterResource, String nodePartition,
+ SchedulingMode schedulingMode) {
+ return usersManager.getComputedResourceLimitForActiveUsers(userName,
+ clusterResource, nodePartition, schedulingMode);
+ }
- if (LOG.isDebugEnabled()) {
- LOG.debug("User limit computation for " + userName +
- " in queue " + getQueueName() +
- " userLimitPercent=" + userLimit +
- " userLimitFactor=" + userLimitFactor +
- " required: " + required +
- " consumed: " + consumed +
- " user-limit-resource: " + userLimitResource +
- " queueCapacity: " + queueCapacity +
- " qconsumed: " + queueUsage.getUsed() +
- " consumedRatio: " + totalUserConsumedRatio +
- " currentCapacity: " + currentCapacity +
- " activeUsers: " + activeUsers +
- " clusterCapacity: " + clusterResource +
- " resourceByLabel: " + partitionResource +
- " usageratio: " + qUsageRatios.getUsageRatio(nodePartition) +
- " Partition: " + nodePartition
- );
- }
- user.setUserResourceLimit(userLimitResource);
- return userLimitResource;
+ /**
+ *
+ * @param userName
+ * Name of user who has submitted one/more app to given queue.
+ * @param clusterResource
+ * total cluster resource
+ * @param nodePartition
+ * partition name
+ * @param schedulingMode
+ * scheduling mode
+ * RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
+ * @return Computed User Limit
+ */
+ public Resource getResourceLimitForAllUsers(String userName,
+ Resource clusterResource, String nodePartition,
+ SchedulingMode schedulingMode) {
+ return usersManager.getComputedResourceLimitForAllUsers(userName,
+ clusterResource, nodePartition, schedulingMode);
}
-
+
@Private
protected boolean canAssignToUser(Resource clusterResource,
String userName, Resource limit, FiCaSchedulerApp application,
@@ -1600,52 +1485,34 @@ public class LeafQueue extends AbstractCSQueue {
}
}
- private float calculateUserUsageRatio(Resource clusterResource,
- String nodePartition) {
- try {
- writeLock.lock();
- Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
- clusterResource);
- float consumed = 0;
- User user;
- for (Map.Entry<String, User> entry : users.entrySet()) {
- user = entry.getValue();
- consumed += user.resetAndUpdateUsageRatio(resourceCalculator,
- resourceByLabel, nodePartition);
- }
- return consumed;
- } finally {
- writeLock.unlock();
- }
- }
-
- private void recalculateQueueUsageRatio(Resource clusterResource,
+ /**
+ * Recalculate QueueUsage Ratio.
+ *
+ * @param clusterResource
+ * Total Cluster Resource
+ * @param nodePartition
+ * Partition
+ */
+ public void recalculateQueueUsageRatio(Resource clusterResource,
String nodePartition) {
try {
writeLock.lock();
- ResourceUsage queueResourceUsage = this.getQueueResourceUsage();
+ ResourceUsage queueResourceUsage = getQueueResourceUsage();
if (nodePartition == null) {
for (String partition : Sets.union(
- queueCapacities.getNodePartitionsSet(),
+ getQueueCapacities().getNodePartitionsSet(),
queueResourceUsage.getNodePartitionsSet())) {
- qUsageRatios.setUsageRatio(partition,
- calculateUserUsageRatio(clusterResource, partition));
+ usersManager.updateUsageRatio(partition, clusterResource);
}
- } else{
- qUsageRatios.setUsageRatio(nodePartition,
- calculateUserUsageRatio(clusterResource, nodePartition));
+ } else {
+ usersManager.updateUsageRatio(nodePartition, clusterResource);
}
} finally {
writeLock.unlock();
}
}
- private void updateQueueUsageRatio(String nodePartition,
- float delta) {
- qUsageRatios.incUsageRatio(nodePartition, delta);
- }
-
@Override
public void completedContainer(Resource clusterResource,
FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer,
@@ -1708,8 +1575,6 @@ public class LeafQueue extends AbstractCSQueue {
try {
writeLock.lock();
super.allocateResource(clusterResource, resource, nodePartition);
- Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
- clusterResource);
// handle ignore exclusivity container
if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
@@ -1728,16 +1593,9 @@ public class LeafQueue extends AbstractCSQueue {
// Update user metrics
String userName = application.getUser();
- // TODO, should use getUser, use this method just to avoid UT failure
- // which is caused by wrong invoking order, will fix UT separately
- User user = getUserAndAddIfAbsent(userName);
-
- user.assignContainer(resource, nodePartition);
-
- // Update usage ratios
- updateQueueUsageRatio(nodePartition,
- user.updateUsageRatio(resourceCalculator, resourceByLabel,
- nodePartition));
+ // Increment user's resource usage.
+ User user = usersManager.updateUserResourceUsage(userName, resource,
+ nodePartition, true);
// Note this is a bit unconventional since it gets the object and modifies
// it here, rather then using set routine
@@ -1746,9 +1604,10 @@ public class LeafQueue extends AbstractCSQueue {
userName, application.getHeadroom());
if (LOG.isDebugEnabled()) {
- LOG.debug(getQueueName() + " user=" + userName + " used=" + queueUsage
- .getUsed() + " numContainers=" + numContainers + " headroom = "
- + application.getHeadroom() + " user-resources=" + user.getUsed());
+ LOG.debug(getQueueName() + " user=" + userName + " used="
+ + queueUsage.getUsed(nodePartition) + " numContainers="
+ + numContainers + " headroom = " + application.getHeadroom()
+ + " user-resources=" + user.getUsed());
}
} finally {
writeLock.unlock();
@@ -1761,8 +1620,6 @@ public class LeafQueue extends AbstractCSQueue {
try {
writeLock.lock();
super.releaseResource(clusterResource, resource, nodePartition);
- Resource resourceByLabel = labelManager.getResourceByLabel(nodePartition,
- clusterResource);
// handle ignore exclusivity container
if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
@@ -1780,13 +1637,8 @@ public class LeafQueue extends AbstractCSQueue {
// Update user metrics
String userName = application.getUser();
- User user = getUserAndAddIfAbsent(userName);
- user.releaseContainer(resource, nodePartition);
-
- // Update usage ratios
- updateQueueUsageRatio(nodePartition,
- user.updateUsageRatio(resourceCalculator, resourceByLabel,
- nodePartition));
+ User user = usersManager.updateUserResourceUsage(userName, resource,
+ nodePartition, false);
metrics.setAvailableResourcesToUser(nodePartition,
userName, application.getHeadroom());
@@ -1846,6 +1698,10 @@ public class LeafQueue extends AbstractCSQueue {
// activate the pending applications if possible
activateApplications();
+ // In case of any resource change, invalidate recalculateULCount to clear
+ // the computed user-limit.
+ usersManager.userLimitNeedsRecompute();
+
// Update application properties
for (FiCaSchedulerApp application : orderingPolicy
.getSchedulableEntities()) {
@@ -1861,16 +1717,16 @@ public class LeafQueue extends AbstractCSQueue {
@Override
public void incUsedResource(String nodeLabel, Resource resourceToInc,
SchedulerApplicationAttempt application) {
- getUser(application.getUser()).getResourceUsage().incUsed(nodeLabel,
- resourceToInc);
+ usersManager.updateUserResourceUsage(application.getUser(), resourceToInc,
+ nodeLabel, true);
super.incUsedResource(nodeLabel, resourceToInc, application);
}
@Override
public void decUsedResource(String nodeLabel, Resource resourceToDec,
SchedulerApplicationAttempt application) {
- getUser(application.getUser()).getResourceUsage().decUsed(nodeLabel,
- resourceToDec);
+ usersManager.updateUserResourceUsage(application.getUser(), resourceToDec,
+ nodeLabel, false);
super.decUsedResource(nodeLabel, resourceToDec, application);
}
@@ -1890,191 +1746,6 @@ public class LeafQueue extends AbstractCSQueue {
queueUsage.decAMUsed(nodeLabel, resourceToDec);
}
- /*
- * Usage Ratio
- */
- static private class UsageRatios {
- private Map<String, Float> usageRatios;
- private ReadLock readLock;
- private WriteLock writeLock;
-
- public UsageRatios() {
- ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- readLock = lock.readLock();
- writeLock = lock.writeLock();
- usageRatios = new HashMap<String, Float>();
- }
-
- private void incUsageRatio(String label, float delta) {
- try {
- writeLock.lock();
- Float fl = usageRatios.get(label);
- if (null == fl) {
- fl = new Float(0.0);
- }
- fl += delta;
- usageRatios.put(label, new Float(fl));
- } finally {
- writeLock.unlock();
- }
- }
-
- float getUsageRatio(String label) {
- try {
- readLock.lock();
- Float f = usageRatios.get(label);
- if (null == f) {
- return 0.0f;
- }
- return f;
- } finally {
- readLock.unlock();
- }
- }
-
- private void setUsageRatio(String label, float ratio) {
- try {
- writeLock.lock();
- usageRatios.put(label, new Float(ratio));
- } finally {
- writeLock.unlock();
- }
- }
- }
-
- @VisibleForTesting
- public float getUsageRatio(String label) {
- return qUsageRatios.getUsageRatio(label);
- }
-
- @VisibleForTesting
- public static class User {
- ResourceUsage userResourceUsage = new ResourceUsage();
- volatile Resource userResourceLimit = Resource.newInstance(0, 0);
- volatile int pendingApplications = 0;
- volatile int activeApplications = 0;
- private UsageRatios userUsageRatios = new UsageRatios();
- private WriteLock writeLock;
-
- User() {
- ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
- // Nobody uses read-lock now, will add it when necessary
- writeLock = lock.writeLock();
- }
-
- public ResourceUsage getResourceUsage() {
- return userResourceUsage;
- }
-
- public float resetAndUpdateUsageRatio(
- ResourceCalculator resourceCalculator,
- Resource resource, String nodePartition) {
- try {
- writeLock.lock();
- userUsageRatios.setUsageRatio(nodePartition, 0);
- return updateUsageRatio(resourceCalculator, resource, nodePartition);
- } finally {
- writeLock.unlock();
- }
- }
-
- public float updateUsageRatio(
- ResourceCalculator resourceCalculator,
- Resource resource, String nodePartition) {
- try {
- writeLock.lock();
- float delta;
- float newRatio = Resources.ratio(resourceCalculator,
- getUsed(nodePartition), resource);
- delta = newRatio - userUsageRatios.getUsageRatio(nodePartition);
- userUsageRatios.setUsageRatio(nodePartition, newRatio);
- return delta;
- } finally {
- writeLock.unlock();
- }
- }
-
- public Resource getUsed() {
- return userResourceUsage.getUsed();
- }
-
- public Resource getAllUsed() {
- return userResourceUsage.getAllUsed();
- }
-
- public Resource getUsed(String label) {
- return userResourceUsage.getUsed(label);
- }
-
- public int getPendingApplications() {
- return pendingApplications;
- }
-
- public int getActiveApplications() {
- return activeApplications;
- }
-
- public Resource getConsumedAMResources() {
- return userResourceUsage.getAMUsed();
- }
-
- public Resource getConsumedAMResources(String label) {
- return userResourceUsage.getAMUsed(label);
- }
-
- public int getTotalApplications() {
- return getPendingApplications() + getActiveApplications();
- }
-
- public void submitApplication() {
- try {
- writeLock.lock();
- ++pendingApplications;
- } finally {
- writeLock.unlock();
- }
- }
-
- public void activateApplication() {
- try {
- writeLock.lock();
- --pendingApplications;
- ++activeApplications;
- } finally {
- writeLock.unlock();
- }
- }
-
- public void finishApplication(boolean wasActive) {
- try {
- writeLock.lock();
- if (wasActive) {
- --activeApplications;
- } else{
- --pendingApplications;
- }
- } finally {
- writeLock.unlock();
- }
- }
-
- public void assignContainer(Resource resource, String nodePartition) {
- userResourceUsage.incUsed(nodePartition, resource);
- }
-
- public void releaseContainer(Resource resource, String nodePartition) {
- userResourceUsage.decUsed(nodePartition, resource);
- }
-
- public Resource getUserResourceLimit() {
- return userResourceLimit;
- }
-
- public void setUserResourceLimit(Resource userResourceLimit) {
- this.userResourceLimit = userResourceLimit;
- }
- }
-
@Override
public void recoverContainer(Resource clusterResource,
SchedulerApplicationAttempt attempt, RMContainer rmContainer) {
@@ -2144,9 +1815,9 @@ public class LeafQueue extends AbstractCSQueue {
* excessive preemption.
* @return Total pending resource considering user limit
*/
-
public Resource getTotalPendingResourcesConsideringUserLimit(
- Resource clusterResources, String partition, boolean deductReservedFromPending) {
+ Resource clusterResources, String partition,
+ boolean deductReservedFromPending) {
try {
readLock.lock();
Map<String, Resource> userNameToHeadroom =
@@ -2157,8 +1828,8 @@ public class LeafQueue extends AbstractCSQueue {
if (!userNameToHeadroom.containsKey(userName)) {
User user = getUser(userName);
Resource headroom = Resources.subtract(
- computeUserLimit(app.getUser(), clusterResources, user, partition,
- SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
+ getResourceLimitForActiveUsers(app.getUser(), clusterResources,
+ partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
user.getUsed(partition));
// Make sure headroom is not negative.
headroom = Resources.componentwiseMax(headroom, Resources.none());
@@ -2188,16 +1859,6 @@ public class LeafQueue extends AbstractCSQueue {
}
- public synchronized Resource getUserLimitPerUser(String userName,
- Resource resources, String partition) {
-
- // Check user resource limit
- User user = getUser(userName);
-
- return computeUserLimit(userName, resources, user, partition,
- SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
- }
-
@Override
public void collectSchedulerApplications(
Collection<ApplicationAttemptId> apps) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/f2d440b3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 0206211..a9ccefc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -884,7 +884,7 @@ public class ParentQueue extends AbstractCSQueue {
}
@Override
- public ActiveUsersManager getActiveUsersManager() {
+ public ActiveUsersManager getAbstractUsersManager() {
// Should never be called since all applications are submitted to LeafQueues
return null;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org