Posted to common-commits@hadoop.apache.org by ep...@apache.org on 2020/09/11 17:00:58 UTC
[hadoop] branch branch-3.2 updated: YARN-10390: LeafQueue: retain user limits cache across assignContainers() calls. Contributed by Samir Khan (samkhan).
This is an automated email from the ASF dual-hosted git repository.
epayne pushed a commit to branch branch-3.2
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/branch-3.2 by this push:
new 5b14af6 YARN-10390: LeafQueue: retain user limits cache across assignContainers() calls. Contributed by Samir Khan (samkhan).
5b14af6 is described below
commit 5b14af6d09007be8781397d41deb39a073bfb559
Author: Eric E Payne <er...@verizonmedia.com>
AuthorDate: Fri Sep 11 13:29:26 2020 +0000
YARN-10390: LeafQueue: retain user limits cache across assignContainers() calls. Contributed by Samir Khan (samkhan).
(cherry picked from commit 9afec2ed1721467aef7f2cd025d713273b12a6ca)
---
.../scheduler/capacity/CapacityScheduler.java | 3 +-
.../scheduler/capacity/LeafQueue.java | 67 ++-
.../scheduler/capacity/UsersManager.java | 35 +-
.../capacity/TestCapacitySchedulerPerf.java | 245 +++++++---
.../scheduler/capacity/TestLeafQueue.java | 535 +++++++++++++++++++++
5 files changed, 808 insertions(+), 77 deletions(-)
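In short, the patch lets LeafQueue keep its per-user limit cache across assignContainers() calls and rebuilds it only when UsersManager's users-state version changes, instead of recomputing user limits on every scheduling pass. The stand-alone sketch below illustrates that version-invalidated, partition/mode-keyed cache pattern; the class and field names are placeholders for illustration, not the actual Hadoop types.

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative sketch only: entries survive across scheduling passes until the
// users-state version moves, at which point the whole cache is dropped.
class UserLimitCacheSketch {
  // partition -> scheduling mode -> user -> cached limit (a Long stands in for a Resource)
  private final Map<String, Map<String, ConcurrentMap<String, Long>>> cache = new HashMap<>();
  private long cachedVersion = 0;

  // latestVersion would come from something like UsersManager's users-state counter.
  synchronized ConcurrentMap<String, Long> lookup(String partition, String mode,
      long latestVersion) {
    if (latestVersion != cachedVersion) {
      // Users changed since the cache was built: invalidate everything.
      cachedVersion = latestVersion;
      cache.clear();
    }
    return cache
        .computeIfAbsent(partition, p -> new HashMap<>())
        .computeIfAbsent(mode, m -> new ConcurrentHashMap<>());
  }
}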
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index d3b69e8..01ac809 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -226,7 +226,8 @@ public class CapacityScheduler extends
private boolean usePortForNodeName;
private boolean scheduleAsynchronously;
- private List<AsyncScheduleThread> asyncSchedulerThreads;
+ @VisibleForTesting
+ protected List<AsyncScheduleThread> asyncSchedulerThreads;
private ResourceCommitterService resourceCommitterService;
private RMNodeLabelsManager labelManager;
private AppPriorityACLsManager appPriorityACLManager;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 3bedd0d..ef4a31e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -22,6 +22,7 @@ import java.io.IOException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
@@ -121,6 +122,16 @@ public class LeafQueue extends AbstractCSQueue {
private volatile OrderingPolicy<FiCaSchedulerApp> orderingPolicy = null;
+ // Map<Partition, Map<SchedulingMode, Map<User, CachedUserLimit>>>
+ // Not thread safe: only the last level is a ConcurrentMap
+ @VisibleForTesting
+ Map<String, Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>>>
+ userLimitsCache = new HashMap<>();
+
+ // Not thread safe
+ @VisibleForTesting
+ long currentUserLimitCacheVersion = 0;
+
// record all ignore partition exclusivityRMContainer, this will be used to do
// preemption, key is the partition of the RMContainer allocated on
private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers =
@@ -1041,6 +1052,47 @@ public class LeafQueue extends AbstractCSQueue {
return null;
}
+ private ConcurrentMap<String, CachedUserLimit> getUserLimitCache(
+ String partition,
+ SchedulingMode schedulingMode) {
+ synchronized (userLimitsCache) {
+ long latestVersion = usersManager.getLatestVersionOfUsersState();
+
+ if (latestVersion != this.currentUserLimitCacheVersion) {
+ // User limits cache needs invalidating
+ this.currentUserLimitCacheVersion = latestVersion;
+ userLimitsCache.clear();
+
+ Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>>
+ uLCByPartition = new HashMap<>();
+ userLimitsCache.put(partition, uLCByPartition);
+
+ ConcurrentMap<String, CachedUserLimit> uLCBySchedulingMode =
+ new ConcurrentHashMap<>();
+ uLCByPartition.put(schedulingMode, uLCBySchedulingMode);
+
+ return uLCBySchedulingMode;
+ }
+
+ // User limits cache does not need invalidating
+ Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>>
+ uLCByPartition = userLimitsCache.get(partition);
+ if (uLCByPartition == null) {
+ uLCByPartition = new HashMap<>();
+ userLimitsCache.put(partition, uLCByPartition);
+ }
+
+ ConcurrentMap<String, CachedUserLimit> uLCBySchedulingMode =
+ uLCByPartition.get(schedulingMode);
+ if (uLCBySchedulingMode == null) {
+ uLCBySchedulingMode = new ConcurrentHashMap<>();
+ uLCByPartition.put(schedulingMode, uLCBySchedulingMode);
+ }
+
+ return uLCBySchedulingMode;
+ }
+ }
+
@Override
public CSAssignment assignContainers(Resource clusterResource,
CandidateNodeSet<FiCaSchedulerNode> candidates,
@@ -1088,7 +1140,8 @@ public class LeafQueue extends AbstractCSQueue {
return CSAssignment.NULL_ASSIGNMENT;
}
- Map<String, CachedUserLimit> userLimits = new HashMap<>();
+ ConcurrentMap<String, CachedUserLimit> userLimits =
+ this.getUserLimitCache(candidates.getPartition(), schedulingMode);
boolean needAssignToQueueCheck = true;
IteratorSelector sel = new IteratorSelector();
sel.setPartition(candidates.getPartition());
@@ -1132,7 +1185,13 @@ public class LeafQueue extends AbstractCSQueue {
cachedUserLimit);
if (cul == null) {
cul = new CachedUserLimit(userLimit);
- userLimits.put(application.getUser(), cul);
+ CachedUserLimit retVal =
+ userLimits.putIfAbsent(application.getUser(), cul);
+ if (retVal != null) {
+ // another thread updated the user limit cache before us
+ cul = retVal;
+ userLimit = cul.userLimit;
+ }
}
// Check user limit
boolean userAssignable = true;
@@ -2204,8 +2263,8 @@ public class LeafQueue extends AbstractCSQueue {
static class CachedUserLimit {
final Resource userLimit;
- boolean canAssign = true;
- Resource reservation = Resources.none();
+ volatile boolean canAssign = true;
+ volatile Resource reservation = Resources.none();
CachedUserLimit(Resource userLimit) {
this.userLimit = userLimit;
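One detail worth calling out in the LeafQueue change above: because the cached map is now shared across assignContainers() calls (and, with asynchronous scheduling, across threads), the code switches from put() to putIfAbsent() and continues with whichever CachedUserLimit actually won the race. A minimal, generic sketch of that idiom, with a plain Integer standing in for the cached limit:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Illustrative sketch only: putIfAbsent() returns the existing value when another
// thread has already populated the key, so callers must continue with that value.
class PutIfAbsentSketch {
  static <V> V cacheOrReuse(ConcurrentMap<String, V> cache, String user, V freshValue) {
    V prior = cache.putIfAbsent(user, freshValue);
    return (prior != null) ? prior : freshValue;
  }

  public static void main(String[] args) {
    ConcurrentMap<String, Integer> limits = new ConcurrentHashMap<>();
    System.out.println(cacheOrReuse(limits, "user_0", 42)); // 42: first writer wins
    System.out.println(cacheOrReuse(limits, "user_0", 99)); // still 42: existing entry reused
  }
}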
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
index 960067e..3320ecb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
@@ -24,7 +24,6 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
@@ -72,7 +71,7 @@ public class UsersManager implements AbstractUsersManager {
// To detect whether there is a change in user count for every user-limit
// calculation.
- private AtomicLong latestVersionOfUsersState = new AtomicLong(0);
+ private long latestVersionOfUsersState = 0;
private Map<String, Map<SchedulingMode, Long>> localVersionOfActiveUsersState =
new HashMap<String, Map<SchedulingMode, Long>>();
private Map<String, Map<SchedulingMode, Long>> localVersionOfAllUsersState =
@@ -91,8 +90,12 @@ public class UsersManager implements AbstractUsersManager {
new HashMap<String, Set<ApplicationId>>();
// Pre-computed list of user-limits.
- Map<String, Map<SchedulingMode, Resource>> preComputedActiveUserLimit = new ConcurrentHashMap<>();
- Map<String, Map<SchedulingMode, Resource>> preComputedAllUserLimit = new ConcurrentHashMap<>();
+ @VisibleForTesting
+ Map<String, Map<SchedulingMode, Resource>> preComputedActiveUserLimit =
+ new HashMap<>();
+ @VisibleForTesting
+ Map<String, Map<SchedulingMode, Resource>> preComputedAllUserLimit =
+ new HashMap<>();
private float activeUsersTimesWeights = 0.0f;
private float allUsersTimesWeights = 0.0f;
@@ -361,9 +364,9 @@ public class UsersManager implements AbstractUsersManager {
try {
writeLock.lock();
- long value = latestVersionOfUsersState.incrementAndGet();
+ long value = ++latestVersionOfUsersState;
if (value < 0) {
- latestVersionOfUsersState.set(0);
+ latestVersionOfUsersState = 0;
}
} finally {
writeLock.unlock();
@@ -586,6 +589,15 @@ public class UsersManager implements AbstractUsersManager {
return userSpecificUserLimit;
}
+ protected long getLatestVersionOfUsersState() {
+ readLock.lock();
+ try {
+ return latestVersionOfUsersState;
+ } finally {
+ readLock.unlock();
+ }
+ }
+
/*
* Recompute user-limit under following conditions: 1. cached user-limit does
* not exist in local map. 2. Total User count doesn't match with local cached
@@ -593,8 +605,13 @@ public class UsersManager implements AbstractUsersManager {
*/
private boolean isRecomputeNeeded(SchedulingMode schedulingMode,
String nodePartition, boolean isActive) {
- return (getLocalVersionOfUsersState(nodePartition, schedulingMode,
- isActive) != latestVersionOfUsersState.get());
+ readLock.lock();
+ try {
+ return (getLocalVersionOfUsersState(nodePartition, schedulingMode,
+ isActive) != latestVersionOfUsersState);
+ } finally {
+ readLock.unlock();
+ }
}
/*
@@ -615,7 +632,7 @@ public class UsersManager implements AbstractUsersManager {
localVersionOfUsersState.put(nodePartition, localVersion);
}
- localVersion.put(schedulingMode, latestVersionOfUsersState.get());
+ localVersion.put(schedulingMode, latestVersionOfUsersState);
} finally {
writeLock.unlock();
}
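The UsersManager hunks above replace the AtomicLong version counter with a plain long because every write already happens under the class's write lock; the new getLatestVersionOfUsersState() simply takes the read lock for a consistent read. A generic sketch of that lock-guarded counter pattern follows (field and method names are illustrative, not the Hadoop ones):

import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative sketch only: a version counter protected by a read/write lock
// rather than by atomics, mirroring the overflow guard used in the patch.
class VersionCounterSketch {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private long version = 0;

  void bump() {
    lock.writeLock().lock();
    try {
      if (++version < 0) {
        version = 0; // wrap back to zero on overflow
      }
    } finally {
      lock.writeLock().unlock();
    }
  }

  long current() {
    lock.readLock().lock();
    try {
      return version;
    } finally {
      lock.readLock().unlock();
    }
  }
}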
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java
index b2e71cf..58086f0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerPerf.java
@@ -41,11 +41,13 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptM
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
@@ -57,6 +59,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.PriorityQueue;
+import java.util.concurrent.atomic.AtomicLong;
import static org.apache.hadoop.yarn.server.resourcemanager.resource.TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES;
import static org.junit.Assert.assertEquals;
@@ -72,6 +75,29 @@ public class TestCapacitySchedulerPerf {
return "resource-" + idx;
}
+ public static class CapacitySchedulerPerf extends CapacityScheduler {
+ volatile boolean enable = false;
+ AtomicLong count = new AtomicLong(0);
+
+ public CapacitySchedulerPerf() {
+ super();
+ }
+
+ @Override
+ CSAssignment allocateContainersToNode(
+ CandidateNodeSet<FiCaSchedulerNode> candidates,
+ boolean withNodeHeartbeat) {
+ CSAssignment retVal = super.allocateContainersToNode(candidates,
+ withNodeHeartbeat);
+
+ if (enable) {
+ count.incrementAndGet();
+ }
+
+ return retVal;
+ }
+ }
+
// This test is run only when -DRunCapacitySchedulerPerfTests=true is set
// on the command line. In addition, this test has tunables for the following:
// Number of queues: -DNumberOfQueues (default=100)
@@ -88,6 +114,9 @@ public class TestCapacitySchedulerPerf {
throws Exception {
Assume.assumeTrue(Boolean.valueOf(
System.getProperty("RunCapacitySchedulerPerfTests")));
+ int numThreads = Integer.valueOf(System.getProperty(
+ "CapacitySchedulerPerfTestsNumThreads", "0"));
+
if (numOfResourceTypes > 2) {
// Initialize resource map
Map<String, ResourceInformation> riMap = new HashMap<>();
@@ -112,13 +141,30 @@ public class TestCapacitySchedulerPerf {
CapacitySchedulerConfiguration csconf =
createCSConfWithManyQueues(numQueues);
+ if (numThreads > 0) {
+ csconf.setScheduleAynschronously(true);
+ csconf.setInt(
+ CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD,
+ numThreads);
+ csconf.setLong(
+ CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_PREFIX
+ + ".scheduling-interval-ms", 0);
+ }
YarnConfiguration conf = new YarnConfiguration(csconf);
// Don't reset resource types since we have already configured resource
// types
conf.setBoolean(TEST_CONF_RESET_RESOURCE_TYPES, false);
- conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
- ResourceScheduler.class);
+
+ if (numThreads > 0) {
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacitySchedulerPerf.class,
+ ResourceScheduler.class);
+ // avoid getting skipped (see CapacityScheduler.shouldSkipNodeSchedule)
+ conf.setLong(YarnConfiguration.RM_NM_HEARTBEAT_INTERVAL_MS, 600000);
+ } else {
+ conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
+ ResourceScheduler.class);
+ }
MockRM rm = new MockRM(conf);
rm.start();
@@ -189,6 +235,13 @@ public class TestCapacitySchedulerPerf {
RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
+ if (numThreads > 0) {
+ // disable async scheduling threads
+ for (CapacityScheduler.AsyncScheduleThread t : cs.asyncSchedulerThreads) {
+ t.suspendSchedule();
+ }
+ }
+
FiCaSchedulerApp[] fiCaApps = new FiCaSchedulerApp[totalApps];
for (int i=0;i<totalApps;i++) {
fiCaApps[i] =
@@ -213,79 +266,145 @@ public class TestCapacitySchedulerPerf {
lqs[i].setUserLimitFactor((float)0.0);
}
- // allocate one container for each extra apps since
- // LeafQueue.canAssignToUser() checks for used > limit, not used >= limit
- cs.handle(new NodeUpdateSchedulerEvent(node));
- cs.handle(new NodeUpdateSchedulerEvent(node2));
+ if (numThreads > 0) {
+ // enable async scheduling threads
+ for (CapacityScheduler.AsyncScheduleThread t : cs.asyncSchedulerThreads) {
+ t.beginSchedule();
+ }
- // make sure only the extra apps have allocated containers
- for (int i=0;i<totalApps;i++) {
- boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
- if (i < activeQueues) {
- assertFalse(pending);
- assertEquals(0,
- fiCaApps[i].getTotalPendingRequestsPerPartition().size());
- } else {
- assertTrue(pending);
- assertEquals(1*GB,
- fiCaApps[i].getTotalPendingRequestsPerPartition()
- .get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
+ // let the threads allocate resources for extra apps
+ while (CapacitySchedulerMetrics.getMetrics().commitSuccess.lastStat()
+ .numSamples() < activeQueues) {
+ Thread.sleep(1000);
+ }
+
+ // count the number of apps with allocated containers
+ int numNotPending = 0;
+ for (int i = 0; i < totalApps; i++) {
+ boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
+ if (!pending) {
+ numNotPending++;
+ assertEquals(0,
+ fiCaApps[i].getTotalPendingRequestsPerPartition().size());
+ } else {
+ assertEquals(1*GB,
+ fiCaApps[i].getTotalPendingRequestsPerPartition()
+ .get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
+ }
+ }
+
+ // make sure only extra apps have allocated containers
+ assertEquals(activeQueues, numNotPending);
+ } else {
+ // allocate one container for each extra apps since
+ // LeafQueue.canAssignToUser() checks for used > limit, not used >= limit
+ cs.handle(new NodeUpdateSchedulerEvent(node));
+ cs.handle(new NodeUpdateSchedulerEvent(node2));
+
+ // make sure only the extra apps have allocated containers
+ for (int i=0;i<totalApps;i++) {
+ boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
+ if (i < activeQueues) {
+ assertFalse(pending);
+ assertEquals(0,
+ fiCaApps[i].getTotalPendingRequestsPerPartition().size());
+ } else {
+ assertTrue(pending);
+ assertEquals(1*GB,
+ fiCaApps[i].getTotalPendingRequestsPerPartition()
+ .get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
+ }
}
}
// Quiet the loggers while measuring throughput
GenericTestUtils.setRootLogLevel(Level.WARN);
- final int topn = 20;
- final int iterations = 2000000;
- final int printInterval = 20000;
- final float numerator = 1000.0f * printInterval;
- PriorityQueue<Long> queue = new PriorityQueue<>(topn,
- Collections.reverseOrder());
-
- long n = Time.monotonicNow();
- long timespent = 0;
- for (int i = 0; i < iterations; i+=2) {
- if (i > 0 && i % printInterval == 0){
- long ts = (Time.monotonicNow() - n);
- if (queue.size() < topn) {
- queue.offer(ts);
- } else {
- Long last = queue.peek();
- if (last > ts) {
- queue.poll();
+
+ if (numThreads > 0) {
+ System.out.println("Starting now");
+ ((CapacitySchedulerPerf) cs).enable = true;
+ long start = Time.monotonicNow();
+ Thread.sleep(60000);
+ long end = Time.monotonicNow();
+ ((CapacitySchedulerPerf) cs).enable = false;
+ long numOps = ((CapacitySchedulerPerf) cs).count.get();
+ System.out.println("Number of operations: " + numOps);
+ System.out.println("Time taken: " + (end - start) + " ms");
+ System.out.println("" + (numOps * 1000 / (end - start))
+ + " ops / second");
+ } else {
+ final int topn = 20;
+ final int iterations = 2000000;
+ final int printInterval = 20000;
+ final float numerator = 1000.0f * printInterval;
+ PriorityQueue<Long> queue = new PriorityQueue<>(topn,
+ Collections.reverseOrder());
+
+ long n = Time.monotonicNow();
+ long timespent = 0;
+ for (int i = 0; i < iterations; i+=2) {
+ if (i > 0 && i % printInterval == 0){
+ long ts = (Time.monotonicNow() - n);
+ if (queue.size() < topn) {
queue.offer(ts);
+ } else {
+ Long last = queue.peek();
+ if (last > ts) {
+ queue.poll();
+ queue.offer(ts);
+ }
}
+ System.out.println(i + " " + (numerator / ts));
+ n = Time.monotonicNow();
}
- System.out.println(i + " " + (numerator / ts));
- n= Time.monotonicNow();
+ cs.handle(new NodeUpdateSchedulerEvent(node));
+ cs.handle(new NodeUpdateSchedulerEvent(node2));
}
- cs.handle(new NodeUpdateSchedulerEvent(node));
- cs.handle(new NodeUpdateSchedulerEvent(node2));
- }
- timespent=0;
- int entries = queue.size();
- while(queue.size() > 0){
- long l = queue.poll();
- timespent += l;
+ timespent = 0;
+ int entries = queue.size();
+ while (queue.size() > 0) {
+ long l = queue.poll();
+ timespent += l;
+ }
+ System.out.println("#ResourceTypes = " + numOfResourceTypes
+ + ". Avg of fastest " + entries
+ + ": " + numerator / (timespent / entries) + " ops/sec of "
+ + appCount + " apps on " + pctActiveQueues + "% of " + numQueues
+ + " queues.");
}
- System.out.println(
- "#ResourceTypes = " + numOfResourceTypes + ". Avg of fastest " + entries
- + ": " + numerator / (timespent / entries) + " ops/sec of "
- + appCount + " apps on " + pctActiveQueues + "% of " + numQueues
- + " queues.");
- // make sure only the extra apps have allocated containers
- for (int i=0;i<totalApps;i++) {
- boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
- if (i < activeQueues) {
- assertFalse(pending);
- assertEquals(0,
- fiCaApps[i].getTotalPendingRequestsPerPartition().size());
- } else {
- assertTrue(pending);
- assertEquals(1*GB,
- fiCaApps[i].getTotalPendingRequestsPerPartition()
- .get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
+ if (numThreads > 0) {
+ // count the number of apps with allocated containers
+ int numNotPending = 0;
+ for (int i = 0; i < totalApps; i++) {
+ boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
+ if (!pending) {
+ numNotPending++;
+ assertEquals(0,
+ fiCaApps[i].getTotalPendingRequestsPerPartition().size());
+ } else {
+ assertEquals(1*GB,
+ fiCaApps[i].getTotalPendingRequestsPerPartition()
+ .get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
+ }
+ }
+
+ // make sure only extra apps have allocated containers
+ assertEquals(activeQueues, numNotPending);
+ } else {
+ // make sure only the extra apps have allocated containers
+ for (int i = 0; i < totalApps; i++) {
+ boolean pending = fiCaApps[i].getAppSchedulingInfo().isPending();
+ if (i < activeQueues) {
+ assertFalse(pending);
+ assertEquals(0,
+ fiCaApps[i].getTotalPendingRequestsPerPartition().size());
+ } else {
+ assertTrue(pending);
+ assertEquals(1 * GB,
+ fiCaApps[i].getTotalPendingRequestsPerPartition()
+ .get(RMNodeLabelsManager.NO_LABEL).getMemorySize());
+ }
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 5bf8b05..ba682f7 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -28,6 +28,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
.capacity.CapacitySchedulerConfiguration.ROOT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
@@ -45,15 +46,18 @@ import java.util.ConcurrentModificationException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Random;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.collect.ImmutableMap;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.util.Time;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
@@ -897,6 +901,537 @@ public class TestLeafQueue {
}
@Test
+ public void testUserLimitCache() throws Exception {
+ // Parameters
+ final int numNodes = 4;
+ final int nodeSize = 100;
+ final int numAllocationThreads = 2;
+ final int numUsers = 40;
+ final int containerSize = 1 * GB;
+ final int numContainersPerApp = 10;
+ final int runTime = 5000; // in ms
+
+ Random random = new Random();
+
+ // Setup nodes
+ FiCaSchedulerNode[] nodes = new FiCaSchedulerNode[numNodes];
+ Map<NodeId, FiCaSchedulerNode> nodesMap = new HashMap<>(nodes.length);
+ for (int i = 0; i < numNodes; i++) {
+ String host = "127.0.0." + i;
+ FiCaSchedulerNode node = TestUtils.getMockNode(host, DEFAULT_RACK, 0,
+ nodeSize * GB, nodeSize);
+ nodes[i] = node;
+ nodesMap.put(node.getNodeID(), node);
+ }
+
+ Resource clusterResource =
+ Resources.createResource(numNodes * (nodeSize * GB),
+ numNodes * nodeSize);
+
+ when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+ when(csContext.getClusterResource()).thenReturn(clusterResource);
+
+ // working with just one queue
+ csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{A});
+ csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + A, 100);
+ csConf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + "." + A,
+ 100);
+
+ // reinitialize queues
+ Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
+ CSQueue newRoot =
+ CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
+ CapacitySchedulerConfiguration.ROOT,
+ newQueues, queues,
+ TestUtils.spyHook);
+ queues = newQueues;
+ root.reinitialize(newRoot, csContext.getClusterResource());
+ root.updateClusterResource(clusterResource,
+ new ResourceLimits(clusterResource));
+
+ // Mock the queue
+ LeafQueue leafQueue = stubLeafQueue((LeafQueue) queues.get(A));
+
+ // Set user limit factor so some users are at their limit and the
+ // user limit cache has more than just a few entries
+ leafQueue.setUserLimitFactor(10 / nodeSize);
+
+ // Flag to let allocation threads know to stop
+ AtomicBoolean stopThreads = new AtomicBoolean(false);
+ AtomicBoolean errorInThreads = new AtomicBoolean(false);
+
+ // Set up allocation threads
+ Thread[] threads = new Thread[numAllocationThreads];
+ for (int i = 0; i < numAllocationThreads; i++) {
+ threads[i] = new Thread(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ boolean alwaysNull = true;
+ while (!stopThreads.get()) {
+ CSAssignment assignment = leafQueue.assignContainers(
+ clusterResource,
+ nodes[random.nextInt(numNodes)],
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource, assignment, leafQueue,
+ nodesMap, leafQueue.applicationAttemptMap);
+
+ if (assignment != CSAssignment.NULL_ASSIGNMENT) {
+ alwaysNull = false;
+ }
+ Thread.sleep(500);
+ }
+
+ // One more assignment but not committing so that the
+ // user limits cache is updated to the latest version
+ CSAssignment assignment = leafQueue.assignContainers(
+ clusterResource,
+ nodes[random.nextInt(numNodes)],
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+
+ if (alwaysNull && assignment == CSAssignment.NULL_ASSIGNMENT) {
+ LOG.error("Thread only got null assignments");
+ errorInThreads.set(true);
+ }
+ } catch (Exception e) {
+ LOG.error("Thread exiting because of exception", e);
+ errorInThreads.set(true);
+ }
+ }
+ }, "Scheduling Thread " + i);
+ }
+
+ // Set up users and some apps
+ final String[] users = new String[numUsers];
+ for (int i = 0; i < users.length; i++) {
+ users[i] = "user_" + i;
+ }
+ List<ApplicationAttemptId> applicationAttemptIds =
+ new ArrayList<>(10);
+ List<FiCaSchedulerApp> apps = new ArrayList<>(10);
+ Priority priority = TestUtils.createMockPriority(1);
+
+ // Start up 10 apps to begin with
+ int appId;
+ for (appId = 0; appId < 10; appId++) {
+ String user = users[random.nextInt(users.length)];
+ ApplicationAttemptId applicationAttemptId =
+ TestUtils.getMockApplicationAttemptId(appId, 0);
+ FiCaSchedulerApp app = new FiCaSchedulerApp(applicationAttemptId,
+ user,
+ leafQueue, leafQueue.getUsersManager(), spyRMContext);
+
+ leafQueue.submitApplicationAttempt(app, user);
+ app.updateResourceRequests(Collections.singletonList(
+ TestUtils.createResourceRequest(ResourceRequest.ANY, containerSize,
+ numContainersPerApp, true, priority, recordFactory)));
+
+ applicationAttemptIds.add(applicationAttemptId);
+ apps.add(app);
+ }
+
+ // Start threads
+ for (int i = 0; i < numAllocationThreads; i++) {
+ threads[i].start();
+ }
+
+ final long startTime = Time.monotonicNow();
+ while (true) {
+ // Start a new app about half the iterations and stop a random app the
+ // rest of the iterations
+ boolean startOrStopApp = random.nextBoolean();
+ if (startOrStopApp || (apps.size() == 1)) {
+ // start a new app
+ String user = users[random.nextInt(users.length)];
+ ApplicationAttemptId applicationAttemptId =
+ TestUtils.getMockApplicationAttemptId(appId, 0);
+ FiCaSchedulerApp app = new FiCaSchedulerApp(applicationAttemptId,
+ user,
+ leafQueue, leafQueue.getUsersManager(), spyRMContext);
+
+ leafQueue.submitApplicationAttempt(app, user);
+ app.updateResourceRequests(Collections.singletonList(
+ TestUtils.createResourceRequest(ResourceRequest.ANY, containerSize,
+ numContainersPerApp, true, priority, recordFactory)));
+
+ applicationAttemptIds.add(applicationAttemptId);
+ apps.add(app);
+
+ appId++;
+ } else {
+ // stop a random app
+ int i = random.nextInt(apps.size());
+ FiCaSchedulerApp app = apps.get(i);
+ leafQueue.finishApplication(app.getApplicationId(), app.getUser());
+ leafQueue.releaseResource(clusterResource, app,
+ app.getCurrentConsumption(), "", null);
+ apps.remove(i);
+ applicationAttemptIds.remove(i);
+ }
+
+ if (errorInThreads.get() || (Time.monotonicNow() - startTime) > runTime) {
+ break;
+ }
+ }
+
+ // signal allocation threads to stop
+ stopThreads.set(true);
+
+ // wait for allocation threads to be done
+ for (int i = 0; i < numAllocationThreads; i++) {
+ threads[i].join();
+ }
+
+ // check if there was an error in the allocation threads
+ assertFalse(errorInThreads.get());
+
+ // check there is only one partition in the user limits cache
+ assertEquals( 1, leafQueue.userLimitsCache.size());
+
+ Map<SchedulingMode, ConcurrentMap<String, LeafQueue.CachedUserLimit>>
+ uLCByPartition = leafQueue.userLimitsCache.get(nodes[0].getPartition());
+
+ // check there is only one scheduling mode
+ assertEquals(uLCByPartition.size(), 1);
+
+ ConcurrentMap<String, LeafQueue.CachedUserLimit> uLCBySchedulingMode =
+ uLCByPartition.get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+
+ // check entries in the user limits cache
+ for (Map.Entry<String, LeafQueue.CachedUserLimit> entry :
+ uLCBySchedulingMode.entrySet()) {
+ String user = entry.getKey();
+ Resource userLimit = entry.getValue().userLimit;
+
+ Resource expectedUL = leafQueue.getResourceLimitForActiveUsers(user,
+ clusterResource, nodes[0].getPartition(),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+
+ assertEquals(expectedUL, userLimit);
+ }
+
+ // check the current version in the user limits cache
+ assertEquals(leafQueue.getUsersManager().getLatestVersionOfUsersState(),
+ leafQueue.currentUserLimitCacheVersion);
+ assertTrue(leafQueue.currentUserLimitCacheVersion > 0);
+ }
+
+ @Test
+ public void testUserLimitCacheActiveUsersChanged() throws Exception {
+ // Setup some nodes
+ String host_0 = "127.0.0.1";
+ FiCaSchedulerNode node_0 =
+ TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 6*GB);
+ String host_1 = "127.0.0.2";
+ FiCaSchedulerNode node_1 =
+ TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 6*GB);
+ String host_2 = "127.0.0.3";
+ FiCaSchedulerNode node_2 =
+ TestUtils.getMockNode(host_2, DEFAULT_RACK, 0, 6*GB);
+ String host_3 = "127.0.0.4";
+ FiCaSchedulerNode node_3 =
+ TestUtils.getMockNode(host_3, DEFAULT_RACK, 0, 6*GB);
+
+ Map<NodeId, FiCaSchedulerNode> nodes =
+ ImmutableMap.of(
+ node_0.getNodeID(), node_0,
+ node_1.getNodeID(), node_1,
+ node_2.getNodeID(), node_2,
+ node_3.getNodeID(), node_3
+ );
+
+ final int numNodes = 4;
+ Resource clusterResource =
+ Resources.createResource(numNodes * (6*GB), numNodes);
+
+ when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+ when(csContext.getClusterResource()).thenReturn(clusterResource);
+
+ // working with just one queue
+ csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {A});
+ csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + "." + A, 100);
+ csConf.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + "." + A,
+ 100);
+
+ // reinitialize queues
+ Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
+ CSQueue newRoot =
+ CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
+ CapacitySchedulerConfiguration.ROOT,
+ newQueues, queues,
+ TestUtils.spyHook);
+ queues = newQueues;
+ root.reinitialize(newRoot, csContext.getClusterResource());
+ root.updateClusterResource(clusterResource,
+ new ResourceLimits(clusterResource));
+
+ // Mock the queue
+ LeafQueue leafQueue = stubLeafQueue((LeafQueue)queues.get(A));
+
+ // initial check
+ assertEquals(0, leafQueue.userLimitsCache.size());
+ assertEquals(0,
+ leafQueue.getUsersManager().preComputedAllUserLimit.size());
+ assertEquals(0,
+ leafQueue.getUsersManager().preComputedActiveUserLimit.size());
+
+ // 4 users
+ final String user_0 = "user_0";
+ final String user_1 = "user_1";
+ final String user_2 = "user_2";
+ final String user_3 = "user_3";
+
+ // Set user-limit
+ leafQueue.setUserLimit(0);
+ leafQueue.setUserLimitFactor(1.0f);
+
+ Priority priority = TestUtils.createMockPriority(1);
+
+ // Fill queue because user limit is calculated as (used / #active users).
+ final ApplicationAttemptId appAttemptId_9 =
+ TestUtils.getMockApplicationAttemptId(9, 0);
+ FiCaSchedulerApp app_9 =
+ new FiCaSchedulerApp(appAttemptId_9, user_0, leafQueue,
+ leafQueue.getUsersManager(), spyRMContext);
+ leafQueue.submitApplicationAttempt(app_9, user_0);
+
+ Map<ApplicationAttemptId, FiCaSchedulerApp> apps =
+ ImmutableMap.of(app_9.getApplicationAttemptId(), app_9);
+
+ app_9.updateResourceRequests(Arrays.asList(
+ TestUtils.createResourceRequest(host_0, 1*GB, 5, true,
+ priority, recordFactory),
+ TestUtils.createResourceRequest(DEFAULT_RACK, 1*GB, 5, true,
+ priority, recordFactory),
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 5, true,
+ priority, recordFactory)));
+ assertEquals(1, leafQueue.getUsersManager().getNumActiveUsers());
+
+ CSAssignment assignment;
+ for (int i = 0; i < 5; i++) {
+ assignment = leafQueue.assignContainers(clusterResource, node_0,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource, assignment, leafQueue, nodes, apps);
+ }
+ app_9.updateResourceRequests(Arrays.asList(
+ TestUtils.createResourceRequest(host_1, 1*GB, 5, true,
+ priority, recordFactory),
+ TestUtils.createResourceRequest(DEFAULT_RACK, 1*GB, 5, true,
+ priority, recordFactory),
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 5, true,
+ priority, recordFactory)));
+ for (int i = 0; i < 5; i++) {
+ assignment = leafQueue.assignContainers(clusterResource, node_1,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource, assignment, leafQueue, nodes, apps);
+ }
+ // A total of 10GB have been allocated
+ assertEquals(10*GB, leafQueue.getUsedResources().getMemorySize());
+ assertEquals(10*GB, app_9.getCurrentConsumption().getMemorySize());
+ // For one user who should have been cached in the assignContainers call
+ assertEquals(1, leafQueue.userLimitsCache
+ .get(RMNodeLabelsManager.NO_LABEL)
+ .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+ .size());
+ // But the cache is stale because an allocation was made
+ assertNotEquals(leafQueue.currentUserLimitCacheVersion,
+ leafQueue.getUsersManager().getLatestVersionOfUsersState());
+ // Have not made any calls to fill up the all user limit in UsersManager
+ assertEquals(0,
+ leafQueue.getUsersManager().preComputedAllUserLimit.size());
+ // But the user limit cache in leafQueue got filled up using the active
+ // user limit in UsersManager
+ assertEquals(1,
+ leafQueue.getUsersManager().preComputedActiveUserLimit.size());
+
+ // submit 3 applications for now
+ final ApplicationAttemptId appAttemptId_0 =
+ TestUtils.getMockApplicationAttemptId(0, 0);
+ FiCaSchedulerApp app_0 =
+ new FiCaSchedulerApp(appAttemptId_0, user_0, leafQueue,
+ leafQueue.getUsersManager(), spyRMContext);
+ leafQueue.submitApplicationAttempt(app_0, user_0);
+
+ final ApplicationAttemptId appAttemptId_1 =
+ TestUtils.getMockApplicationAttemptId(1, 0);
+ FiCaSchedulerApp app_1 =
+ new FiCaSchedulerApp(appAttemptId_1, user_1, leafQueue,
+ leafQueue.getUsersManager(), spyRMContext);
+ leafQueue.submitApplicationAttempt(app_1, user_1);
+
+ final ApplicationAttemptId appAttemptId_2 =
+ TestUtils.getMockApplicationAttemptId(2, 0);
+ FiCaSchedulerApp app_2 =
+ new FiCaSchedulerApp(appAttemptId_2, user_2, leafQueue,
+ leafQueue.getUsersManager(), spyRMContext);
+ leafQueue.submitApplicationAttempt(app_2, user_2);
+
+ apps = ImmutableMap.of(
+ app_0.getApplicationAttemptId(), app_0,
+ app_1.getApplicationAttemptId(), app_1,
+ app_2.getApplicationAttemptId(), app_2
+ );
+
+ // requests from first three users (all of which will be locality delayed)
+ app_0.updateResourceRequests(Arrays.asList(
+ TestUtils.createResourceRequest(host_0, 4*GB, 1, true,
+ priority, recordFactory),
+ TestUtils.createResourceRequest(DEFAULT_RACK, 4*GB, 1, true,
+ priority, recordFactory),
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 10, true,
+ priority, recordFactory)));
+
+ app_1.updateResourceRequests(Arrays.asList(
+ TestUtils.createResourceRequest(host_0, 4*GB, 1, true,
+ priority, recordFactory),
+ TestUtils.createResourceRequest(DEFAULT_RACK, 4*GB, 1, true,
+ priority, recordFactory),
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 10, true,
+ priority, recordFactory)));
+
+ app_2.updateResourceRequests(Arrays.asList(
+ TestUtils.createResourceRequest(host_0, 4*GB, 1, true,
+ priority, recordFactory),
+ TestUtils.createResourceRequest(DEFAULT_RACK, 4*GB, 1, true,
+ priority, recordFactory),
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 10, true,
+ priority, recordFactory)));
+
+ // There are 3 active users right now
+ assertEquals(3, leafQueue.getUsersManager().getNumActiveUsers());
+
+ // fill up user limit cache
+ assignment = leafQueue.assignContainers(clusterResource, node_1,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource, assignment, leafQueue, nodes, apps);
+ // A total of 10GB have been allocated
+ assertEquals(10*GB, leafQueue.getUsedResources().getMemorySize());
+ assertEquals(0*GB, app_0.getCurrentConsumption().getMemorySize());
+ assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
+ assertEquals(0*GB, app_2.getCurrentConsumption().getMemorySize());
+ assertEquals(10*GB, app_9.getCurrentConsumption().getMemorySize());
+ // There are three users who should have been cached
+ assertEquals(3, leafQueue.userLimitsCache
+ .get(RMNodeLabelsManager.NO_LABEL)
+ .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+ .size());
+ // There are three users so each has a limit of 12/3 = 4GB
+ assertEquals(4*GB, leafQueue.userLimitsCache
+ .get(RMNodeLabelsManager.NO_LABEL)
+ .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+ .get(user_0).userLimit.getMemorySize());
+ assertEquals(4*GB, leafQueue.userLimitsCache
+ .get(RMNodeLabelsManager.NO_LABEL)
+ .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+ .get(user_1).userLimit.getMemorySize());
+ assertEquals(4*GB, leafQueue.userLimitsCache
+ .get(RMNodeLabelsManager.NO_LABEL)
+ .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+ .get(user_2).userLimit.getMemorySize());
+ // And the cache is NOT stale because no allocation was made
+ assertEquals(leafQueue.currentUserLimitCacheVersion,
+ leafQueue.getUsersManager().getLatestVersionOfUsersState());
+ // Have not made any calls to fill up the all user limit in UsersManager
+ assertEquals(0,
+ leafQueue.getUsersManager().preComputedAllUserLimit.size());
+ // But the user limit cache in leafQueue got filled up using the active
+ // user limit in UsersManager with 4GB limit (since there are three users
+ // so 12/3 = 4GB each)
+ assertEquals(1, leafQueue.getUsersManager()
+ .preComputedActiveUserLimit.size());
+ assertEquals(1, leafQueue.getUsersManager()
+ .preComputedActiveUserLimit.get(RMNodeLabelsManager.NO_LABEL).size());
+ assertEquals(4*GB, leafQueue.getUsersManager()
+ .preComputedActiveUserLimit.get(RMNodeLabelsManager.NO_LABEL)
+ .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY).getMemorySize());
+
+ // submit the 4th application
+ final ApplicationAttemptId appAttemptId_3 =
+ TestUtils.getMockApplicationAttemptId(3, 0);
+ FiCaSchedulerApp app_3 =
+ new FiCaSchedulerApp(appAttemptId_3, user_3, leafQueue,
+ leafQueue.getUsersManager(), spyRMContext);
+ leafQueue.submitApplicationAttempt(app_3, user_3);
+
+ apps = ImmutableMap.of(
+ app_0.getApplicationAttemptId(), app_0,
+ app_1.getApplicationAttemptId(), app_1,
+ app_2.getApplicationAttemptId(), app_2,
+ app_3.getApplicationAttemptId(), app_3
+ );
+
+ app_3.updateResourceRequests(Arrays.asList(
+ TestUtils.createResourceRequest(host_0, 4*GB, 1, true,
+ priority, recordFactory),
+ TestUtils.createResourceRequest(DEFAULT_RACK, 4*GB, 1, true,
+ priority, recordFactory),
+ TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 10, true,
+ priority, recordFactory)));
+
+ // 4 active users now
+ assertEquals(4, leafQueue.getUsersManager().getNumActiveUsers());
+ // Check that the user limits cache has become stale
+ assertNotEquals(leafQueue.currentUserLimitCacheVersion,
+ leafQueue.getUsersManager().getLatestVersionOfUsersState());
+
+ // Even though there are no allocations, user limit cache is repopulated
+ assignment = leafQueue.assignContainers(clusterResource, node_1,
+ new ResourceLimits(clusterResource),
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
+ applyCSAssignment(clusterResource, assignment, leafQueue, nodes, apps);
+ // A total of 10GB have been allocated
+ assertEquals(10*GB, leafQueue.getUsedResources().getMemorySize());
+ assertEquals(0*GB, app_0.getCurrentConsumption().getMemorySize());
+ assertEquals(0*GB, app_1.getCurrentConsumption().getMemorySize());
+ assertEquals(0*GB, app_2.getCurrentConsumption().getMemorySize());
+ assertEquals(0*GB, app_3.getCurrentConsumption().getMemorySize());
+ assertEquals(10*GB, app_9.getCurrentConsumption().getMemorySize());
+ // There are four users who should have been cached
+ assertEquals(4, leafQueue.userLimitsCache
+ .get(RMNodeLabelsManager.NO_LABEL)
+ .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+ .size());
+ // There are four users so each has a limit of 12/4 = 3GB
+ assertEquals(3*GB, leafQueue.userLimitsCache
+ .get(RMNodeLabelsManager.NO_LABEL)
+ .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+ .get(user_0).userLimit.getMemorySize());
+ assertEquals(3*GB, leafQueue.userLimitsCache
+ .get(RMNodeLabelsManager.NO_LABEL)
+ .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+ .get(user_1).userLimit.getMemorySize());
+ assertEquals(3*GB, leafQueue.userLimitsCache
+ .get(RMNodeLabelsManager.NO_LABEL)
+ .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+ .get(user_2).userLimit.getMemorySize());
+ assertEquals(3*GB, leafQueue.userLimitsCache
+ .get(RMNodeLabelsManager.NO_LABEL)
+ .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY)
+ .get(user_3).userLimit.getMemorySize());
+ // And the cache is NOT stale because no allocation was made
+ assertEquals(leafQueue.currentUserLimitCacheVersion,
+ leafQueue.getUsersManager().getLatestVersionOfUsersState());
+ // Have not made any calls to fill up the all user limit in UsersManager
+ assertEquals(0,
+ leafQueue.getUsersManager().preComputedAllUserLimit.size());
+ // But the user limit cache in leafQueue got filled up using the active
+ // user limit in UsersManager with 3GB limit (since there are four users
+ // so 12/4 = 3GB each)
+ assertEquals(1, leafQueue.getUsersManager()
+ .preComputedActiveUserLimit.size());
+ assertEquals(1, leafQueue.getUsersManager()
+ .preComputedActiveUserLimit.get(RMNodeLabelsManager.NO_LABEL).size());
+ assertEquals(3*GB, leafQueue.getUsersManager()
+ .preComputedActiveUserLimit.get(RMNodeLabelsManager.NO_LABEL)
+ .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY).getMemorySize());
+ }
+
+ @Test
public void testUserLimits() throws Exception {
// Mock the queue
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));