You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by st...@apache.org on 2017/06/26 12:50:15 UTC

[41/50] hadoop git commit: YARN-5892. Support user-specific minimum user limit percentage in Capacity Scheduler. Contributed by Eric Payne.

YARN-5892. Support user-specific minimum user limit percentage in Capacity Scheduler. Contributed by Eric Payne.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/ca13b224
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/ca13b224
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/ca13b224

Branch: refs/heads/HADOOP-13345
Commit: ca13b224b2feb9c44de861da9cbba8dd2a12cb35
Parents: 68dc7c2
Author: Sunil G <su...@apache.org>
Authored: Thu Jun 22 23:50:57 2017 -0700
Committer: Sunil G <su...@apache.org>
Committed: Thu Jun 22 23:50:57 2017 -0700

----------------------------------------------------------------------
 .../resource/DefaultResourceCalculator.java     |   6 +
 .../resource/DominantResourceCalculator.java    |   8 +
 .../yarn/util/resource/ResourceCalculator.java  |  23 +++
 .../hadoop/yarn/util/resource/Resources.java    |   5 +
 .../scheduler/capacity/AbstractCSQueue.java     |  23 +++
 .../scheduler/capacity/CSQueue.java             |   7 +
 .../CapacitySchedulerConfiguration.java         |  34 ++++
 .../scheduler/capacity/LeafQueue.java           |  28 ++-
 .../scheduler/capacity/UserInfo.java            |  15 +-
 .../scheduler/capacity/UsersManager.java        | 172 +++++++++++++++----
 .../scheduler/common/fica/FiCaSchedulerApp.java |   4 +-
 .../webapp/CapacitySchedulerPage.java           |   9 +-
 .../scheduler/capacity/TestLeafQueue.java       | 123 +++++++++++++
 .../src/site/markdown/CapacityScheduler.md      |   1 +
 14 files changed, 415 insertions(+), 43 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca13b224/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
index 524a049..bdf60bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DefaultResourceCalculator.java
@@ -67,6 +67,12 @@ public class DefaultResourceCalculator extends ResourceCalculator {
   }
 
   @Override
