You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2020/09/11 17:00:58 UTC

[hadoop] branch branch-3.2 updated: YARN-10390: LeafQueue: retain user limits cache across assignContainers() calls. Contributed by Samir Khan (samkhan).

This is an automated email from the ASF dual-hosted git repository.

epayne pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git


The following commit(s) were added to refs/heads/branch-3.2 by this push:
     new 5b14af6  YARN-10390: LeafQueue: retain user limits cache across assignContainers() calls. Contributed by Samir Khan (samkhan).
5b14af6 is described below

commit 5b14af6d09007be8781397d41deb39a073bfb559
Author: Eric E Payne <er...@verizonmedia.com>
AuthorDate: Fri Sep 11 13:29:26 2020 +0000

    YARN-10390: LeafQueue: retain user limits cache across assignContainers() calls. Contributed by Samir Khan (samkhan).
    
    (cherry picked from commit 9afec2ed1721467aef7f2cd025d713273b12a6ca)
---
 .../scheduler/capacity/CapacityScheduler.java      |   3 +-
 .../scheduler/capacity/LeafQueue.java              |  67 ++-
 .../scheduler/capacity/UsersManager.java           |  35 +-
 .../capacity/TestCapacitySchedulerPerf.java        | 245 +++++++---
 .../scheduler/capacity/TestLeafQueue.java          | 535 +++++++++++++++++++++
 5 files changed, 808 insertions(+), 77 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index d3b69e8..01ac809 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -226,7 +226,8 @@ public class CapacityScheduler extends
   private boolean usePortForNodeName;
 
   private boolean scheduleAsynchronously;
-  private List<AsyncScheduleThread> asyncSchedulerThreads;
+  @VisibleForTesting
+  protected List<AsyncScheduleThread> asyncSchedulerThreads;
   private ResourceCommitterService resourceCommitterService;
   private RMNodeLabelsManager labelManager;
   private AppPriorityACLsManager appPriorityACLManager;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 3bedd0d..ef4a31e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.util.*;
 import java.util.Map.Entry;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 import org.apache.commons.lang3.StringUtils;
 import org.apache.commons.lang3.time.DateUtils;
