You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by sa...@apache.org on 2013/11/29 20:14:36 UTC
svn commit: r1546626 - in
/hadoop/common/branches/branch-2/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/
hadoop-yarn/hadoop-yarn-ser...
Author: sandy
Date: Fri Nov 29 19:14:36 2013
New Revision: 1546626
URL: http://svn.apache.org/r1546626
Log:
YARN-1241. In Fair Scheduler, maxRunningApps does not work for non-leaf queues. (Sandy Ryza)
Added:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
- copied unchanged from r1546625, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java
- copied unchanged from r1546625, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerLeafQueueInfo.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1546626&r1=1546625&r2=1546626&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Fri Nov 29 19:14:36 2013
@@ -108,6 +108,9 @@ Release 2.3.0 - UNRELEASED
YARN-1239. Modified ResourceManager state-store implementations to start
storing version numbers. (Jian He via vinodkv)
+ YARN-1241. In Fair Scheduler, maxRunningApps does not work for non-leaf
+ queues. (Sandy Ryza)
+
OPTIMIZATIONS
BUG FIXES
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java?rev=1546626&r1=1546625&r2=1546626&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java Fri Nov 29 19:14:36 2013
@@ -51,7 +51,6 @@ public class AppSchedulable extends Sche
private FairScheduler scheduler;
private FSSchedulerApp app;
private Resource demand = Resources.createResource(0);
- private boolean runnable = false; // everyone starts as not runnable
private long startTime;
private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private static final Log LOG = LogFactory.getLog(AppSchedulable.class);
@@ -61,7 +60,7 @@ public class AppSchedulable extends Sche
public AppSchedulable(FairScheduler scheduler, FSSchedulerApp app, FSLeafQueue queue) {
this.scheduler = scheduler;
this.app = app;
- this.startTime = System.currentTimeMillis();
+ this.startTime = scheduler.getClock().getTime();
this.queue = queue;
this.containerTokenSecretManager = scheduler.
getContainerTokenSecretManager();
@@ -139,18 +138,6 @@ public class AppSchedulable extends Sche
}
/**
- * Is this application runnable? Runnable means that the user and queue
- * application counts are within configured quotas.
- */
- public boolean getRunnable() {
- return runnable;
- }
-
- public void setRunnable(boolean runnable) {
- this.runnable = runnable;
- }
-
- /**
* Create and return a container object reflecting an allocation for the
* given appliction on the given node with the given capability and
* priority.
@@ -281,9 +268,6 @@ public class AppSchedulable extends Sche
unreserve(priority, node);
return Resources.none();
}
- } else {
- // If this app is over quota, don't schedule anything
- if (!(getRunnable())) { return Resources.none(); }
}
Collection<Priority> prioritiesToTry = (reserved) ?
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1546626&r1=1546625&r2=1546626&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java Fri Nov 29 19:14:36 2013
@@ -22,7 +22,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
-import java.util.Iterator;
import java.util.List;
import org.apache.commons.logging.Log;
@@ -42,7 +41,9 @@ public class FSLeafQueue extends FSQueue
private static final Log LOG = LogFactory.getLog(
FSLeafQueue.class.getName());
- private final List<AppSchedulable> appScheds =
+ private final List<AppSchedulable> runnableAppScheds = // apps that are runnable
+ new ArrayList<AppSchedulable>();
+ private final List<AppSchedulable> nonRunnableAppScheds =
new ArrayList<AppSchedulable>();
private final FairScheduler scheduler;
@@ -62,29 +63,51 @@ public class FSLeafQueue extends FSQueue
this.lastTimeAtHalfFairShare = scheduler.getClock().getTime();
}
- public void addApp(FSSchedulerApp app) {
+ public void addApp(FSSchedulerApp app, boolean runnable) {
AppSchedulable appSchedulable = new AppSchedulable(scheduler, app, this);
app.setAppSchedulable(appSchedulable);
- appScheds.add(appSchedulable);
+ if (runnable) {
+ runnableAppScheds.add(appSchedulable);
+ } else {
+ nonRunnableAppScheds.add(appSchedulable);
+ }
}
// for testing
void addAppSchedulable(AppSchedulable appSched) {
- appScheds.add(appSched);
+ runnableAppScheds.add(appSched);
}
- public void removeApp(FSSchedulerApp app) {
- for (Iterator<AppSchedulable> it = appScheds.iterator(); it.hasNext();) {
- AppSchedulable appSched = it.next();
- if (appSched.getApp() == app) {
- it.remove();
- break;
- }
+ /**
+ * Removes the given app from this queue.
+ * @return whether or not the app was runnable
+ */
+ public boolean removeApp(FSSchedulerApp app) {
+ if (runnableAppScheds.remove(app.getAppSchedulable())) {
+ return true;
+ } else if (nonRunnableAppScheds.remove(app.getAppSchedulable())) {
+ return false;
+ } else {
+ throw new IllegalStateException("Given app to remove " + app +
+ " does not exist in queue " + this);
}
}
- public Collection<AppSchedulable> getAppSchedulables() {
- return appScheds;
+ public void makeAppRunnable(AppSchedulable appSched) {
+ if (!nonRunnableAppScheds.remove(appSched)) {
+ throw new IllegalStateException("Can't make app runnable that does not " +
+ "already exist in queue as non-runnable" + appSched);
+ }
+
+ runnableAppScheds.add(appSched);
+ }
+
+ public Collection<AppSchedulable> getRunnableAppSchedulables() {
+ return runnableAppScheds;
+ }
+
+ public List<AppSchedulable> getNonRunnableAppSchedulables() {
+ return nonRunnableAppScheds;
}
@Override
@@ -98,7 +121,7 @@ public class FSLeafQueue extends FSQueue
@Override
public void recomputeShares() {
- policy.computeShares(getAppSchedulables(), getFairShare());
+ policy.computeShares(getRunnableAppSchedulables(), getFairShare());
}
@Override
@@ -109,7 +132,10 @@ public class FSLeafQueue extends FSQueue
@Override
public Resource getResourceUsage() {
Resource usage = Resources.createResource(0);
- for (AppSchedulable app : appScheds) {
+ for (AppSchedulable app : runnableAppScheds) {
+ Resources.addTo(usage, app.getResourceUsage());
+ }
+ for (AppSchedulable app : nonRunnableAppScheds) {
Resources.addTo(usage, app.getResourceUsage());
}
return usage;
@@ -121,25 +147,35 @@ public class FSLeafQueue extends FSQueue
// Limit demand to maxResources
Resource maxRes = queueMgr.getMaxResources(getName());
demand = Resources.createResource(0);
- for (AppSchedulable sched : appScheds) {
- sched.updateDemand();
- Resource toAdd = sched.getDemand();
- if (LOG.isDebugEnabled()) {
- LOG.debug("Counting resource from " + sched.getName() + " " + toAdd
- + "; Total resource consumption for " + getName() + " now "
- + demand);
+ for (AppSchedulable sched : runnableAppScheds) {
+ if (Resources.equals(demand, maxRes)) {
+ break;
}
- demand = Resources.add(demand, toAdd);
- demand = Resources.componentwiseMin(demand, maxRes);
+ updateDemandForApp(sched, maxRes);
+ }
+ for (AppSchedulable sched : nonRunnableAppScheds) {
if (Resources.equals(demand, maxRes)) {
break;
}
+ updateDemandForApp(sched, maxRes);
}
if (LOG.isDebugEnabled()) {
LOG.debug("The updated demand for " + getName() + " is " + demand
+ "; the max is " + maxRes);
}
}
+
+ private void updateDemandForApp(AppSchedulable sched, Resource maxRes) {
+ sched.updateDemand();
+ Resource toAdd = sched.getDemand();
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Counting resource from " + sched.getName() + " " + toAdd
+ + "; Total resource consumption for " + getName() + " now "
+ + demand);
+ }
+ demand = Resources.add(demand, toAdd);
+ demand = Resources.componentwiseMin(demand, maxRes);
+ }
@Override
public Resource assignContainer(FSSchedulerNode node) {
@@ -153,17 +189,15 @@ public class FSLeafQueue extends FSQueue
}
Comparator<Schedulable> comparator = policy.getComparator();
- Collections.sort(appScheds, comparator);
- for (AppSchedulable sched : appScheds) {
- if (sched.getRunnable()) {
- if (SchedulerAppUtils.isBlacklisted(sched.getApp(), node, LOG)) {
- continue;
- }
-
- assigned = sched.assignContainer(node);
- if (!assigned.equals(Resources.none())) {
- break;
- }
+ Collections.sort(runnableAppScheds, comparator);
+ for (AppSchedulable sched : runnableAppScheds) {
+ if (SchedulerAppUtils.isBlacklisted(sched.getApp(), node, LOG)) {
+ continue;
+ }
+
+ assigned = sched.assignContainer(node);
+ if (!assigned.equals(Resources.none())) {
+ break;
}
}
return assigned;
@@ -205,4 +239,9 @@ public class FSLeafQueue extends FSQueue
public void setLastTimeAtHalfFairShare(long lastTimeAtHalfFairShare) {
this.lastTimeAtHalfFairShare = lastTimeAtHalfFairShare;
}
+
+ @Override
+ public int getNumRunnableApps() {
+ return runnableAppScheds.size();
+ }
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java?rev=1546626&r1=1546625&r2=1546626&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java Fri Nov 29 19:14:36 2013
@@ -43,6 +43,7 @@ public class FSParentQueue extends FSQue
new ArrayList<FSQueue>();
private final QueueManager queueMgr;
private Resource demand = Resources.createResource(0);
+ private int runnableApps;
public FSParentQueue(String name, QueueManager queueMgr, FairScheduler scheduler,
FSParentQueue parent) {
@@ -171,4 +172,17 @@ public class FSParentQueue extends FSQue
}
super.policy = policy;
}
+
+ public void incrementRunnableApps() {
+ runnableApps++;
+ }
+
+ public void decrementRunnableApps() {
+ runnableApps--;
+ }
+
+ @Override
+ public int getNumRunnableApps() {
+ return runnableApps;
+ }
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java?rev=1546626&r1=1546625&r2=1546626&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java Fri Nov 29 19:14:36 2013
@@ -72,6 +72,10 @@ public abstract class FSQueue extends Sc
public SchedulingPolicy getPolicy() {
return policy;
}
+
+ public FSParentQueue getParent() {
+ return parent;
+ }
protected void throwPolicyDoesnotApplyException(SchedulingPolicy policy)
throws AllocationConfigurationException {
@@ -165,6 +169,12 @@ public abstract class FSQueue extends Sc
public abstract Collection<FSQueue> getChildQueues();
/**
+ * Return the number of apps for which containers can be allocated.
+ * Includes apps in subqueues.
+ */
+ public abstract int getNumRunnableApps();
+
+ /**
* Helper method to check if the queue should attempt assigning resources
*
* @return true if check passes (can assign) or false otherwise
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java?rev=1546626&r1=1546625&r2=1546626&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSSchedulerApp.java Fri Nov 29 19:14:36 2013
@@ -44,7 +44,6 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -62,7 +61,7 @@ public class FSSchedulerApp extends Sche
final Map<RMContainer, Long> preemptionMap = new HashMap<RMContainer, Long>();
public FSSchedulerApp(ApplicationAttemptId applicationAttemptId,
- String user, Queue queue, ActiveUsersManager activeUsersManager,
+ String user, FSLeafQueue queue, ActiveUsersManager activeUsersManager,
RMContext rmContext) {
super(applicationAttemptId, user, queue, activeUsersManager, rmContext);
}
@@ -327,4 +326,9 @@ public class FSSchedulerApp extends Sche
public Set<RMContainer> getPreemptionContainers() {
return preemptionMap.keySet();
}
+
+ @Override
+ public FSLeafQueue getQueue() {
+ return (FSLeafQueue)super.getQueue();
+ }
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1546626&r1=1546625&r2=1546626&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Fri Nov 29 19:14:36 2013
@@ -190,9 +190,13 @@ public class FairScheduler implements Re
// heartbeat
protected int maxAssign; // Max containers to assign per heartbeat
+ @VisibleForTesting
+ final MaxRunningAppsEnforcer maxRunningEnforcer;
+
public FairScheduler() {
clock = new SystemClock();
queueMgr = new QueueManager(this);
+ maxRunningEnforcer = new MaxRunningAppsEnforcer(queueMgr);
}
private void validateConf(Configuration conf) {
@@ -272,7 +276,6 @@ public class FairScheduler implements Re
*/
protected synchronized void update() {
queueMgr.reloadAllocsIfNecessary(); // Relaod alloc file
- updateRunnability(); // Set job runnability based on user/queue limits
updatePreemptionVariables(); // Determine if any queues merit preemption
FSQueue rootQueue = queueMgr.getRootQueue();
@@ -377,7 +380,7 @@ public class FairScheduler implements Re
for (FSLeafQueue sched : scheds) {
if (Resources.greaterThan(RESOURCE_CALCULATOR, clusterCapacity,
sched.getResourceUsage(), sched.getFairShare())) {
- for (AppSchedulable as : sched.getAppSchedulables()) {
+ for (AppSchedulable as : sched.getRunnableAppSchedulables()) {
for (RMContainer c : as.getApp().getLiveContainers()) {
runningContainers.add(c);
apps.put(c, as.getApp());
@@ -505,63 +508,23 @@ public class FairScheduler implements Re
return resToPreempt;
}
- /**
- * This updates the runnability of all apps based on whether or not any
- * users/queues have exceeded their capacity.
- */
- private void updateRunnability() {
- List<AppSchedulable> apps = new ArrayList<AppSchedulable>();
-
- // Start by marking everything as not runnable
- for (FSLeafQueue leafQueue : queueMgr.getLeafQueues()) {
- for (AppSchedulable a : leafQueue.getAppSchedulables()) {
- a.setRunnable(false);
- apps.add(a);
- }
- }
- // Create a list of sorted jobs in order of start time and priority
- Collections.sort(apps, new FifoAppComparator());
- // Mark jobs as runnable in order of start time and priority, until
- // user or queue limits have been reached.
- Map<String, Integer> userApps = new HashMap<String, Integer>();
- Map<String, Integer> queueApps = new HashMap<String, Integer>();
-
- for (AppSchedulable app : apps) {
- String user = app.getApp().getUser();
- String queue = app.getApp().getQueueName();
- int userCount = userApps.containsKey(user) ? userApps.get(user) : 0;
- int queueCount = queueApps.containsKey(queue) ? queueApps.get(queue) : 0;
- if (userCount < queueMgr.getUserMaxApps(user) &&
- queueCount < queueMgr.getQueueMaxApps(queue)) {
- userApps.put(user, userCount + 1);
- queueApps.put(queue, queueCount + 1);
- app.setRunnable(true);
- }
- }
- }
-
public RMContainerTokenSecretManager getContainerTokenSecretManager() {
return rmContext.getContainerTokenSecretManager();
}
// synchronized for sizeBasedWeight
public synchronized ResourceWeights getAppWeight(AppSchedulable app) {
- if (!app.getRunnable()) {
- // Job won't launch tasks, but don't return 0 to avoid division errors
- return ResourceWeights.NEUTRAL;
- } else {
- double weight = 1.0;
- if (sizeBasedWeight) {
- // Set weight based on current memory demand
- weight = Math.log1p(app.getDemand().getMemory()) / Math.log(2);
- }
- weight *= app.getPriority().getPriority();
- if (weightAdjuster != null) {
- // Run weight through the user-supplied weightAdjuster
- weight = weightAdjuster.adjustWeight(app, weight);
- }
- return new ResourceWeights((float)weight);
+ double weight = 1.0;
+ if (sizeBasedWeight) {
+ // Set weight based on current memory demand
+ weight = Math.log1p(app.getDemand().getMemory()) / Math.log(2);
+ }
+ weight *= app.getPriority().getPriority();
+ if (weightAdjuster != null) {
+ // Run weight through the user-supplied weightAdjuster
+ weight = weightAdjuster.adjustWeight(app, weight);
}
+ return new ResourceWeights((float)weight);
}
@Override
@@ -662,7 +625,14 @@ public class FairScheduler implements Re
return;
}
- queue.addApp(schedulerApp);
+ boolean runnable = maxRunningEnforcer.canAppBeRunnable(queue, user);
+ queue.addApp(schedulerApp, runnable);
+ if (runnable) {
+ maxRunningEnforcer.trackRunnableApp(schedulerApp);
+ } else {
+ maxRunningEnforcer.trackNonRunnableApp(schedulerApp);
+ }
+
queue.getMetrics().submitApp(user, applicationAttemptId.getAttemptId());
applications.put(applicationAttemptId, schedulerApp);
@@ -736,8 +706,14 @@ public class FairScheduler implements Re
// Inform the queue
FSLeafQueue queue = queueMgr.getLeafQueue(application.getQueue()
.getQueueName(), false);
- queue.removeApp(application);
+ boolean wasRunnable = queue.removeApp(application);
+ if (wasRunnable) {
+ maxRunningEnforcer.updateRunnabilityOnAppRemoval(application);
+ } else {
+ maxRunningEnforcer.untrackNonRunnableApp(application);
+ }
+
// Remove from our data-structure
applications.remove(applicationAttemptId);
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1546626&r1=1546625&r2=1546626&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Fri Nov 29 19:14:36 2013
@@ -89,7 +89,8 @@ public class QueueManager {
private final Map<String, FSQueue> queues = new HashMap<String, FSQueue>();
private FSParentQueue rootQueue;
- private volatile QueueManagerInfo info = new QueueManagerInfo();
+ @VisibleForTesting
+ volatile QueueManagerInfo info = new QueueManagerInfo();
@VisibleForTesting
volatile QueuePlacementPolicy placementPolicy;
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerLeafQueueInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerLeafQueueInfo.java?rev=1546626&r1=1546625&r2=1546626&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerLeafQueueInfo.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerLeafQueueInfo.java Fri Nov 29 19:14:36 2013
@@ -39,7 +39,7 @@ public class FairSchedulerLeafQueueInfo
public FairSchedulerLeafQueueInfo(FSLeafQueue queue, FairScheduler scheduler) {
super(queue, scheduler);
- Collection<AppSchedulable> apps = queue.getAppSchedulables();
+ Collection<AppSchedulable> apps = queue.getRunnableAppSchedulables();
for (AppSchedulable app : apps) {
if (app.getApp().isPending()) {
numPendingApps++;
@@ -47,6 +47,7 @@ public class FairSchedulerLeafQueueInfo
numActiveApps++;
}
}
+ numPendingApps += queue.getNonRunnableAppSchedulables().size();
}
public int getNumActiveApplications() {
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java?rev=1546626&r1=1546625&r2=1546626&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java Fri Nov 29 19:14:36 2013
@@ -596,23 +596,24 @@ public class TestCapacityScheduler {
public void testConcurrentAccessOnApplications() throws Exception {
CapacityScheduler cs = new CapacityScheduler();
verifyConcurrentAccessOnApplications(
- cs.applications, FiCaSchedulerApp.class);
+ cs.applications, FiCaSchedulerApp.class, Queue.class);
}
- public static <T extends SchedulerApplication>
+ public static <T extends SchedulerApplication, Q extends Queue>
void verifyConcurrentAccessOnApplications(
- final Map<ApplicationAttemptId, T> applications, Class<T> clazz)
+ final Map<ApplicationAttemptId, T> applications, Class<T> appClazz,
+ final Class<Q> queueClazz)
throws Exception {
final int size = 10000;
final ApplicationId appId = ApplicationId.newInstance(0, 0);
- final Constructor<T> ctor = clazz.getDeclaredConstructor(
- ApplicationAttemptId.class, String.class, Queue.class,
+ final Constructor<T> ctor = appClazz.getDeclaredConstructor(
+ ApplicationAttemptId.class, String.class, queueClazz,
ActiveUsersManager.class, RMContext.class);
ApplicationAttemptId appAttemptId0
= ApplicationAttemptId.newInstance(appId, 0);
applications.put(appAttemptId0, ctor.newInstance(
- appAttemptId0, null, mock(Queue.class), null, null));
+ appAttemptId0, null, mock(queueClazz), null, null));
assertNotNull(applications.get(appAttemptId0));
// Imitating the thread of scheduler that will add and remove apps
@@ -627,7 +628,7 @@ public class TestCapacityScheduler {
= ApplicationAttemptId.newInstance(appId, i);
try {
applications.put(appAttemptId, ctor.newInstance(
- appAttemptId, null, mock(Queue.class), null, null));
+ appAttemptId, null, mock(queueClazz), null, null));
} catch (Exception e) {
failed.set(true);
finished.set(true);
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java?rev=1546626&r1=1546625&r2=1546626&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java Fri Nov 29 19:14:36 2013
@@ -24,7 +24,6 @@ import org.apache.hadoop.yarn.api.record
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.util.Clock;
import org.junit.Test;
import org.mockito.Mockito;
@@ -53,7 +52,7 @@ public class TestFSSchedulerApp {
@Test
public void testDelayScheduling() {
- Queue queue = Mockito.mock(Queue.class);
+ FSLeafQueue queue = Mockito.mock(FSLeafQueue.class);
Priority prio = Mockito.mock(Priority.class);
Mockito.when(prio.getPriority()).thenReturn(1);
double nodeLocalityThreshold = .5;
@@ -110,7 +109,7 @@ public class TestFSSchedulerApp {
@Test
public void testDelaySchedulingForContinuousScheduling()
throws InterruptedException {
- Queue queue = Mockito.mock(Queue.class);
+ FSLeafQueue queue = Mockito.mock(FSLeafQueue.class);
Priority prio = Mockito.mock(Priority.class);
Mockito.when(prio.getPriority()).thenReturn(1);
@@ -170,7 +169,7 @@ public class TestFSSchedulerApp {
* no tin use), the least restrictive locality level is returned.
*/
public void testLocalityLevelWithoutDelays() {
- Queue queue = Mockito.mock(Queue.class);
+ FSLeafQueue queue = Mockito.mock(FSLeafQueue.class);
Priority prio = Mockito.mock(Priority.class);
Mockito.when(prio.getPriority()).thenReturn(1);
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1546626&r1=1546625&r2=1546626&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Fri Nov 29 19:14:36 2013
@@ -100,7 +100,7 @@ import com.google.common.collect.Sets;
public class TestFairScheduler {
- private class MockClock implements Clock {
+ static class MockClock implements Clock {
private long time = 0;
@Override
public long getTime() {
@@ -613,9 +613,9 @@ public class TestFairScheduler {
appAttemptId, "default", "user1");
scheduler.handle(appAddedEvent);
assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true)
- .getAppSchedulables().size());
+ .getRunnableAppSchedulables().size());
assertEquals(0, scheduler.getQueueManager().getLeafQueue("default", true)
- .getAppSchedulables().size());
+ .getRunnableAppSchedulables().size());
assertEquals("root.user1", rmApp.getQueue());
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
@@ -625,11 +625,11 @@ public class TestFairScheduler {
createAppAttemptId(2, 1), "default", "user2");
scheduler.handle(appAddedEvent2);
assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true)
- .getAppSchedulables().size());
+ .getRunnableAppSchedulables().size());
assertEquals(1, scheduler.getQueueManager().getLeafQueue("default", true)
- .getAppSchedulables().size());
+ .getRunnableAppSchedulables().size());
assertEquals(0, scheduler.getQueueManager().getLeafQueue("user2", true)
- .getAppSchedulables().size());
+ .getRunnableAppSchedulables().size());
}
@Test
@@ -821,7 +821,7 @@ public class TestFairScheduler {
// That queue should have one app
assertEquals(1, scheduler.getQueueManager().getLeafQueue("user1", true)
- .getAppSchedulables().size());
+ .getRunnableAppSchedulables().size());
AppRemovedSchedulerEvent appRemovedEvent1 = new AppRemovedSchedulerEvent(
createAppAttemptId(1, 1), RMAppAttemptState.FINISHED);
@@ -831,7 +831,7 @@ public class TestFairScheduler {
// Queue should have no apps
assertEquals(0, scheduler.getQueueManager().getLeafQueue("user1", true)
- .getAppSchedulables().size());
+ .getRunnableAppSchedulables().size());
}
@Test
@@ -2400,7 +2400,158 @@ public class TestFairScheduler {
public void testConcurrentAccessOnApplications() throws Exception {
FairScheduler fs = new FairScheduler();
TestCapacityScheduler.verifyConcurrentAccessOnApplications(
- fs.applications, FSSchedulerApp.class);
+ fs.applications, FSSchedulerApp.class, FSLeafQueue.class);
+ }
+
+
+ private void verifyAppRunnable(ApplicationAttemptId attId, boolean runnable) {
+ FSSchedulerApp app = scheduler.applications.get(attId);
+ FSLeafQueue queue = app.getQueue();
+ Collection<AppSchedulable> runnableApps =
+ queue.getRunnableAppSchedulables();
+ Collection<AppSchedulable> nonRunnableApps =
+ queue.getNonRunnableAppSchedulables();
+ assertEquals(runnable, runnableApps.contains(app.getAppSchedulable()));
+ assertEquals(!runnable, nonRunnableApps.contains(app.getAppSchedulable()));
+ }
+
+ private void verifyQueueNumRunnable(String queueName, int numRunnableInQueue,
+ int numNonRunnableInQueue) {
+ FSLeafQueue queue = scheduler.getQueueManager().getLeafQueue(queueName, false);
+ assertEquals(numRunnableInQueue,
+ queue.getRunnableAppSchedulables().size());
+ assertEquals(numNonRunnableInQueue,
+ queue.getNonRunnableAppSchedulables().size());
+ }
+
+ @Test
+ public void testUserAndQueueMaxRunningApps() throws Exception {
+ Configuration conf = createConfiguration();
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<allocations>");
+ out.println("<queue name=\"queue1\">");
+ out.println("<maxRunningApps>2</maxRunningApps>");
+ out.println("</queue>");
+ out.println("<user name=\"user1\">");
+ out.println("<maxRunningApps>1</maxRunningApps>");
+ out.println("</user>");
+ out.println("</allocations>");
+ out.close();
+
+ QueueManager queueManager = scheduler.getQueueManager();
+ queueManager.initialize();
+
+ // exceeds no limits
+ ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1");
+ verifyAppRunnable(attId1, true);
+ verifyQueueNumRunnable("queue1", 1, 0);
+ // exceeds user limit
+ ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue2", "user1");
+ verifyAppRunnable(attId2, false);
+ verifyQueueNumRunnable("queue2", 0, 1);
+ // exceeds no limits
+ ApplicationAttemptId attId3 = createSchedulingRequest(1024, "queue1", "user2");
+ verifyAppRunnable(attId3, true);
+ verifyQueueNumRunnable("queue1", 2, 0);
+ // exceeds queue limit
+ ApplicationAttemptId attId4 = createSchedulingRequest(1024, "queue1", "user2");
+ verifyAppRunnable(attId4, false);
+ verifyQueueNumRunnable("queue1", 2, 1);
+
+ // Remove app 1 and both app 2 and app 4 should becomes runnable in its place
+ AppRemovedSchedulerEvent appRemovedEvent1 = new AppRemovedSchedulerEvent(
+ attId1, RMAppAttemptState.FINISHED);
+ scheduler.handle(appRemovedEvent1);
+ verifyAppRunnable(attId2, true);
+ verifyQueueNumRunnable("queue2", 1, 0);
+ verifyAppRunnable(attId4, true);
+ verifyQueueNumRunnable("queue1", 2, 0);
+
+ // A new app to queue1 should not be runnable
+ ApplicationAttemptId attId5 = createSchedulingRequest(1024, "queue1", "user2");
+ verifyAppRunnable(attId5, false);
+ verifyQueueNumRunnable("queue1", 2, 1);
+ }
+
+ @Test
+ public void testMaxRunningAppsHierarchicalQueues() throws Exception {
+ Configuration conf = createConfiguration();
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+ MockClock clock = new MockClock();
+ scheduler.setClock(clock);
+
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<allocations>");
+ out.println("<queue name=\"queue1\">");
+ out.println(" <maxRunningApps>3</maxRunningApps>");
+ out.println(" <queue name=\"sub1\"></queue>");
+ out.println(" <queue name=\"sub2\"></queue>");
+ out.println(" <queue name=\"sub3\">");
+ out.println(" <maxRunningApps>1</maxRunningApps>");
+ out.println(" </queue>");
+ out.println("</queue>");
+ out.println("</allocations>");
+ out.close();
+
+ QueueManager queueManager = scheduler.getQueueManager();
+ queueManager.initialize();
+
+ // exceeds no limits
+ ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1.sub1", "user1");
+ verifyAppRunnable(attId1, true);
+ verifyQueueNumRunnable("queue1.sub1", 1, 0);
+ clock.tick(10);
+ // exceeds no limits
+ ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1.sub3", "user1");
+ verifyAppRunnable(attId2, true);
+ verifyQueueNumRunnable("queue1.sub3", 1, 0);
+ clock.tick(10);
+ // exceeds no limits
+ ApplicationAttemptId attId3 = createSchedulingRequest(1024, "queue1.sub2", "user1");
+ verifyAppRunnable(attId3, true);
+ verifyQueueNumRunnable("queue1.sub2", 1, 0);
+ clock.tick(10);
+ // exceeds queue1 limit
+ ApplicationAttemptId attId4 = createSchedulingRequest(1024, "queue1.sub2", "user1");
+ verifyAppRunnable(attId4, false);
+ verifyQueueNumRunnable("queue1.sub2", 1, 1);
+ clock.tick(10);
+ // exceeds sub3 limit
+ ApplicationAttemptId attId5 = createSchedulingRequest(1024, "queue1.sub3", "user1");
+ verifyAppRunnable(attId5, false);
+ verifyQueueNumRunnable("queue1.sub3", 1, 1);
+ clock.tick(10);
+
+ // Even though the app was removed from sub3, the app from sub2 gets to go
+ // because it came in first
+ AppRemovedSchedulerEvent appRemovedEvent1 = new AppRemovedSchedulerEvent(
+ attId2, RMAppAttemptState.FINISHED);
+ scheduler.handle(appRemovedEvent1);
+ verifyAppRunnable(attId4, true);
+ verifyQueueNumRunnable("queue1.sub2", 2, 0);
+ verifyAppRunnable(attId5, false);
+ verifyQueueNumRunnable("queue1.sub3", 0, 1);
+
+ // Now test removal of a non-runnable app
+ AppRemovedSchedulerEvent appRemovedEvent2 = new AppRemovedSchedulerEvent(
+ attId5, RMAppAttemptState.KILLED);
+ scheduler.handle(appRemovedEvent2);
+ assertEquals(0, scheduler.maxRunningEnforcer.usersNonRunnableApps
+ .get("user1").size());
+ // verify app gone in queue accounting
+ verifyQueueNumRunnable("queue1.sub3", 0, 0);
+ // verify it doesn't become runnable when there would be space for it
+ AppRemovedSchedulerEvent appRemovedEvent3 = new AppRemovedSchedulerEvent(
+ attId4, RMAppAttemptState.FINISHED);
+ scheduler.handle(appRemovedEvent3);
+ verifyQueueNumRunnable("queue1.sub2", 1, 0);
+ verifyQueueNumRunnable("queue1.sub3", 0, 0);
}
@Test (timeout = 10000)
@@ -2499,23 +2650,23 @@ public class TestFairScheduler {
// Should get put into jerry
createSchedulingRequest(1024, "jerry", "someuser");
- assertEquals(1, jerryQueue.getAppSchedulables().size());
+ assertEquals(1, jerryQueue.getRunnableAppSchedulables().size());
// Should get forced into default
createSchedulingRequest(1024, "newqueue", "someuser");
- assertEquals(1, jerryQueue.getAppSchedulables().size());
- assertEquals(1, defaultQueue.getAppSchedulables().size());
+ assertEquals(1, jerryQueue.getRunnableAppSchedulables().size());
+ assertEquals(1, defaultQueue.getRunnableAppSchedulables().size());
// Would get put into someuser because of user-as-default-queue, but should
// be forced into default
createSchedulingRequest(1024, "default", "someuser");
- assertEquals(1, jerryQueue.getAppSchedulables().size());
- assertEquals(2, defaultQueue.getAppSchedulables().size());
+ assertEquals(1, jerryQueue.getRunnableAppSchedulables().size());
+ assertEquals(2, defaultQueue.getRunnableAppSchedulables().size());
// Should get put into jerry because of user-as-default-queue
createSchedulingRequest(1024, "default", "jerry");
- assertEquals(2, jerryQueue.getAppSchedulables().size());
- assertEquals(2, defaultQueue.getAppSchedulables().size());
+ assertEquals(2, jerryQueue.getRunnableAppSchedulables().size());
+ assertEquals(2, defaultQueue.getRunnableAppSchedulables().size());
}
@SuppressWarnings("resource")
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java?rev=1546626&r1=1546625&r2=1546626&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/TestFifoScheduler.java Fri Nov 29 19:14:36 2013
@@ -56,6 +56,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.Task;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
@@ -518,7 +519,7 @@ public class TestFifoScheduler {
public void testConcurrentAccessOnApplications() throws Exception {
FifoScheduler fs = new FifoScheduler();
TestCapacityScheduler.verifyConcurrentAccessOnApplications(
- fs.applications, FiCaSchedulerApp.class);
+ fs.applications, FiCaSchedulerApp.class, Queue.class);
}
@SuppressWarnings("resource")