+  public Resource divideAndCeil(Resource numerator, float denominator) {
+    final long memory = divideAndCeil(numerator.getMemorySize(), denominator);
+    return Resources.createResource(memory); // only memory is divided here
+  }
+
+  @Override
   public Resource normalize(Resource r, Resource minimumResource,
       Resource maximumResource, Resource stepFactor) {
     if (stepFactor.getMemorySize() == 0) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca13b224/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
index 69fe716..ea9b927 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/DominantResourceCalculator.java
@@ -155,6 +155,14 @@ public class DominantResourceCalculator extends ResourceCalculator {
   }
 
   @Override
+  public Resource divideAndCeil(Resource numerator, float denominator) {
+    // Memory and virtual cores are each divided and rounded up on their own.
+    final long memory = divideAndCeil(numerator.getMemorySize(), denominator);
+    final int vcores = divideAndCeil(numerator.getVirtualCores(), denominator);
+    return Resources.createResource(memory, vcores);
+  }
+
+  @Override
   public Resource normalize(Resource r, Resource minimumResource,
                             Resource maximumResource, Resource stepFactor) {
     if (stepFactor.getMemorySize() == 0 || stepFactor.getVirtualCores() == 0) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca13b224/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
index d219fe1..398dac5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/ResourceCalculator.java
@@ -65,6 +65,13 @@ public abstract class ResourceCalculator {
     }
     return (a + (b - 1)) / b;
   }
+
+  public static int divideAndCeil(int a, float b) {
+    // A zero divisor is defined to yield 0 instead of Infinity or NaN.
+    final boolean zeroDivisor = (b == 0);
+    final double quotient = a / b;
+    return zeroDivisor ? 0 : (int) Math.ceil(quotient);
+  }
   
   public static long divideAndCeil(long a, long b) {
     if (b == 0) {
@@ -73,6 +80,13 @@ public abstract class ResourceCalculator {
     return (a + (b - 1)) / b;
   }
 
+  public static long divideAndCeil(long a, float b) {
+    // Float division by zero gives Infinity/NaN, so that case is mapped to 0.
+    final float quotient = a / b;
+    final long roundedUp = (long) Math.ceil(quotient);
+    return (b == 0) ? 0L : roundedUp;
+  }
+
   public static int roundUp(int a, int b) {
     return divideAndCeil(a, b) * b;
   }
@@ -198,6 +212,15 @@ public abstract class ResourceCalculator {
    * @return resultant resource
    */
   public abstract Resource divideAndCeil(Resource numerator, int denominator);
+
+  /**
+   * Divide-and-ceil <code>numerator</code> by a floating-point
+   * <code>denominator</code>.
+   * @param numerator numerator resource
+   * @param denominator floating-point divisor
+   * @return resultant resource
+   */
+  public abstract Resource divideAndCeil(Resource numerator, float denominator);
   
   /**
    * Check if a smaller resource can be contained by bigger resource.

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca13b224/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
index b2bb099..a1d14fd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/util/resource/Resources.java
@@ -279,6 +279,11 @@ public class Resources {
       ResourceCalculator resourceCalculator, Resource lhs, int rhs) {
     return resourceCalculator.divideAndCeil(lhs, rhs);
   }
+
+  public static Resource divideAndCeil(
+      ResourceCalculator resourceCalculator, Resource lhs, float rhs) {
+    return resourceCalculator.divideAndCeil(lhs, rhs); // float-divisor overload
+  }
   
   public static boolean equals(Resource lhs, Resource rhs) {
     return lhs.equals(rhs);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca13b224/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 6c141a1..b69ec96 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -111,6 +111,7 @@ public abstract class AbstractCSQueue implements CSQueue {
   protected ReentrantReadWriteLock.WriteLock writeLock;
 
   volatile Priority priority = Priority.newInstance(0);
+  private Map<String, Float> userWeights = new HashMap<String, Float>();
 
   public AbstractCSQueue(CapacitySchedulerContext cs,
       String queueName, CSQueue parent, CSQueue old) throws IOException {
@@ -332,11 +333,28 @@ public abstract class AbstractCSQueue implements CSQueue {
 
       this.priority = csContext.getConfiguration().getQueuePriority(
           getQueuePath());
+
+      this.userWeights = getUserWeightsFromHierarchy();
     } finally {
       writeLock.unlock();
     }
   }
 
+  private Map<String, Float> getUserWeightsFromHierarchy() throws IOException {
+    // Seed the result with whatever the parent queue exposes, if there is one.
+    final CSQueue ancestor = getParent();
+    final Map<String, Float> merged = (ancestor == null)
+        ? new HashMap<String, Float>()
+        : new HashMap<String, Float>(ancestor.getUserWeights());
+
+    // Entries configured on this queue win over inherited ones for the same
+    // user, because putAll overwrites existing keys.
+    final CapacitySchedulerConfiguration conf = csContext.getConfiguration();
+    Map<String, Float> own = conf.getAllUserWeightsForQueue(getQueuePath());
+    merged.putAll(own);
+    return merged;
+  }
+
   private void initializeQueueState(QueueState previousState,
       QueueState configuredState, QueueState parentState) {
     // verify that we can not any value for State other than RUNNING/STOPPED
@@ -956,4 +974,9 @@ public abstract class AbstractCSQueue implements CSQueue {
   public Priority getPriority() {
     return this.priority;
   }
+
+  @Override
+  public Map<String, Float> getUserWeights() {
+    return userWeights; // the live field reference, not a defensive copy
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca13b224/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index c6726ec..3a17d1b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 import java.io.IOException;
 import java.util.Collection;
 import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
@@ -350,4 +351,10 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
    * @return queue priority
    */
   Priority getPriority();
+
+  /**
+   * Get a map of user names and their weights for this queue.
+   * @return map from user name to that user's weight
+   */
+  Map<String, Float> getUserWeights();
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca13b224/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index c3c9585..90a7e65 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -108,6 +108,15 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
   public static final String USER_LIMIT_FACTOR = "user-limit-factor";
 
   @Private
+  public static final String USER_WEIGHT = "weight";
+
+  @Private
+  public static final String USER_SETTINGS = "user-settings";
+
+  @Private
+  public static final float DEFAULT_USER_WEIGHT = 1.0f;
+
+  @Private
   public static final String STATE = "state";
   
   @Private
@@ -1412,4 +1421,29 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
         QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY,
         UNDER_UTILIZED_PREEMPTION_MOVE_RESERVATION), allowMoveReservation);
   }
+
+  /**
+   * Get the weights of all users at this queue level from the configuration.
+   * Used in computing user-specific user limit, relative to other users.
+   * @param queuePath full queue path
+   * @return map of user weights, if they exist. Otherwise, return empty map.
+   */
+  public Map<String, Float> getAllUserWeightsForQueue(String queuePath) {
+    Map<String, Float> userWeights = new HashMap<String, Float>();
+    // Escape the dots of the queue prefix so they match literally in the regex.
+    String qPathPlusPrefix =
+        getQueuePrefix(queuePath).replaceAll("\\.", "\\\\.")
+        + USER_SETTINGS + "\\.";
+    String weightKeyRegex = qPathPlusPrefix + "\\w+\\." + USER_WEIGHT;
+    Map<String, String> props = getValByRegex(weightKeyRegex);
+    for (Entry<String, String> e : props.entrySet()) {
+      // Strip the prefix and the ".weight" suffix, leaving only the user name.
+      String userName = e.getKey().replaceFirst(qPathPlusPrefix, "")
+          .replaceFirst("\\." + USER_WEIGHT, "");
+      if (!userName.isEmpty()) {
+        userWeights.put(userName, Float.valueOf(e.getValue()));
+      }
+    }
+    return userWeights;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca13b224/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index eb2432e..013a5ac 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
 
 import java.io.IOException;
 import java.util.*;
+import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
 
 import org.apache.commons.lang.StringUtils;
@@ -237,6 +238,20 @@ public class LeafQueue extends AbstractCSQueue {
       defaultAppPriorityPerQueue = Priority.newInstance(
           conf.getDefaultApplicationPriorityConfPerQueue(getQueuePath()));
 
+      // Validate leaf queue's user's weights.
+      int queueUL = Math.min(100, conf.getUserLimit(getQueuePath()));
+      for (Entry<String, Float> e : getUserWeights().entrySet()) {
+        float val = e.getValue().floatValue();
+        if (val < 0.0f || val > (100.0f / queueUL)) {
+          throw new IOException("Weight (" + val + ") for user \"" + e.getKey()
+              + "\" must be between 0 and" + " 100 / " + queueUL + " (= " +
+              100.0f/queueUL + ", the number of concurrent active users in "
+              + getQueuePath() + ")");
+        }
+      }
+
+      usersManager.updateUserWeights();
+
       LOG.info(
           "Initializing " + queueName + "\n" + "capacity = " + queueCapacities
               .getCapacity() + " [= (float) configuredCapacity / 100 ]" + "\n"
@@ -619,11 +634,16 @@ public class LeafQueue extends AbstractCSQueue {
 
   @VisibleForTesting
   public Resource getUserAMResourceLimit() {
-     return getUserAMResourceLimitPerPartition(RMNodeLabelsManager.NO_LABEL);
+    return getUserAMResourceLimitPerPartition(RMNodeLabelsManager.NO_LABEL,
+         null);
   }
 
   public Resource getUserAMResourceLimitPerPartition(
-      String nodePartition) {
+      String nodePartition, String userName) {
+    float userWeight = 1.0f;
+    if (userName != null && getUser(userName) != null) {
+      userWeight = getUser(userName).getWeight();
+    }
     try {
       readLock.lock();
       /*
@@ -634,6 +654,7 @@ public class LeafQueue extends AbstractCSQueue {
        */
       float effectiveUserLimit = Math.max(usersManager.getUserLimit() / 100.0f,
           1.0f / Math.max(getAbstractUsersManager().getNumActiveUsers(), 1));
+      effectiveUserLimit = Math.min(effectiveUserLimit * userWeight, 1.0f);
 
       Resource queuePartitionResource = Resources
           .multiplyAndNormalizeUp(resourceCalculator,
@@ -774,7 +795,8 @@ public class LeafQueue extends AbstractCSQueue {
 
         // Verify whether we already calculated user-am-limit for this label.
         if (userAMLimit == null) {
-          userAMLimit = getUserAMResourceLimitPerPartition(partitionName);
+          userAMLimit = getUserAMResourceLimitPerPartition(partitionName,
+              application.getUser());
           userAmPartitionLimit.put(partitionName, userAMLimit);
         }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca13b224/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UserInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UserInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UserInfo.java
index ff9d304..a1a8ecf 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UserInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UserInfo.java
@@ -37,11 +37,14 @@ public class UserInfo {
   protected ResourceInfo AMResourceUsed;
   protected ResourceInfo userResourceLimit;
   protected ResourcesInfo resources;
+  private float userWeight;
+  private boolean isActive;
 
   UserInfo() {}
 
   UserInfo(String username, Resource resUsed, int activeApps, int pendingApps,
-      Resource amResUsed, Resource resourceLimit, ResourceUsage resourceUsage) {
+      Resource amResUsed, Resource resourceLimit, ResourceUsage resourceUsage,
+      float weight, boolean isActive) {
     this.username = username;
     this.resourcesUsed = new ResourceInfo(resUsed);
     this.numActiveApplications = activeApps;
@@ -49,6 +52,8 @@ public class UserInfo {
     this.AMResourceUsed = new ResourceInfo(amResUsed);
     this.userResourceLimit = new ResourceInfo(resourceLimit);
     this.resources = new ResourcesInfo(resourceUsage);
+    this.userWeight = weight;
+    this.isActive = isActive;
   }
 
   public String getUsername() {
@@ -78,4 +83,12 @@ public class UserInfo {
   public ResourcesInfo getResourceUsageInfo() {
     return resources;
   }
+
+  public float getUserWeight() {
+    return userWeight;
+  }
+
+  public boolean getIsActive() {
+    return isActive;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca13b224/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
index 579c4c7..5f7d185 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
@@ -92,6 +92,9 @@ public class UsersManager implements AbstractUsersManager {
   Map<String, Map<SchedulingMode, Resource>> preComputedActiveUserLimit = new ConcurrentHashMap<>();
   Map<String, Map<SchedulingMode, Resource>> preComputedAllUserLimit = new ConcurrentHashMap<>();
 
+  private float activeUsersTimesWeights = 0.0f;
+  private float allUsersTimesWeights = 0.0f;
+
   /**
    * UsageRatios will store the total used resources ratio across all users of
    * the queue.
@@ -158,6 +161,7 @@ public class UsersManager implements AbstractUsersManager {
 
     private UsageRatios userUsageRatios = new UsageRatios();
     private WriteLock writeLock;
+    private float weight;
 
     public User(String name) {
       ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
@@ -262,6 +266,20 @@ public class UsersManager implements AbstractUsersManager {
     public void setResourceUsage(ResourceUsage resourceUsage) {
       this.userResourceUsage = resourceUsage;
     }
+
+    /**
+     * @return the weight last stored via setWeight (0.0f if never set)
+     */
+    public float getWeight() {
+      return weight;
+    }
+
+    /**
+     * @param weight the weight to store; no range check is applied here
+     */
+    public void setWeight(float weight) {
+      this.weight = weight;
+    }
   } /* End of User class */
 
   /**
@@ -382,6 +400,8 @@ public class UsersManager implements AbstractUsersManager {
       // Remove user from active/non-active list as well.
       activeUsersSet.remove(userName);
       nonActiveUsersSet.remove(userName);
+      activeUsersTimesWeights = sumActiveUsersTimesWeights();
+      allUsersTimesWeights = sumAllUsersTimesWeights();
     } finally {
       writeLock.unlock();
     }
@@ -418,6 +438,8 @@ public class UsersManager implements AbstractUsersManager {
    */
   private void addUser(String userName, User user) {
+    user.setWeight(getUserWeightFromQueue(userName)); // set before publishing
     this.users.put(userName, user);
+    allUsersTimesWeights = sumAllUsersTimesWeights();
   }
 
   /**
@@ -434,7 +456,8 @@ public class UsersManager implements AbstractUsersManager {
                 user.getActiveApplications(), user.getPendingApplications(),
                 Resources.clone(user.getConsumedAMResources()),
                 Resources.clone(user.getUserResourceLimit()),
-                user.getResourceUsage()));
+                user.getResourceUsage(), user.getWeight(),
+                activeUsersSet.contains(user.userName)));
       }
       return usersToReturn;
     } finally {
@@ -442,6 +465,11 @@ public class UsersManager implements AbstractUsersManager {
     }
   }
 
+  private float getUserWeightFromQueue(String userName) {
+    final Float configured = lQueue.getUserWeights().get(userName);
+    return (configured != null) ? configured : 1.0f; // default if unset
+  }
+
   /**
    * Get computed user-limit for all ACTIVE users in this queue. If cached data
    * is invalidated due to resource change, this method also enforce to
@@ -480,13 +508,24 @@ public class UsersManager implements AbstractUsersManager {
       writeLock.unlock();
     }
 
-    if (LOG.isDebugEnabled()) {
-      LOG.debug("userLimit is fetched. userLimit = "
-          + userLimitPerSchedulingMode.get(schedulingMode) + ", schedulingMode="
-          + schedulingMode + ", partition=" + nodePartition);
+    Resource userLimitResource = userLimitPerSchedulingMode.get(schedulingMode);
+    User user = getUser(userName);
+    float weight = (user == null) ? 1.0f : user.getWeight();
+    Resource userSpecificUserLimit =
+        Resources.multiplyAndNormalizeDown(resourceCalculator,
+            userLimitResource, weight, lQueue.getMinimumAllocation());
+
+    if (user != null) {
+      user.setUserResourceLimit(userSpecificUserLimit);
     }
 
-    return userLimitPerSchedulingMode.get(schedulingMode);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("userLimit is fetched. userLimit=" + userLimitResource
+          + ", userSpecificUserLimit=" + userSpecificUserLimit
+          + ", schedulingMode=" + schedulingMode
+          + ", partition=" + nodePartition);
+    }
+    return userSpecificUserLimit;
   }
 
   /**
@@ -527,13 +566,21 @@ public class UsersManager implements AbstractUsersManager {
       writeLock.unlock();
     }
 
+    Resource userLimitResource = userLimitPerSchedulingMode.get(schedulingMode);
+    User user = getUser(userName);
+    float weight = (user == null) ? 1.0f : user.getWeight();
+    Resource userSpecificUserLimit =
+        Resources.multiplyAndNormalizeDown(resourceCalculator,
+            userLimitResource, weight, lQueue.getMinimumAllocation());
+
     if (LOG.isDebugEnabled()) {
-      LOG.debug("userLimit is fetched. userLimit = "
-          + userLimitPerSchedulingMode.get(schedulingMode) + ", schedulingMode="
-          + schedulingMode + ", partition=" + nodePartition);
+      LOG.debug("userLimit is fetched. userLimit=" + userLimitResource
+          + ", userSpecificUserLimit=" + userSpecificUserLimit
+          + ", schedulingMode=" + schedulingMode
+          + ", partition=" + nodePartition);
     }
 
-    return userLimitPerSchedulingMode.get(schedulingMode);
+    return userSpecificUserLimit;
   }
 
   /*
@@ -656,16 +703,19 @@ public class UsersManager implements AbstractUsersManager {
         queueCapacity, required);
 
     /*
-     * We want to base the userLimit calculation on max(queueCapacity,
-     * usedResources+required). However, we want usedResources to be based on
-     * the combined ratios of all the users in the queue so we use consumedRatio
-     * to calculate such. The calculation is dependent on how the
-     * resourceCalculator calculates the ratio between two Resources. DRF
-     * Example: If usedResources is greater than queueCapacity and users have
-     * the following [mem,cpu] usages: User1: [10%,20%] - Dominant resource is
-     * 20% User2: [30%,10%] - Dominant resource is 30% Then total consumedRatio
-     * is then 20+30=50%. Yes, this value can be larger than 100% but for the
-     * purposes of making sure all users are getting their fair share, it works.
+     * We want to base the userLimit calculation on
+     * max(queueCapacity, usedResources+required). However, we want
+     * usedResources to be based on the combined ratios of all the users in the
+     * queue so we use consumedRatio to calculate such.
+     * The calculation is dependent on how the resourceCalculator calculates the
+     * ratio between two Resources. DRF Example: If usedResources is greater
+     * than queueCapacity and users have the following [mem,cpu] usages:
+     *
+     * User1: [10%,20%] - Dominant resource is 20%
+     * User2: [30%,10%] - Dominant resource is 30%
+     * The total consumedRatio is then 20+30=50%. Yes, this value can be
+     * larger than 100% but for the purposes of making sure all users are
+     * getting their fair share, it works.
      */
     Resource consumed = Resources.multiplyAndNormalizeUp(resourceCalculator,
         partitionResource, getUsageRatio(nodePartition),
@@ -680,23 +730,23 @@ public class UsersManager implements AbstractUsersManager {
      * capacity * user-limit-factor. Also, the queue's configured capacity
      * should be higher than queue-hard-limit * ulMin
      */
-    int usersCount = getNumActiveUsers();
+    float usersSummedByWeight = activeUsersTimesWeights;
     Resource resourceUsed = totalResUsageForActiveUsers.getUsed(nodePartition);
 
     // For non-activeUser calculation, consider all users count.
     if (!activeUser) {
       resourceUsed = currentCapacity;
-      usersCount = users.size();
+      usersSummedByWeight = allUsersTimesWeights;
     }
 
     /*
-     * User limit resource is determined by: max{currentCapacity / #activeUsers,
+     * User limit resource is determined by: max(currentCapacity / #activeUsers,
      * currentCapacity * user-limit-percentage%)
      */
     Resource userLimitResource = Resources.max(resourceCalculator,
         partitionResource,
         Resources.divideAndCeil(resourceCalculator, resourceUsed,
-            usersCount),
+            usersSummedByWeight),
         Resources.divideAndCeil(resourceCalculator,
             Resources.multiplyAndRoundDown(currentCapacity, getUserLimit()),
             100));
@@ -727,18 +777,26 @@ public class UsersManager implements AbstractUsersManager {
             lQueue.getMinimumAllocation());
 
     if (LOG.isDebugEnabled()) {
-      LOG.debug("User limit computation for " + userName + " in queue "
-          + lQueue.getQueueName() + " userLimitPercent=" + lQueue.getUserLimit()
-          + " userLimitFactor=" + lQueue.getUserLimitFactor() + " required: "
-          + required + " consumed: " + consumed + " user-limit-resource: "
-          + userLimitResource + " queueCapacity: " + queueCapacity
-          + " qconsumed: " + lQueue.getQueueResourceUsage().getUsed()
-          + " currentCapacity: " + currentCapacity + " activeUsers: "
-          + usersCount + " clusterCapacity: " + clusterResource
-          + " resourceByLabel: " + partitionResource + " usageratio: "
-          + getUsageRatio(nodePartition) + " Partition: " + nodePartition);
-    }
-    getUser(userName).setUserResourceLimit(userLimitResource);
+      LOG.debug("User limit computation for " + userName
+          + ",  in queue: " + lQueue.getQueueName()
+          + ",  userLimitPercent=" + lQueue.getUserLimit()
+          + ",  userLimitFactor=" + lQueue.getUserLimitFactor()
+          + ",  required=" + required
+          + ",  consumed=" + consumed
+          + ",  user-limit-resource=" + userLimitResource
+          + ",  queueCapacity=" + queueCapacity
+          + ",  qconsumed=" + lQueue.getQueueResourceUsage().getUsed()
+          + ",  currentCapacity=" + currentCapacity
+          + ",  activeUsers=" + usersSummedByWeight
+          + ",  clusterCapacity=" + clusterResource
+          + ",  resourceByLabel=" + partitionResource
+          + ",  usageratio=" + getUsageRatio(nodePartition)
+          + ",  Partition=" + nodePartition
+          + ",  resourceUsed=" + resourceUsed
+          + ",  maxUserLimit=" + maxUserLimit
+          + ",  userWeight=" + getUser(userName).getWeight()
+      );
+    }
     return userLimitResource;
   }
 
@@ -838,6 +896,32 @@ public class UsersManager implements AbstractUsersManager {
     return activeUsers.get();
   }
 
+  float sumActiveUsersTimesWeights() {
+    float count = 0.0f;
+    try {
+      this.readLock.lock();
+      for (String u : activeUsersSet) {
+        count += getUser(u).getWeight();
+      }
+      return count;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
+  float sumAllUsersTimesWeights() {
+    float count = 0.0f;
+    try {
+      this.readLock.lock();
+      for (String u : users.keySet()) {
+        count += getUser(u).getWeight();
+      }
+      return count;
+    } finally {
+      this.readLock.unlock();
+    }
+  }
+
   private void updateActiveUsersResourceUsage(String userName) {
     try {
       this.writeLock.lock();
@@ -850,6 +934,7 @@ public class UsersManager implements AbstractUsersManager {
       if (nonActiveUsersSet.contains(userName)) {
         nonActiveUsersSet.remove(userName);
         activeUsersSet.add(userName);
+        activeUsersTimesWeights = sumActiveUsersTimesWeights();
 
         // Update total resource usage of active and non-active after user
         // is moved from non-active to active.
@@ -890,6 +975,7 @@ public class UsersManager implements AbstractUsersManager {
       if (activeUsersSet.contains(userName)) {
         activeUsersSet.remove(userName);
         nonActiveUsersSet.add(userName);
+        activeUsersTimesWeights = sumActiveUsersTimesWeights();
 
         // Update total resource usage of active and non-active after user is
         // moved from active to non-active.
@@ -990,4 +1076,18 @@ public class UsersManager implements AbstractUsersManager {
               + totalResUsageForNonActiveUsers.getAllUsed());
     }
   }
+
+  public void updateUserWeights() {
+    try {
+      this.writeLock.lock();
+      for (Map.Entry<String, User> ue : users.entrySet()) {
+        ue.getValue().setWeight(getUserWeightFromQueue(ue.getKey()));
+      }
+      activeUsersTimesWeights = sumActiveUsersTimesWeights();
+      allUsersTimesWeights = sumAllUsersTimesWeights();
+      userLimitNeedsRecompute();
+    } finally {
+      this.writeLock.unlock();
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca13b224/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 331585e..ad4c8ce 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -881,8 +881,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
         .append(queue.getAMResourceLimitPerPartition(appAMNodePartitionName));
     diagnosticMessage.append("; ");
     diagnosticMessage.append("User AM Resource Limit of the queue = ");
-    diagnosticMessage.append(
-        queue.getUserAMResourceLimitPerPartition(appAMNodePartitionName));
+    diagnosticMessage.append(queue.getUserAMResourceLimitPerPartition(
+        appAMNodePartitionName, getUser()));
     diagnosticMessage.append("; ");
     diagnosticMessage.append("Queue AM Resource Usage = ");
     diagnosticMessage.append(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca13b224/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
index b972428..292c5f3 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/CapacitySchedulerPage.java
@@ -68,6 +68,7 @@ class CapacitySchedulerPage extends RmView {
       "left:0%;background:none;border:1px dashed #BFBFBF";
   static final String Q_OVER = "background:#FFA333";
   static final String Q_UNDER = "background:#5BD75B";
+  static final String ACTIVE_USER = "background:#FFFF00"; // Yellow highlight
 
   @RequestScoped
   static class CSQInfo {
@@ -209,6 +210,7 @@ class CapacitySchedulerPage extends RmView {
           html.table("#userinfo").thead().$class("ui-widget-header").tr().th()
               .$class("ui-state-default")._("User Name")._().th()
               .$class("ui-state-default")._("Max Resource")._().th()
+              .$class("ui-state-default")._("Weight")._().th()
               .$class("ui-state-default")._("Used Resource")._().th()
               .$class("ui-state-default")._("Max AM Resource")._().th()
               .$class("ui-state-default")._("Used AM Resource")._().th()
@@ -229,8 +231,11 @@ class CapacitySchedulerPage extends RmView {
         ResourceInfo amUsed = (resourceUsages.getAmUsed() == null)
             ? new ResourceInfo(Resources.none())
             : resourceUsages.getAmUsed();
-        tbody.tr().td(userInfo.getUsername())
+        String highlightIfAsking =
+            userInfo.getIsActive() ? ACTIVE_USER : null;
+        tbody.tr().$style(highlightIfAsking).td(userInfo.getUsername())
             .td(userInfo.getUserResourceLimit().toString())
+            .td(String.valueOf(userInfo.getUserWeight()))
             .td(resourcesUsed.toString())
             .td(resourceUsages.getAMLimit().toString())
             .td(amUsed.toString())
@@ -399,6 +404,8 @@ class CapacitySchedulerPage extends RmView {
               _("Used (over capacity)")._().
             span().$class("qlegend ui-corner-all ui-state-default").
               _("Max Capacity")._().
+            span().$class("qlegend ui-corner-all").$style(ACTIVE_USER).
+            _("Users Requesting Resources")._().
           _();
 
         float used = 0;

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca13b224/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index a9ed5a9..4417132 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -955,7 +955,130 @@ public class TestLeafQueue {
     // app_0 doesn't have outstanding resources, there's only one active user.
     assertEquals("There should only be 1 active user!", 
         1, a.getAbstractUsersManager().getNumActiveUsers());
+  }
+
+  @Test
+  public void testUserSpecificUserLimits() throws Exception {
+    // Mock the queue
+    LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
+    // Set minimum-user-limit-percent for queue "a" in the configs.
+    csConf.setUserLimit(a.getQueuePath(), 50);
+    // Set weight for "user_0" to be 1.5 for the a queue in the configs.
+    csConf.setFloat("yarn.scheduler.capacity." + a.getQueuePath()
+        + ".user-settings.user_0." + CapacitySchedulerConfiguration.USER_WEIGHT,
+        1.5f);
+
+    when(csContext.getClusterResource())
+        .thenReturn(Resources.createResource(16 * GB, 32));
+    // Verify that configs were updated and parsed correctly.
+    Assert.assertNull(a.getUserWeights().get("user_0"));
+    a.reinitialize(a, csContext.getClusterResource());
+    assertEquals(1.5, a.getUserWeights().get("user_0").floatValue(), 0.0);
+
+    // set maxCapacity
+    a.setMaxCapacity(1.0f);
+
+    // Set minimum user-limit-percent
+    a.setUserLimit(50);
+    a.setUserLimitFactor(2);
+
+    // Users
+    final String user_0 = "user_0";
+    final String user_1 = "user_1";
+
+    // Set user_0's weight to 1.5 in the a queue's object.
+    a.getUsersManager().getUserAndAddIfAbsent(user_0).setWeight(1.5f);
+
+    // Submit applications
+    final ApplicationAttemptId appAttemptId_0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app_0 =
+        new FiCaSchedulerApp(appAttemptId_0, user_0, a,
+            a.getAbstractUsersManager(), spyRMContext);
+    a.submitApplicationAttempt(app_0, user_0);
+
+    final ApplicationAttemptId appAttemptId_1 =
+        TestUtils.getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app_1 =
+        new FiCaSchedulerApp(appAttemptId_1, user_1, a,
+            a.getAbstractUsersManager(), spyRMContext);
+    a.submitApplicationAttempt(app_1, user_1); // different user
+
+    // Setup some nodes
+    String host_0 = "127.0.0.1";
+    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
+    String host_1 = "127.0.0.2";
+    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);
+
+    final int numNodes = 2;
+    Resource clusterResource =
+        Resources.createResource(numNodes * (8*GB), numNodes * 16);
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Setup resource-requests
+    // app_0 asks for 3 4-GB containers
+    Priority priority = TestUtils.createMockPriority(1);
+    app_0.updateResourceRequests(Collections.singletonList(
+            TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 3, true,
+                priority, recordFactory)));
+
+    // app_1 asks for 2 1-GB containers
+    app_1.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
+            priority, recordFactory)));
+
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0, app_1.getApplicationAttemptId(),
+        app_1);
+    Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node_0.getNodeID(),
+        node_0, node_1.getNodeID(), node_1);
+
+    /**
+     * Start testing...
+     */
+
+    // There're two active users
+    assertEquals(2, a.getAbstractUsersManager().getNumActiveUsers());
+
+    // 1 container to user_0. Since queue starts out empty, user limit would
+    // normally be calculated to be the minimum container size (1024MB).
+    // However, in this case, user_0 has a weight of 1.5, so the UL is 2048MB
+    // because 1024 * 1.5 rounded up to container size is 2048MB.
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+    assertEquals(4*GB, a.getUsedResources().getMemorySize());
+    assertEquals(4*GB, app_0.getCurrentConsumption().getMemorySize());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
+
+    // At this point the queue-wide user limit is 3072MB, but since user_0 has a
+    // weight of 1.5, its user limit is 5120MB. So, even though user_0 already
+    // has 4096MB, it is under its user limit, so it gets another container.
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_0,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+    assertEquals(8*GB, a.getUsedResources().getMemorySize());
+    assertEquals(8*GB, app_0.getCurrentConsumption().getMemorySize());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
+
+    // Queue-wide user limit at this point is 4096MB and user_0's user limit is
+    // 6144MB. user_0 has 8192MB.
+    // Now that user_0 is above its user limit, the next container should go to user_1
+    applyCSAssignment(clusterResource,
+        a.assignContainers(clusterResource, node_1,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
+    assertEquals(9*GB, a.getUsedResources().getMemorySize());
+    assertEquals(8*GB, app_0.getCurrentConsumption().getMemorySize());
+    assertEquals(1*GB, app_1.getCurrentConsumption().getMemorySize());
+
+    assertEquals(4*GB,
+        app_0.getTotalPendingRequestsPerPartition().get("").getMemorySize());
 
+    assertEquals(1*GB,
+        app_1.getTotalPendingRequestsPerPartition().get("").getMemorySize());
   }
 
   @SuppressWarnings({ "unchecked", "rawtypes" })

http://git-wip-us.apache.org/repos/asf/hadoop/blob/ca13b224/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/CapacityScheduler.md
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/CapacityScheduler.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/CapacityScheduler.md
index 737bdc2..f1d4535 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/CapacityScheduler.md
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/CapacityScheduler.md
@@ -124,6 +124,7 @@ Configuration
 | `yarn.scheduler.capacity.<queue-path>.user-limit-factor` | The multiple of the queue capacity which can be configured to allow a single user to acquire more resources. By default this is set to 1 which ensures that a single user can never take more than the queue's configured capacity irrespective of how idle the cluster is. Value is specified as a float. |
 | `yarn.scheduler.capacity.<queue-path>.maximum-allocation-mb` | The per queue maximum limit of memory to allocate to each container request at the Resource Manager. This setting overrides the cluster configuration `yarn.scheduler.maximum-allocation-mb`. This value must be smaller than or equal to the cluster maximum. |
 | `yarn.scheduler.capacity.<queue-path>.maximum-allocation-vcores` | The per queue maximum limit of virtual cores to allocate to each container request at the Resource Manager. This setting overrides the cluster configuration `yarn.scheduler.maximum-allocation-vcores`. This value must be smaller than or equal to the cluster maximum. |
+| `yarn.scheduler.capacity.<queue-path>.user-settings.<user-name>.weight` | This floating point value is used when calculating the user limit resource values for users in a queue. This value will weight each user more or less than the other users in the queue. For example, if user A should receive 50% more resources in a queue than users B and C, this property will be set to 1.5 for user A.  Users B and C will default to 1.0. |
 
   * Running and Pending Application Limits
   


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org