@@ -121,6 +122,16 @@ public class LeafQueue extends AbstractCSQueue {
 
   private volatile OrderingPolicy<FiCaSchedulerApp> orderingPolicy = null;
 
+  // Map<Partition, Map<SchedulingMode, Map<User, CachedUserLimit>>>
+  // Not thread safe: only the last level is a ConcurrentMap
+  @VisibleForTesting
+  Map<String, Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>>>
+      userLimitsCache = new HashMap<>();
+
+  // Not thread safe
+  @VisibleForTesting
+  long currentUserLimitCacheVersion = 0;
+
   // record all ignore partition exclusivityRMContainer, this will be used to do
   // preemption, key is the partition of the RMContainer allocated on
   private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers =
@@ -1041,6 +1052,47 @@ public class LeafQueue extends AbstractCSQueue {
     return null;
   }
 
+  private ConcurrentMap<String, CachedUserLimit> getUserLimitCache(
+      String partition,
+      SchedulingMode schedulingMode) {
+    synchronized (userLimitsCache) {
+      long latestVersion = usersManager.getLatestVersionOfUsersState();
+
+      if (latestVersion != this.currentUserLimitCacheVersion) {
+        // User limits cache needs invalidating
+        this.currentUserLimitCacheVersion = latestVersion;
+        userLimitsCache.clear();
+
+        Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>>
+            uLCByPartition = new HashMap<>();
+        userLimitsCache.put(partition, uLCByPartition);
+
+        ConcurrentMap<String, CachedUserLimit> uLCBySchedulingMode =
+            new ConcurrentHashMap<>();
+        uLCByPartition.put(schedulingMode, uLCBySchedulingMode);
+
+        return uLCBySchedulingMode;
+      }
+
+      // User limits cache does not need invalidating
+      Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>>
+          uLCByPartition = userLimitsCache.get(partition);
+      if (uLCByPartition == null) {
+        uLCByPartition = new HashMap<>();
+        userLimitsCache.put(partition, uLCByPartition);
+      }
+
+      ConcurrentMap<String, CachedUserLimit> uLCBySchedulingMode =
+          uLCByPartition.get(schedulingMode);
+      if (uLCBySchedulingMode == null) {
+        uLCBySchedulingMode = new ConcurrentHashMap<>();
+        uLCByPartition.put(schedulingMode, uLCBySchedulingMode);
+      }
+
+      return uLCBySchedulingMode;
+    }
+  }
+
   @Override
   public CSAssignment assignContainers(Resource clusterResource,
       CandidateNodeSet<FiCaSchedulerNode> candidates,
@@ -1088,7 +1140,8 @@ public class LeafQueue extends AbstractCSQueue {
       return CSAssignment.NULL_ASSIGNMENT;
     }
 
-    Map<String, CachedUserLimit> userLimits = new HashMap<>();
+    ConcurrentMap<String, CachedUserLimit> userLimits =
+        this.getUserLimitCache(candidates.getPartition(), schedulingMode);
     boolean needAssignToQueueCheck = true;
     IteratorSelector sel = new IteratorSelector();
     sel.setPartition(candidates.getPartition());
@@ -1132,7 +1185,13 @@ public class LeafQueue extends AbstractCSQueue {
           cachedUserLimit);
       if (cul == null) {
         cul = new CachedUserLimit(userLimit);
-        userLimits.put(application.getUser(), cul);
+        CachedUserLimit retVal =
+            userLimits.putIfAbsent(application.getUser(), cul);
+        if (retVal != null) {
+          // another thread updated the user limit cache before us
+          cul = retVal;
+          userLimit = cul.userLimit;
+        }
       }
       // Check user limit
       boolean userAssignable = true;
@@ -2204,8 +2263,8 @@ public class LeafQueue extends AbstractCSQueue {
 
   static class CachedUserLimit {
     final Resource userLimit;
-    boolean canAssign = true;
-    Resource reservation = Resources.none();
+    volatile boolean canAssign = true;
+    volatile Resource reservation = Resources.none();
 
     CachedUserLimit(Resource userLimit) {
       this.userLimit = userLimit;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
index 960067e..3320ecb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
@@ -24,7 +24,6 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
 import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -72,7 +71,7 @@ public class UsersManager implements AbstractUsersManager {
 
   // To detect whether there is a change in user count for every user-limit
   // calculation.
-  private AtomicLong latestVersionOfUsersState = new AtomicLong(0);
+  private long latestVersionOfUsersState = 0;
   private Map<String, Map<SchedulingMode, Long>> localVersionOfActiveUsersState =
       new HashMap<String, Map<SchedulingMode, Long>>();
   private Map<String, Map<SchedulingMode, Long>> localVersionOfAllUsersState =
@@ -91,8 +90,12 @@ public class UsersManager implements AbstractUsersManager {
       new HashMap<String, Set<ApplicationId>>();
 
   // Pre-computed list of user-limits.
-  Map<String, Map<SchedulingMode, Resource>> preComputedActiveUserLimit = new ConcurrentHashMap<>();
-  Map<String, Map<SchedulingMode, Resource>> preComputedAllUserLimit = new ConcurrentHashMap<>();
+  @VisibleForTesting
+  Map<String, Map<SchedulingMode, Resource>> preComputedActiveUserLimit =
+      new HashMap<>();
+  @VisibleForTesting
+  Map<String, Map<SchedulingMode, Resource>> preComputedAllUserLimit =
+      new HashMap<>();
 
   private float activeUsersTimesWeights = 0.0f;
   private float allUsersTimesWeights = 0.0f;
@@ -361,9 +364,9 @@ public class UsersManager implements AbstractUsersManager {
     try {
       writeLock.lock();
 
-      long value = latestVersionOfUsersState.incrementAndGet();
+      long value = ++latestVersionOfUsersState;
       if (value < 0) {
-        latestVersionOfUsersState.set(0);
+        latestVersionOfUsersState = 0;
       }
     } finally {
       writeLock.unlock();
@@ -586,6 +589,15 @@ public class UsersManager implements AbstractUsersManager {
     return userSpecificUserLimit;
   }
 
+  protected long getLatestVersionOfUsersState() {
+    readLock.lock();
+    try {
+      return latestVersionOfUsersState;
+    } finally {
+      readLock.unlock();
+    }
+  }
+
   /*
    * Recompute user-limit under following conditions: 1. cached user-limit does
    * not exist in local map. 2. Total User count doesn't match with local cached
@@ -593,8 +605,13 @@ public class UsersManager implements AbstractUsersManager {
    */
   private boolean isRecomputeNeeded(SchedulingMode schedulingMode,
       String nodePartition, boolean isActive) {
-    return (getLocalVersionOfUsersState(nodePartition, schedulingMode,
-        isActive) != latestVersionOfUsersState.get());
+    readLock.lock();
+    try {
+      return (getLocalVersionOfUsersState(nodePartition, schedulingMode,
+          isActive) != latestVersionOfUsersState);
+    } finally {
+      readLock.unlock();
+    }
   }
 
   /*
@@ -615,7 +632,7 @@ public class UsersManager implements AbstractUsersManager {
         localVersionOfUsersState.put(nodePartition, localVersion);
       }
 
-      localVersion.put(schedulingMode, latestVersionOfUsersState.get());
+      localVersion.put(schedulingMode, latestVersionOfUsersState);
     } finally {
       writeLock.unlock();
     }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java
index b2e71cf..58086f0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java
@@ -41,11 +41,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptM
 import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
 import org.apache.hadoop.yarn.server.utils.BuilderUtils;
 import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
 import org.apache.hadoop.yarn.util.resource.ResourceUtils;
@@ -57,6 +59,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.PriorityQueue;
+import java.util.concurrent.atomic.AtomicLong;
 
 import static org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES;
 import static org.junit.Assert.assertEquals;
@@ -72,6 +75,29 @@ public class TestCapacitySchedulerPerf {
     return "resource-" + idx;
   }
 
+  public static class CapacitySchedulerPerf extends CapacityScheduler {
+    volatile boolean enable = false;
+    AtomicLong count = new AtomicLong(0);
+
+    public CapacitySchedulerPerf() {
+      super();
+    }
+
+    @Override
+    CSAssignment allocateContainersToNode(
+        CandidateNodeSet<FiCaSchedulerNode> candidates,
+        boolean withNodeHeartbeat) {
+      CSAssignment retVal = super.allocateContainersToNode(candidates,
+          withNodeHeartbeat);
+
+      if (enable) {
+        count.incrementAndGet();
+      }
+
+      return retVal;
+    }
+  }
+
   // This test is run only when when -DRunCapacitySchedulerPerfTests=true is set
   // on the command line. In addition, this test has tunables for the following:
   //   Number of queues: -DNumberOfQueues (default=100)
@@ -88,6 +114,9 @@ public class TestCapacitySchedulerPerf {
       throws Exception {
     Assume.assumeTrue(Boolean.valueOf(
         System.getProperty("RunCapacitySchedulerPerfTests")));
+    int numThreads = Integer.valueOf(System.getProperty(
+        "CapacitySchedulerPerfTestsNumThreads", "0"));
+
     if (numOfResourceTypes > 2) {
       // Initialize resource map
       Map<String, ResourceInformation> riMap = new HashMap<>();
@@ -112,13 +141,30 @@ public class TestCapacitySchedulerPerf {
 
     CapacitySchedulerConfiguration csconf =
         createCSConfWithManyQueues(numQueues);
+    if (numThreads > 0) {
+      csconf.setScheduleAynschronously(true);
+      csconf.setInt(
+          CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
+          numThreads);
+      csconf.setLong(
+          CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
+              + ".scheduling-interval-ms", 0);
+    }
 
     YarnConfiguration conf = new YarnConfiguration(csconf);
     // Don't reset resource types since we have already configured resource
     // types
     conf.setBoolean(TEST_CONF_RESET_RESOURCE_TYPES, false);
-    conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
-        ResourceScheduler.class);
+
+    if (numThreads > 0) {
+      conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacitySchedulerPerf.class,
+          ResourceScheduler.class);
+      // avoid getting skipped (see CapacityScheduler.shouldSkipNodeSchedule)
+      conf.setLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 600000);
+    } else {
+      conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+              ResourceScheduler.class);
+    }
 
     MockRM rm = new MockRM(conf);
     rm.start();
@@ -189,6 +235,13 @@ public class TestCapacitySchedulerPerf {
     RecordFactory recordFactory =
         RecordFactoryProvider.getRecordFactory(null);
 
+    if (numThreads > 0) {
+      // disable async scheduling threads
+      for (CapacityScheduler.AsyncScheduleThread t : cs.asyncSchedulerThreads) {
+        t.suspendSchedule();
+      }
+    }
+
     FiCaSchedulerApp[] fiCaApps = new FiCaSchedulerApp[totalApps];
     for (int i=0;i<totalApps;i++) {
       fiCaApps[i] =
@@ -213,79 +266,145 @@ public class TestCapacitySchedulerPerf {
       lqs[i].setUserLimitFactor((float)0.0);
     }
 
-    // allocate one container for each extra apps since
-    //  LeafQueue.canAssignToUser() checks for used > limit, not used >= limit
-    cs.handle(new NodeUpdateSchedulerEvent(node));
-    cs.handle(new NodeUpdateSchedulerEvent(node2));
+    if (numThreads > 0) {
+      // enable async scheduling threads
+      for (CapacityScheduler.AsyncScheduleThread t : cs.asyncSchedulerThreads) {
+        t.beginSchedule();
+      }
 
-    // make sure only the extra apps have allocated containers
-    for (int i=0;i<totalApps;i++) {
-      boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
-      if (i < activeQueues) {
-        assertFalse(pending);
-        assertEquals(0,
-            fiCaApps[i].getTotalPendingRequestsPerPartition().size());
-      } else {
-        assertTrue(pending);
-        assertEquals(1*GB,
-            fiCaApps[i].getTotalPendingRequestsPerPartition()
-                .get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
+      // let the threads allocate resources for extra apps
+      while (CapacitySchedulerMetrics.getMetrics().commitSuccess.lastStat()
+          .numSamples() < activeQueues) {
+        Thread.sleep(1000);
+      }
+
+      // count the number of apps with allocated containers
+      int numNotPending = 0;
+      for (int i = 0; i < totalApps; i++) {
+        boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
+        if (!pending) {
+          numNotPending++;
+          assertEquals(0,
+              fiCaApps[i].getTotalPendingRequestsPerPartition().size());
+        } else {
+          assertEquals(1*GB,
+              fiCaApps[i].getTotalPendingRequestsPerPartition()
+                  .get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
+        }
+      }
+
+      // make sure only extra apps have allocated containers
+      assertEquals(activeQueues, numNotPending);
+    } else {
+      // allocate one container for each extra apps since
+      //  LeafQueue.canAssignToUser() checks for used > limit, not used >= limit
+      cs.handle(new NodeUpdateSchedulerEvent(node));
+      cs.handle(new NodeUpdateSchedulerEvent(node2));
+
+      // make sure only the extra apps have allocated containers
+      for (int i=0;i<totalApps;i++) {
+        boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
+        if (i < activeQueues) {
+          assertFalse(pending);
+          assertEquals(0,
+              fiCaApps[i].getTotalPendingRequestsPerPartition().size());
+        } else {
+          assertTrue(pending);
+          assertEquals(1*GB,
+              fiCaApps[i].getTotalPendingRequestsPerPartition()
+                  .get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
+        }
       }
     }
 
     // Quiet the loggers while measuring throughput
     GenericTestUtils.setRootLogLevel(Level.WARN);
-    final int topn = 20;
-    final int iterations = 2000000;
-    final int printInterval = 20000;
-    final float numerator = 1000.0f * printInterval;
-    PriorityQueue<Long> queue = new PriorityQueue<>(topn,
-        Collections.reverseOrder());
-
-    long n = Time.monotonicNow();
-    long timespent = 0;
-    for (int i = 0; i < iterations; i+=2) {
-      if (i > 0  && i % printInterval == 0){
-        long ts = (Time.monotonicNow() - n);
-        if (queue.size() < topn) {
-          queue.offer(ts);
-        } else {
-          Long last = queue.peek();
-          if (last > ts) {
-            queue.poll();
+
+    if (numThreads > 0) {
+      System.out.println("Starting now");
+      ((CapacitySchedulerPerf) cs).enable = true;
+      long start = Time.monotonicNow();
+      Thread.sleep(60000);
+      long end = Time.monotonicNow();
+      ((CapacitySchedulerPerf) cs).enable = false;
+      long numOps = ((CapacitySchedulerPerf) cs).count.get();
+      System.out.println("Number of operations: " + numOps);
+      System.out.println("Time taken: " + (end - start) + " ms");
+      System.out.println("" + (numOps * 1000 / (end - start))
+          + " ops / second");
+    } else {
+      final int topn = 20;
+      final int iterations = 2000000;
+      final int printInterval = 20000;
+      final float numerator = 1000.0f * printInterval;
+      PriorityQueue<Long> queue = new PriorityQueue<>(topn,
+          Collections.reverseOrder());
+
+      long n = Time.monotonicNow();
+      long timespent = 0;
+      for (int i = 0; i < iterations; i+=2) {
+        if (i > 0  && i % printInterval == 0){
+          long ts = (Time.monotonicNow() - n);
+          if (queue.size() < topn) {
             queue.offer(ts);
+          } else {
+            Long last = queue.peek();
+            if (last > ts) {
+              queue.poll();
+              queue.offer(ts);
+            }
           }
+          System.out.println(i + " " + (numerator / ts));
+          n = Time.monotonicNow();
         }
-        System.out.println(i + " " + (numerator / ts));
-        n= Time.monotonicNow();
+        cs.handle(new NodeUpdateSchedulerEvent(node));
+        cs.handle(new NodeUpdateSchedulerEvent(node2));
       }
-      cs.handle(new NodeUpdateSchedulerEvent(node));
-      cs.handle(new NodeUpdateSchedulerEvent(node2));
-    }
-    timespent=0;
-    int entries = queue.size();
-    while(queue.size() > 0){
-      long l = queue.poll();
-      timespent += l;
+      timespent = 0;
+      int entries = queue.size();
+      while (queue.size() > 0) {
+        long l = queue.poll();
+        timespent += l;
+      }
+      System.out.println("#ResourceTypes = " + numOfResourceTypes
+          + ". Avg of fastest " + entries
+          + ": " + numerator / (timespent / entries) + " ops/sec of "
+          + appCount + " apps on " + pctActiveQueues + "% of " + numQueues
+          + " queues.");
     }
-    System.out.println(
-        "#ResourceTypes = " + numOfResourceTypes + ". Avg of fastest " + entries
-            + ": " + numerator / (timespent / entries) + " ops/sec of "
-            + appCount + " apps on " + pctActiveQueues + "% of " + numQueues
-            + " queues.");
 
-    // make sure only the extra apps have allocated containers
-    for (int i=0;i<totalApps;i++) {
-      boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
-      if (i < activeQueues) {
-        assertFalse(pending);
-        assertEquals(0,
-            fiCaApps[i].getTotalPendingRequestsPerPartition().size());
-      } else {
-        assertTrue(pending);
-        assertEquals(1*GB,
-            fiCaApps[i].getTotalPendingRequestsPerPartition()
-                .get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
+    if (numThreads > 0) {
+      // count the number of apps with allocated containers
+      int numNotPending = 0;
+      for (int i = 0; i < totalApps; i++) {
+        boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
+        if (!pending) {
+          numNotPending++;
+          assertEquals(0,
+                  fiCaApps[i].getTotalPendingRequestsPerPartition().size());
+        } else {
+          assertEquals(1*GB,
+                  fiCaApps[i].getTotalPendingRequestsPerPartition()
+                          .get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
+        }
+      }
+
+      // make sure only extra apps have allocated containers
+      assertEquals(activeQueues, numNotPending);
+    } else {
+      // make sure only the extra apps have allocated containers
+      for (int i = 0; i < totalApps; i++) {
+        boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
+        if (i < activeQueues) {
+          assertFalse(pending);
+          assertEquals(0,
+                  fiCaApps[i].getTotalPendingRequestsPerPartition().size());
+        } else {
+          assertTrue(pending);
+          assertEquals(1 * GB,
+                  fiCaApps[i].getTotalPendingRequestsPerPartition()
+                          .get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
+        }
       }
     }
 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 5bf8b05..ba682f7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -28,6 +28,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
     .capacity.CapacitySchedulerConfiguration.ROOT;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
 import static org.mockito.Matchers.anyBoolean;
@@ -45,15 +46,18 @@ import java.util.ConcurrentModificationException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Random;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import com.google.common.collect.ImmutableMap;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
@@ -897,6 +901,537 @@ public class TestLeafQueue {
   }
 
   @Test
+  public void testUserLimitCache() throws Exception {
+    // Stress test of the LeafQueue user limits cache. Allocation threads call
+    // assignContainers() while this thread keeps submitting and finishing
+    // apps; afterwards every cached limit is compared with a freshly computed
+    // one and the cache version with the latest UsersManager version.
+
+    // Parameters
+    final int numNodes = 4;
+    final int nodeSize = 100;
+    final int numAllocationThreads = 2;
+    final int numUsers = 40;
+    final int containerSize = 1 * GB;
+    final int numContainersPerApp = 10;
+    final int runTime = 5000; // in ms
+
+    Random random = new Random();
+
+    // Setup nodes
+    FiCaSchedulerNode[] nodes = new FiCaSchedulerNode[numNodes];
+    Map<NodeId, FiCaSchedulerNode> nodesMap = new HashMap<>(nodes.length);
+    for (int i = 0; i < numNodes; i++) {
+      String host = "127.0.0." + i;
+      FiCaSchedulerNode node = TestUtils.getMockNode(host, DEFAULT_RACK, 0,
+          nodeSize * GB, nodeSize);
+      nodes[i] = node;
+      nodesMap.put(node.getNodeID(), node);
+    }
+
+    Resource clusterResource =
+        Resources.createResource(numNodes * (nodeSize * GB),
+            numNodes * nodeSize);
+
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+    when(csContext.getClusterResource()).thenReturn(clusterResource);
+
+    // working with just one queue
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{A});
+    csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + A, 100);
+    csConf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + "." + A,
+        100);
+
+    // reinitialize queues
+    Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
+    CSQueue newRoot =
+        CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
+            CapacitySchedulerConfiguration.ROOT, newQueues, queues,
+            TestUtils.spyHook);
+    queues = newQueues;
+    root.reinitialize(newRoot, csContext.getClusterResource());
+    root.updateClusterResource(clusterResource,
+        new ResourceLimits(clusterResource));
+
+    // Mock the queue
+    LeafQueue leafQueue = stubLeafQueue((LeafQueue) queues.get(A));
+
+    // Set user limit factor so some users are at their limit and the
+    // user limit cache has more than just a few entries
+    // NOTE(review): 10 / nodeSize is int division and evaluates to 0, so the
+    // factor passed here is 0, not 0.1 - confirm which value is intended
+    leafQueue.setUserLimitFactor(10 / nodeSize);
+
+    // Flag to let allocation threads know to stop
+    AtomicBoolean stopThreads = new AtomicBoolean(false);
+    AtomicBoolean errorInThreads = new AtomicBoolean(false);
+
+    // Set up allocation threads
+    Thread[] threads = new Thread[numAllocationThreads];
+    for (int i = 0; i < numAllocationThreads; i++) {
+      threads[i] = new Thread(new Runnable() {
+        @Override
+        public void run() {
+          try {
+            boolean alwaysNull = true;
+            while (!stopThreads.get()) {
+              CSAssignment assignment = leafQueue.assignContainers(
+                  clusterResource,
+                  nodes[random.nextInt(numNodes)],
+                  new ResourceLimits(clusterResource),
+                  SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+              applyCSAssignment(clusterResource, assignment, leafQueue,
+                  nodesMap, leafQueue.applicationAttemptMap);
+
+              if (assignment != CSAssignment.NULL_ASSIGNMENT) {
+                alwaysNull = false;
+              }
+              Thread.sleep(500);
+            }
+
+            // One more assignment but not committing so that the
+            // user limits cache is updated to the latest version
+            CSAssignment assignment = leafQueue.assignContainers(
+                clusterResource,
+                nodes[random.nextInt(numNodes)],
+                new ResourceLimits(clusterResource),
+                SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+
+            if (alwaysNull && assignment == CSAssignment.NULL_ASSIGNMENT) {
+              LOG.error("Thread only got null assignments");
+              errorInThreads.set(true);
+            }
+          } catch (Exception e) {
+            LOG.error("Thread exiting because of exception", e);
+            errorInThreads.set(true);
+          }
+        }
+      }, "Scheduling Thread " + i);
+    }
+
+    // Set up users and some apps
+    final String[] users = new String[numUsers];
+    for (int i = 0; i < users.length; i++) {
+      users[i] = "user_" + i;
+    }
+    List<FiCaSchedulerApp> apps = new ArrayList<>(10);
+    Priority priority = TestUtils.createMockPriority(1);
+
+    // Start up 10 apps to begin with
+    int appId;
+    for (appId = 0; appId < 10; appId++) {
+      String user = users[random.nextInt(users.length)];
+      ApplicationAttemptId applicationAttemptId =
+          TestUtils.getMockApplicationAttemptId(appId, 0);
+      FiCaSchedulerApp app = new FiCaSchedulerApp(applicationAttemptId,
+          user, leafQueue, leafQueue.getUsersManager(), spyRMContext);
+
+      leafQueue.submitApplicationAttempt(app, user);
+      app.updateResourceRequests(Collections.singletonList(
+          TestUtils.createResourceRequest(ResourceRequest.ANY, containerSize,
+              numContainersPerApp, true, priority, recordFactory)));
+
+      apps.add(app);
+    }
+
+    // Start threads
+    for (int i = 0; i < numAllocationThreads; i++) {
+      threads[i].start();
+    }
+
+    final long startTime = Time.monotonicNow();
+    try {
+      do {
+        // Start a new app about half the iterations and stop a random app
+        // the rest of the iterations
+        boolean startOrStopApp = random.nextBoolean();
+        if (startOrStopApp || (apps.size() == 1)) {
+          // start a new app
+          String user = users[random.nextInt(users.length)];
+          ApplicationAttemptId applicationAttemptId =
+              TestUtils.getMockApplicationAttemptId(appId, 0);
+          FiCaSchedulerApp app = new FiCaSchedulerApp(applicationAttemptId,
+              user, leafQueue, leafQueue.getUsersManager(), spyRMContext);
+
+          leafQueue.submitApplicationAttempt(app, user);
+          app.updateResourceRequests(Collections.singletonList(
+              TestUtils.createResourceRequest(ResourceRequest.ANY,
+                  containerSize, numContainersPerApp, true, priority,
+                  recordFactory)));
+
+          apps.add(app);
+
+          appId++;
+        } else {
+          // stop a random app
+          int i = random.nextInt(apps.size());
+          FiCaSchedulerApp app = apps.get(i);
+          leafQueue.finishApplication(app.getApplicationId(), app.getUser());
+          leafQueue.releaseResource(clusterResource, app,
+              app.getCurrentConsumption(), "", null);
+          apps.remove(i);
+        }
+      } while (!errorInThreads.get()
+          && (Time.monotonicNow() - startTime) <= runTime);
+    } finally {
+      // signal allocation threads to stop, even if the loop above threw, so
+      // they do not keep running after the test
+      stopThreads.set(true);
+
+      // wait for allocation threads to be done
+      for (int i = 0; i < numAllocationThreads; i++) {
+        threads[i].join();
+      }
+    }
+
+    // check if there was an error in the allocation threads
+    assertFalse(errorInThreads.get());
+
+    // check there is only one partition in the user limits cache
+    assertEquals(1, leafQueue.userLimitsCache.size());
+
+    Map<SchedulingMode, ConcurrentMap<String, LeafQueue.CachedUserLimit>>
+        uLCByPartition = leafQueue.userLimitsCache.get(nodes[0].getPartition());
+
+    // check there is only one scheduling mode
+    assertEquals(1, uLCByPartition.size());
+
+    ConcurrentMap<String, LeafQueue.CachedUserLimit> uLCBySchedulingMode =
+        uLCByPartition.get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+
+    // check entries in the user limits cache
+    for (Map.Entry<String, LeafQueue.CachedUserLimit> entry :
+        uLCBySchedulingMode.entrySet()) {
+      String user = entry.getKey();
+      Resource userLimit = entry.getValue().userLimit;
+
+      Resource expectedUL = leafQueue.getResourceLimitForActiveUsers(user,
+          clusterResource, nodes[0].getPartition(),
+          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+
+      assertEquals(expectedUL, userLimit);
+    }
+
+    // check the current version in the user limits cache
+    assertEquals(leafQueue.getUsersManager().getLatestVersionOfUsersState(),
+        leafQueue.currentUserLimitCacheVersion);
+    assertTrue(leafQueue.currentUserLimitCacheVersion > 0);
+  }
+
+  @Test
+  public void testUserLimitCacheActiveUsersChanged() throws Exception {
+    // Checks that the LeafQueue user limits cache goes stale and is rebuilt
+    // with new limits when the number of active users changes.
+    // Setup some nodes
+    String host_0 = "127.0.0.1";
+    FiCaSchedulerNode node_0 =
+        TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 6*GB);
+    String host_1 = "127.0.0.2";
+    FiCaSchedulerNode node_1 =
+        TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 6*GB);
+    String host_2 = "127.0.0.3";
+    FiCaSchedulerNode node_2 =
+        TestUtils.getMockNode(host_2, DEFAULT_RACK, 0, 6*GB);
+    String host_3 = "127.0.0.4";
+    FiCaSchedulerNode node_3 =
+        TestUtils.getMockNode(host_3, DEFAULT_RACK, 0, 6*GB);
+
+    Map<NodeId, FiCaSchedulerNode> nodes =
+        ImmutableMap.of(
+            node_0.getNodeID(), node_0,
+            node_1.getNodeID(), node_1,
+            node_2.getNodeID(), node_2,
+            node_3.getNodeID(), node_3
+        );
+
+    final int numNodes = 4;
+    Resource clusterResource =
+        Resources.createResource(numNodes * (6*GB), numNodes);
+
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+    when(csContext.getClusterResource()).thenReturn(clusterResource);
+
+    // working with just one queue
+    csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {A});
+    csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + A, 100);
+    csConf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + "." + A,
+        100);
+
+    // reinitialize queues
+    Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
+    CSQueue newRoot =
+        CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
+            CapacitySchedulerConfiguration.ROOT,
+            newQueues, queues,
+            TestUtils.spyHook);
+    queues = newQueues;
+    root.reinitialize(newRoot, csContext.getClusterResource());
+    root.updateClusterResource(clusterResource,
+        new ResourceLimits(clusterResource));
+
+    // Mock the queue
+    LeafQueue leafQueue = stubLeafQueue((LeafQueue)queues.get(A));
+
+    // initial check: all caches start out empty
+    assertEquals(0, leafQueue.userLimitsCache.size());
+    assertEquals(0,
+        leafQueue.getUsersManager().preComputedAllUserLimit.size());
+    assertEquals(0,
+        leafQueue.getUsersManager().preComputedActiveUserLimit.size());
+
+    // 4 users
+    final String user_0 = "user_0";
+    final String user_1 = "user_1";
+    final String user_2 = "user_2";
+    final String user_3 = "user_3";
+
+    // Set user-limit
+    leafQueue.setUserLimit(0);
+    leafQueue.setUserLimitFactor(1.0f);
+
+    Priority priority = TestUtils.createMockPriority(1);
+
+    // Fill queue because user limit is calculated as (used / #active users).
+    final ApplicationAttemptId appAttemptId_9 =
+        TestUtils.getMockApplicationAttemptId(9, 0);
+    FiCaSchedulerApp app_9 =
+        new FiCaSchedulerApp(appAttemptId_9, user_0, leafQueue,
+            leafQueue.getUsersManager(), spyRMContext);
+    leafQueue.submitApplicationAttempt(app_9, user_0);
+
+    Map<ApplicationAttemptId, FiCaSchedulerApp> apps =
+        ImmutableMap.of(app_9.getApplicationAttemptId(), app_9);
+
+    app_9.updateResourceRequests(Arrays.asList(
+        TestUtils.createResourceRequest(host_0, 1*GB, 5, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(DEFAULT_RACK, 1*GB, 5, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 5, true,
+            priority, recordFactory)));
+    assertEquals(1, leafQueue.getUsersManager().getNumActiveUsers());
+
+    CSAssignment assignment;
+    for (int i = 0; i < 5; i++) {
+      assignment = leafQueue.assignContainers(clusterResource, node_0,
+          new ResourceLimits(clusterResource),
+          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+      applyCSAssignment(clusterResource, assignment, leafQueue, nodes, apps);
+    }
+    app_9.updateResourceRequests(Arrays.asList(
+        TestUtils.createResourceRequest(host_1, 1*GB, 5, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(DEFAULT_RACK, 1*GB, 5, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 5, true,
+            priority, recordFactory)));
+    for (int i = 0; i < 5; i++) {
+      assignment = leafQueue.assignContainers(clusterResource, node_1,
+          new ResourceLimits(clusterResource),
+          SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+      applyCSAssignment(clusterResource, assignment, leafQueue, nodes, apps);
+    }
+    // A total of 10GB have been allocated
+    assertEquals(10*GB, leafQueue.getUsedResources().getMemorySize());
+    assertEquals(10*GB, app_9.getCurrentConsumption().getMemorySize());
+    // For one user who should have been cached in the assignContainers call
+    assertEquals(1, leafQueue.userLimitsCache
+        .get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+        .size());
+    // But the cache is stale because an allocation was made
+    assertNotEquals(leafQueue.currentUserLimitCacheVersion,
+        leafQueue.getUsersManager().getLatestVersionOfUsersState());
+    // Have not made any calls to fill up the all user limit in UsersManager
+    assertEquals(0,
+        leafQueue.getUsersManager().preComputedAllUserLimit.size());
+    // But the user limit cache in leafQueue got filled up using the active
+    // user limit in UsersManager
+    assertEquals(1,
+            leafQueue.getUsersManager().preComputedActiveUserLimit.size());
+
+    // submit 3 applications for now
+    final ApplicationAttemptId appAttemptId_0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app_0 =
+        new FiCaSchedulerApp(appAttemptId_0, user_0, leafQueue,
+            leafQueue.getUsersManager(), spyRMContext);
+    leafQueue.submitApplicationAttempt(app_0, user_0);
+
+    final ApplicationAttemptId appAttemptId_1 =
+        TestUtils.getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app_1 =
+        new FiCaSchedulerApp(appAttemptId_1, user_1, leafQueue,
+            leafQueue.getUsersManager(), spyRMContext);
+    leafQueue.submitApplicationAttempt(app_1, user_1);
+
+    final ApplicationAttemptId appAttemptId_2 =
+        TestUtils.getMockApplicationAttemptId(2, 0);
+    FiCaSchedulerApp app_2 =
+        new FiCaSchedulerApp(appAttemptId_2, user_2, leafQueue,
+            leafQueue.getUsersManager(), spyRMContext);
+    leafQueue.submitApplicationAttempt(app_2, user_2);
+
+    apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0,
+        app_1.getApplicationAttemptId(), app_1,
+        app_2.getApplicationAttemptId(), app_2
+    );
+
+    // requests from first three users (all of which will be locality delayed)
+    app_0.updateResourceRequests(Arrays.asList(
+        TestUtils.createResourceRequest(host_0, 4*GB, 1, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(DEFAULT_RACK, 4*GB, 1, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 10, true,
+            priority, recordFactory)));
+
+    app_1.updateResourceRequests(Arrays.asList(
+        TestUtils.createResourceRequest(host_0, 4*GB, 1, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(DEFAULT_RACK, 4*GB, 1, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 10, true,
+            priority, recordFactory)));
+
+    app_2.updateResourceRequests(Arrays.asList(
+        TestUtils.createResourceRequest(host_0, 4*GB, 1, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(DEFAULT_RACK, 4*GB, 1, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 10, true,
+            priority, recordFactory)));
+
+    // There are 3 active users right now
+    assertEquals(3, leafQueue.getUsersManager().getNumActiveUsers());
+
+    // fill up user limit cache
+    assignment = leafQueue.assignContainers(clusterResource, node_1,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, leafQueue, nodes, apps);
+    // Still a total of 10GB allocated: nothing was assigned to the new apps
+    assertEquals(10*GB, leafQueue.getUsedResources().getMemorySize());
+    assertEquals(0*GB, app_0.getCurrentConsumption().getMemorySize());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
+    assertEquals(0*GB, app_2.getCurrentConsumption().getMemorySize());
+    assertEquals(10*GB, app_9.getCurrentConsumption().getMemorySize());
+    // There are three users who should have been cached
+    assertEquals(3, leafQueue.userLimitsCache
+        .get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+        .size());
+    // There are three users so each has a limit of 12/3 = 4GB
+    assertEquals(4*GB, leafQueue.userLimitsCache
+        .get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+        .get(user_0).userLimit.getMemorySize());
+    assertEquals(4*GB, leafQueue.userLimitsCache
+        .get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+        .get(user_1).userLimit.getMemorySize());
+    assertEquals(4*GB, leafQueue.userLimitsCache
+        .get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+        .get(user_2).userLimit.getMemorySize());
+    // And the cache is NOT stale because no allocation was made
+    assertEquals(leafQueue.currentUserLimitCacheVersion,
+        leafQueue.getUsersManager().getLatestVersionOfUsersState());
+    // Have not made any calls to fill up the all user limit in UsersManager
+    assertEquals(0,
+        leafQueue.getUsersManager().preComputedAllUserLimit.size());
+    // But the user limit cache in leafQueue got filled up using the active
+    // user limit in UsersManager: 4GB (three users so 12/3 = 4GB each)
+    assertEquals(1, leafQueue.getUsersManager()
+        .preComputedActiveUserLimit.size());
+    assertEquals(1, leafQueue.getUsersManager()
+        .preComputedActiveUserLimit.get(RMNodeLabelsManager.NO_LABEL).size());
+    assertEquals(4*GB, leafQueue.getUsersManager()
+        .preComputedActiveUserLimit.get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY).getMemorySize());
+
+    // submit the 4th application
+    final ApplicationAttemptId appAttemptId_3 =
+        TestUtils.getMockApplicationAttemptId(3, 0);
+    FiCaSchedulerApp app_3 =
+        new FiCaSchedulerApp(appAttemptId_3, user_3, leafQueue,
+            leafQueue.getUsersManager(), spyRMContext);
+    leafQueue.submitApplicationAttempt(app_3, user_3);
+
+    apps = ImmutableMap.of(
+        app_0.getApplicationAttemptId(), app_0,
+        app_1.getApplicationAttemptId(), app_1,
+        app_2.getApplicationAttemptId(), app_2,
+        app_3.getApplicationAttemptId(), app_3
+    );
+
+    app_3.updateResourceRequests(Arrays.asList(
+        TestUtils.createResourceRequest(host_0, 4*GB, 1, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(DEFAULT_RACK, 4*GB, 1, true,
+            priority, recordFactory),
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 10, true,
+            priority, recordFactory)));
+
+    // 4 active users now
+    assertEquals(4, leafQueue.getUsersManager().getNumActiveUsers());
+    // Check that the user limits cache has become stale
+    assertNotEquals(leafQueue.currentUserLimitCacheVersion,
+        leafQueue.getUsersManager().getLatestVersionOfUsersState());
+
+    // Even though there are no allocations, user limit cache is repopulated
+    assignment = leafQueue.assignContainers(clusterResource, node_1,
+        new ResourceLimits(clusterResource),
+        SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+    applyCSAssignment(clusterResource, assignment, leafQueue, nodes, apps);
+    // Still a total of 10GB allocated: nothing was assigned to the new apps
+    assertEquals(10*GB, leafQueue.getUsedResources().getMemorySize());
+    assertEquals(0*GB, app_0.getCurrentConsumption().getMemorySize());
+    assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
+    assertEquals(0*GB, app_2.getCurrentConsumption().getMemorySize());
+    assertEquals(0*GB, app_3.getCurrentConsumption().getMemorySize());
+    assertEquals(10*GB, app_9.getCurrentConsumption().getMemorySize());
+    // There are four users who should have been cached
+    assertEquals(4, leafQueue.userLimitsCache
+        .get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+        .size());
+    // There are four users so each has a limit of 12/4 = 3GB
+    assertEquals(3*GB, leafQueue.userLimitsCache
+        .get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+        .get(user_0).userLimit.getMemorySize());
+    assertEquals(3*GB, leafQueue.userLimitsCache
+        .get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+        .get(user_1).userLimit.getMemorySize());
+    assertEquals(3*GB, leafQueue.userLimitsCache
+        .get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+        .get(user_2).userLimit.getMemorySize());
+    assertEquals(3*GB, leafQueue.userLimitsCache
+        .get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+        .get(user_3).userLimit.getMemorySize());
+    // And the cache is NOT stale because no allocation was made
+    assertEquals(leafQueue.currentUserLimitCacheVersion,
+        leafQueue.getUsersManager().getLatestVersionOfUsersState());
+    // Have not made any calls to fill up the all user limit in UsersManager
+    assertEquals(0,
+        leafQueue.getUsersManager().preComputedAllUserLimit.size());
+    // But the user limit cache in leafQueue got filled up using the active
+    // user limit in UsersManager: 3GB (four users so 12/4 = 3GB each)
+    assertEquals(1, leafQueue.getUsersManager()
+        .preComputedActiveUserLimit.size());
+    assertEquals(1, leafQueue.getUsersManager()
+        .preComputedActiveUserLimit.get(RMNodeLabelsManager.NO_LABEL).size());
+    assertEquals(3*GB, leafQueue.getUsersManager()
+        .preComputedActiveUserLimit.get(RMNodeLabelsManager.NO_LABEL)
+        .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY).getMemorySize());
+  }
+
+  @Test
   public void testUserLimits() throws Exception {
     // Mock the queue
     LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org