You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by to...@apache.org on 2012/11/30 13:05:14 UTC
svn commit: r1415593 [1/2] - in
/hadoop/common/branches/branch-2/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/
hadoop-yarn/hadoop-ya...
Author: tomwhite
Date: Fri Nov 30 12:05:13 2012
New Revision: 1415593
URL: http://svn.apache.org/viewvc?rev=1415593&view=rev
Log:
Merge -r 1415591:1415592 from trunk to branch-2. Fixes: YARN-187. Add hierarchical queues to the fair scheduler. Contributed by Sandy Ryza.
Added:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
- copied unchanged from r1415592, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
- copied unchanged from r1415592, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
- copied unchanged from r1415592, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
Removed:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueSchedulable.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSQueueSchedulable.java
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1415593&r1=1415592&r2=1415593&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Fri Nov 30 12:05:13 2012
@@ -100,6 +100,8 @@ Release 2.0.3-alpha - Unreleased
YARN-229. Remove old unused RM recovery code. (Bikas Saha via acmurthy)
+ YARN-187. Add hierarchical queues to the fair scheduler. (Sandy Ryza via tomwhite)
+
Release 2.0.2-alpha - 2012-09-07
YARN-9. Rename YARN_HOME to HADOOP_YARN_HOME. (vinodkv via acmurthy)
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java?rev=1415593&r1=1415592&r2=1415593&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AppSchedulable.java Fri Nov 30 12:05:13 2012
@@ -50,10 +50,10 @@ public class AppSchedulable extends Sche
private long startTime;
private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
private static final Log LOG = LogFactory.getLog(AppSchedulable.class);
- private FSQueue queue;
+ private FSLeafQueue queue;
private RMContainerTokenSecretManager containerTokenSecretManager;
- public AppSchedulable(FairScheduler scheduler, FSSchedulerApp app, FSQueue queue) {
+ public AppSchedulable(FairScheduler scheduler, FSSchedulerApp app, FSLeafQueue queue) {
this.scheduler = scheduler;
this.app = app;
this.startTime = System.currentTimeMillis();
@@ -97,9 +97,6 @@ public class AppSchedulable extends Sche
}
@Override
- public void redistributeShare() {}
-
- @Override
public Resource getResourceUsage() {
return app.getCurrentConsumption();
}
@@ -114,7 +111,7 @@ public class AppSchedulable extends Sche
* Get metrics reference from containing queue.
*/
public QueueMetrics getMetrics() {
- return queue.getQueueSchedulable().getMetrics();
+ return queue.getMetrics();
}
@Override
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java?rev=1415593&r1=1415592&r2=1415593&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java Fri Nov 30 12:05:13 2012
@@ -20,65 +20,112 @@ package org.apache.hadoop.yarn.server.re
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
-
-/**
- * A queue containing several applications.
- */
-@Private
-@Unstable
-public class FSQueue {
- /** Queue name. */
- private String name;
-
- /** Applications in this specific queue; does not include children queues' jobs. */
- private Collection<FSSchedulerApp> applications =
- new ArrayList<FSSchedulerApp>();
-
- /** Scheduling mode for jobs inside the queue (fair or FIFO) */
- private SchedulingMode schedulingMode;
-
- private FairScheduler scheduler;
-
- private FSQueueSchedulable queueSchedulable;
-
- public FSQueue(FairScheduler scheduler, String name) {
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.QueueInfo;
+import org.apache.hadoop.yarn.api.records.QueueState;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
+
+public abstract class FSQueue extends Schedulable implements Queue {
+ private final String name;
+ private final QueueManager queueMgr;
+ private final FairScheduler scheduler;
+ private final QueueMetrics metrics;
+
+ protected final FSParentQueue parent;
+ protected final RecordFactory recordFactory =
+ RecordFactoryProvider.getRecordFactory(null);
+
+ public FSQueue(String name, QueueManager queueMgr,
+ FairScheduler scheduler, FSParentQueue parent) {
this.name = name;
- this.queueSchedulable = new FSQueueSchedulable(scheduler, this);
+ this.queueMgr = queueMgr;
this.scheduler = scheduler;
+ this.metrics = QueueMetrics.forQueue(getName(), parent, true, scheduler.getConf());
+ this.parent = parent;
}
-
- public Collection<FSSchedulerApp> getApplications() {
- return applications;
- }
-
- public void addApp(FSSchedulerApp app) {
- applications.add(app);
- AppSchedulable appSchedulable = new AppSchedulable(scheduler, app, this);
- app.setAppSchedulable(appSchedulable);
- queueSchedulable.addApp(appSchedulable);
- }
-
- public void removeApp(FSSchedulerApp app) {
- applications.remove(app);
- queueSchedulable.removeApp(app);
- }
-
+
public String getName() {
return name;
}
-
- public SchedulingMode getSchedulingMode() {
- return schedulingMode;
- }
-
- public void setSchedulingMode(SchedulingMode schedulingMode) {
- this.schedulingMode = schedulingMode;
- }
-
- public FSQueueSchedulable getQueueSchedulable() {
- return queueSchedulable;
+
+ @Override
+ public String getQueueName() {
+ return name;
}
+
+ @Override
+ public double getWeight() {
+ return queueMgr.getQueueWeight(getName());
+ }
+
+ @Override
+ public Resource getMinShare() {
+ return queueMgr.getMinResources(getName());
+ }
+
+ @Override
+ public long getStartTime() {
+ return 0;
+ }
+
+ @Override
+ public Priority getPriority() {
+ Priority p = recordFactory.newRecordInstance(Priority.class);
+ p.setPriority(1);
+ return p;
+ }
+
+ @Override
+ public QueueInfo getQueueInfo(boolean includeChildQueues, boolean recursive) {
+ QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
+ queueInfo.setQueueName(getQueueName());
+ // TODO: we might change these queue metrics around a little bit
+ // to match the semantics of the fair scheduler.
+ queueInfo.setCapacity((float) getFairShare().getMemory() /
+ scheduler.getClusterCapacity().getMemory());
+ queueInfo.setCapacity((float) getResourceUsage().getMemory() /
+ scheduler.getClusterCapacity().getMemory());
+
+ ArrayList<QueueInfo> childQueueInfos = new ArrayList<QueueInfo>();
+ if (includeChildQueues) {
+ Collection<FSQueue> childQueues = getChildQueues();
+ for (FSQueue child : childQueues) {
+ childQueueInfos.add(child.getQueueInfo(recursive, recursive));
+ }
+ }
+ queueInfo.setChildQueues(childQueueInfos);
+ queueInfo.setQueueState(QueueState.RUNNING);
+ return queueInfo;
+ }
+
+ @Override
+ public Map<QueueACL, AccessControlList> getQueueAcls() {
+ Map<QueueACL, AccessControlList> acls = queueMgr.getQueueAcls(getName());
+ return new HashMap<QueueACL, AccessControlList>(acls);
+ }
+
+ @Override
+ public QueueMetrics getMetrics() {
+ return metrics;
+ }
+
+ /**
+ * Recomputes the fair shares for all queues and applications
+ * under this queue.
+ */
+ public abstract void recomputeFairShares();
+
+ /**
+ * Gets the children of this queue, if any.
+ */
+ public abstract Collection<FSQueue> getChildQueues();
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1415593&r1=1415592&r2=1415593&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Fri Nov 30 12:05:13 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -75,6 +76,25 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+/**
+ * A scheduler that schedules resources between a set of queues. The scheduler
+ * keeps track of the resources used by each queue, and attempts to maintain
+ * fairness by scheduling tasks at queues whose allocations are farthest below
+ * an ideal fair distribution.
+ *
+ * The fair scheduler supports hierarchical queues. All queues descend from a
+ * queue named "root". Available resources are distributed among the children
+ * of the root queue in the typical fair scheduling fashion. Then, the children
+ * distribute the resources assigned to them to their children in the same
+ * fashion. Applications may only be scheduled on leaf queues. Queues can be
+ * specified as children of other queues by placing them as sub-elements of their
+ * parents in the fair scheduler configuration file.
+ *
+ * A queue's name starts with the names of its parents, with periods as
+ * separators. So a queue named "queue1" directly under the root queue would be
+ * referred to as "root.queue1", and a queue named "queue2" under a queue
+ * named "parent1" would be referred to as "root.parent1.queue2".
+ */
@LimitedPrivate("yarn")
@Unstable
@SuppressWarnings("unchecked")
@@ -105,23 +125,22 @@ public class FairScheduler implements Re
// Aggregate metrics
QueueMetrics rootMetrics;
- //Time when we last updated preemption vars
+ // Time when we last updated preemption vars
protected long lastPreemptionUpdateTime;
- //Time we last ran preemptTasksIfNecessary
+ // Time we last ran preemptTasksIfNecessary
private long lastPreemptCheckTime;
-
// This stores per-application scheduling information, indexed by
// attempt ID's for fast lookup.
- protected Map<ApplicationAttemptId, FSSchedulerApp> applications
- = new HashMap<ApplicationAttemptId, FSSchedulerApp>();
+ protected Map<ApplicationAttemptId, FSSchedulerApp> applications =
+ new HashMap<ApplicationAttemptId, FSSchedulerApp>();
// Nodes in the cluster, indexed by NodeId
- private Map<NodeId, FSSchedulerNode> nodes =
+ private Map<NodeId, FSSchedulerNode> nodes =
new ConcurrentHashMap<NodeId, FSSchedulerNode>();
// Aggregate capacity of the cluster
- private Resource clusterCapacity =
+ private Resource clusterCapacity =
RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
// How often tasks are preempted (must be longer than a couple
@@ -131,10 +150,11 @@ public class FairScheduler implements Re
protected boolean preemptionEnabled;
protected boolean sizeBasedWeight; // Give larger weights to larger jobs
protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
- protected double nodeLocalityThreshold; // Cluster threshold for node locality
- protected double rackLocalityThreshold; // Cluster threshold for rack locality
- private FairSchedulerEventLog eventLog; // Machine-readable event log
- protected boolean assignMultiple; // Allocate multiple containers per heartbeat
+ protected double nodeLocalityThreshold; // Cluster threshold for node locality
+ protected double rackLocalityThreshold; // Cluster threshold for rack locality
+ private FairSchedulerEventLog eventLog; // Machine-readable event log
+ protected boolean assignMultiple; // Allocate multiple containers per
+ // heartbeat
protected int maxAssign; // Max containers to assign per heartbeat
public FairScheduler() {
@@ -150,16 +170,8 @@ public class FairScheduler implements Re
return queueMgr;
}
- public List<FSQueueSchedulable> getQueueSchedulables() {
- List<FSQueueSchedulable> scheds = new ArrayList<FSQueueSchedulable>();
- for (FSQueue queue: queueMgr.getQueues()) {
- scheds.add(queue.getQueueSchedulable());
- }
- return scheds;
- }
-
private RMContainer getRMContainer(ContainerId containerId) {
- FSSchedulerApp application =
+ FSSchedulerApp application =
applications.get(containerId.getApplicationAttemptId());
return (application == null) ? null : application.getRMContainer(containerId);
}
@@ -183,34 +195,24 @@ public class FairScheduler implements Re
}
/**
- * Recompute the internal variables used by the scheduler - per-job weights,
- * fair shares, deficits, minimum slot allocations, and amount of used and
- * required resources per job.
- */
+ * Recompute the internal variables used by the scheduler - per-job weights,
+ * fair shares, deficits, minimum slot allocations, and amount of used and
+ * required resources per job.
+ */
protected synchronized void update() {
queueMgr.reloadAllocsIfNecessary(); // Reload alloc file
updateRunnability(); // Set job runnability based on user/queue limits
updatePreemptionVariables(); // Determine if any queues merit preemption
- // Update demands of apps and queues
- for (FSQueue queue: queueMgr.getQueues()) {
- queue.getQueueSchedulable().updateDemand();
- }
+ FSQueue rootQueue = queueMgr.getRootQueue();
- // Compute fair shares based on updated demands
- List<FSQueueSchedulable> queueScheds = getQueueSchedulables();
- SchedulingAlgorithms.computeFairShares(
- queueScheds, clusterCapacity);
+ // Recursively update demands for all queues
+ rootQueue.updateDemand();
- // Update queue metrics for this queue
- for (FSQueueSchedulable sched : queueScheds) {
- sched.getMetrics().setAvailableResourcesToQueue(sched.getFairShare());
- }
-
- // Use the computed shares to assign shares within each queue
- for (FSQueue queue: queueMgr.getQueues()) {
- queue.getQueueSchedulable().redistributeShare();
- }
+ rootQueue.setFairShare(clusterCapacity);
+ // Recursively compute fair shares for all queues
+ // and update metrics
+ rootQueue.recomputeFairShares();
// Update recorded capacity of root queue (child queues are updated
// when fair share is calculated).
@@ -225,7 +227,7 @@ public class FairScheduler implements Re
private void updatePreemptionVariables() {
long now = clock.getTime();
lastPreemptionUpdateTime = now;
- for (FSQueueSchedulable sched: getQueueSchedulables()) {
+ for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
if (!isStarvedForMinShare(sched)) {
sched.setLastTimeAtMinShare(now);
}
@@ -238,16 +240,16 @@ public class FairScheduler implements Re
/**
* Is a queue below its min share for the given task type?
*/
- boolean isStarvedForMinShare(FSQueueSchedulable sched) {
+ boolean isStarvedForMinShare(FSLeafQueue sched) {
Resource desiredShare = Resources.min(sched.getMinShare(), sched.getDemand());
return Resources.lessThan(sched.getResourceUsage(), desiredShare);
}
/**
- * Is a queue being starved for fair share for the given task type?
- * This is defined as being below half its fair share.
+ * Is a queue being starved for fair share for the given task type? This is
+ * defined as being below half its fair share.
*/
- boolean isStarvedForFairShare(FSQueueSchedulable sched) {
+ boolean isStarvedForFairShare(FSLeafQueue sched) {
Resource desiredFairShare = Resources.max(
Resources.multiply(sched.getFairShare(), .5), sched.getDemand());
return Resources.lessThan(sched.getResourceUsage(), desiredFairShare);
@@ -255,10 +257,10 @@ public class FairScheduler implements Re
/**
* Check for queues that need tasks preempted, either because they have been
- * below their guaranteed share for minSharePreemptionTimeout or they
- * have been below half their fair share for the fairSharePreemptionTimeout.
- * If such queues exist, compute how many tasks of each type need to be
- * preempted and then select the right ones using preemptTasks.
+ * below their guaranteed share for minSharePreemptionTimeout or they have
+ * been below half their fair share for the fairSharePreemptionTimeout. If
+ * such queues exist, compute how many tasks of each type need to be preempted
+ * and then select the right ones using preemptTasks.
*/
protected synchronized void preemptTasksIfNecessary() {
if (!preemptionEnabled) {
@@ -273,35 +275,37 @@ public class FairScheduler implements Re
Resource resToPreempt = Resources.none();
- for (FSQueueSchedulable sched: getQueueSchedulables()) {
+ for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime));
}
if (Resources.greaterThan(resToPreempt, Resources.none())) {
- preemptResources(getQueueSchedulables(), resToPreempt);
+ preemptResources(queueMgr.getLeafQueues(), resToPreempt);
}
}
/**
- * Preempt a quantity of resources from a list of QueueSchedulables.
- * The policy for this is to pick apps from queues that are over their fair
- * share, but make sure that no queue is placed below its fair share in the
- * process. We further prioritize preemption by choosing containers with
- * lowest priority to preempt.
+ * Preempt a quantity of resources from a collection of leaf queues. The
+ * policy for this is to pick apps from queues that are over their fair share,
+ * but make sure that no queue is placed below its fair share in the process.
+ * We further prioritize preemption by choosing containers with lowest
+ * priority to preempt.
*/
- protected void preemptResources(List<FSQueueSchedulable> scheds, Resource toPreempt) {
+ protected void preemptResources(Collection<FSLeafQueue> scheds,
+ Resource toPreempt) {
if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none())) {
return;
}
Map<RMContainer, FSSchedulerApp> apps =
new HashMap<RMContainer, FSSchedulerApp>();
- Map<RMContainer, FSQueueSchedulable> queues = new HashMap<RMContainer, FSQueueSchedulable>();
+ Map<RMContainer, FSLeafQueue> queues =
+ new HashMap<RMContainer, FSLeafQueue>();
// Collect running containers from over-scheduled queues
List<RMContainer> runningContainers = new ArrayList<RMContainer>();
- for (FSQueueSchedulable sched: scheds) {
+ for (FSLeafQueue sched : scheds) {
if (Resources.greaterThan(sched.getResourceUsage(), sched.getFairShare())) {
- for (AppSchedulable as: sched.getAppSchedulables()) {
+ for (AppSchedulable as : sched.getAppSchedulables()) {
for (RMContainer c : as.getApp().getLiveContainers()) {
runningContainers.add(c);
apps.put(c, as.getApp());
@@ -321,12 +325,12 @@ public class FairScheduler implements Re
// Scan down the sorted list of task statuses until we've killed enough
// tasks, making sure we don't kill too many from any queue
- for (RMContainer container: runningContainers) {
- FSQueueSchedulable sched = queues.get(container);
+ for (RMContainer container : runningContainers) {
+ FSLeafQueue sched = queues.get(container);
if (Resources.greaterThan(sched.getResourceUsage(), sched.getFairShare())) {
LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
"res=" + container.getContainer().getResource() +
- ") from queue " + sched.getQueue().getName());
+ ") from queue " + sched.getName());
ContainerStatus status = SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
@@ -348,12 +352,12 @@ public class FairScheduler implements Re
* If the queue has been below its min share for at least its preemption
* timeout, it should preempt the difference between its current share and
* this min share. If it has been below half its fair share for at least the
- * fairSharePreemptionTimeout, it should preempt enough tasks to get up to
- * its full fair share. If both conditions hold, we preempt the max of the
- * two amounts (this shouldn't happen unless someone sets the timeouts to
- * be identical for some reason).
+ * fairSharePreemptionTimeout, it should preempt enough tasks to get up to its
+ * full fair share. If both conditions hold, we preempt the max of the two
+ * amounts (this shouldn't happen unless someone sets the timeouts to be
+ * identical for some reason).
*/
- protected Resource resToPreempt(FSQueueSchedulable sched, long curTime) {
+ protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
String queue = sched.getName();
long minShareTimeout = queueMgr.getMinSharePreemptionTimeout(queue);
long fairShareTimeout = queueMgr.getFairSharePreemptionTimeout();
@@ -362,7 +366,7 @@ public class FairScheduler implements Re
if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
Resource target = Resources.min(sched.getMinShare(), sched.getDemand());
resDueToMinShare = Resources.max(Resources.none(),
- Resources.subtract(target, sched.getResourceUsage()));
+ Resources.subtract(target, sched.getResourceUsage()));
}
if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
Resource target = Resources.min(sched.getFairShare(), sched.getDemand());
@@ -380,15 +384,15 @@ public class FairScheduler implements Re
}
/**
- * This updates the runnability of all apps based on whether or not
- * any users/queues have exceeded their capacity.
+ * This updates the runnability of all apps based on whether or not any
+ * users/queues have exceeded their capacity.
*/
private void updateRunnability() {
List<AppSchedulable> apps = new ArrayList<AppSchedulable>();
// Start by marking everything as not runnable
- for (FSQueue p: queueMgr.getQueues()) {
- for (AppSchedulable a: p.getQueueSchedulable().getAppSchedulables()) {
+ for (FSLeafQueue leafQueue : queueMgr.getLeafQueues()) {
+ for (AppSchedulable a : leafQueue.getAppSchedulables()) {
a.setRunnable(false);
apps.add(a);
}
@@ -400,7 +404,7 @@ public class FairScheduler implements Re
Map<String, Integer> userApps = new HashMap<String, Integer>();
Map<String, Integer> queueApps = new HashMap<String, Integer>();
- for (AppSchedulable app: apps) {
+ for (AppSchedulable app : apps) {
String user = app.getApp().getUser();
String queue = app.getApp().getQueueName();
int userCount = userApps.containsKey(user) ? userApps.get(user) : 0;
@@ -473,22 +477,25 @@ public class FairScheduler implements Re
}
/**
- * Add a new application to the scheduler, with a given id, queue name,
- * and user. This will accept a new app even if the user or queue is above
+ * Add a new application to the scheduler, with a given id, queue name, and
+ * user. This will accept a new app even if the user or queue is above
* configured limits, but the app will not be marked as runnable.
*/
- protected synchronized void
- addApplication(ApplicationAttemptId applicationAttemptId,
- String queueName, String user) {
+ protected synchronized void addApplication(
+ ApplicationAttemptId applicationAttemptId, String queueName, String user) {
- FSQueue queue = queueMgr.getQueue(queueName);
+ FSLeafQueue queue = queueMgr.getLeafQueue(queueName);
+ if (queue == null) {
+ // queue is not an existing or creatable leaf queue
+ queue = queueMgr.getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
+ }
FSSchedulerApp schedulerApp =
new FSSchedulerApp(applicationAttemptId, user,
- queue.getQueueSchedulable(), new ActiveUsersManager(getRootQueueMetrics()),
+ queue, new ActiveUsersManager(getRootQueueMetrics()),
rmContext);
-
- // Inforce ACLs
+
+ // Enforce ACLs
UserGroupInformation userUgi;
try {
userUgi = UserGroupInformation.getCurrentUser();
@@ -497,8 +504,8 @@ public class FairScheduler implements Re
return;
}
- List<QueueUserACLInfo> info = queue.getQueueSchedulable().getQueueUserAclInfo(
- userUgi); // Always a signleton list
+ // Always a singleton list
+ List<QueueUserACLInfo> info = queue.getQueueUserAclInfo(userUgi);
if (!info.get(0).getUserAcls().contains(QueueACL.SUBMIT_APPLICATIONS)) {
LOG.info("User " + userUgi.getUserName() +
" cannot submit" + " applications to queue " + queue.getName());
@@ -506,14 +513,13 @@ public class FairScheduler implements Re
}
queue.addApp(schedulerApp);
- queue.getQueueSchedulable().getMetrics().submitApp(user,
- applicationAttemptId.getAttemptId());
+ queue.getMetrics().submitApp(user, applicationAttemptId.getAttemptId());
rootMetrics.submitApp(user, applicationAttemptId.getAttemptId());
applications.put(applicationAttemptId, schedulerApp);
LOG.info("Application Submission: " + applicationAttemptId +
- ", user: " + user +
+ ", user: "+ user +
", currently active: " + applications.size());
rmContext.getDispatcher().getEventHandler().handle(
@@ -540,10 +546,10 @@ public class FairScheduler implements Re
SchedulerUtils.createAbnormalContainerStatus(
rmContainer.getContainerId(),
SchedulerUtils.COMPLETED_APPLICATION),
- RMContainerEventType.KILL);
+ RMContainerEventType.KILL);
}
- // Release all reserved containers
+ // Release all reserved containers
for (RMContainer rmContainer : application.getReservedContainers()) {
completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(
@@ -556,7 +562,8 @@ public class FairScheduler implements Re
application.stop(rmAppAttemptFinalState);
// Inform the queue
- FSQueue queue = queueMgr.getQueue(application.getQueue().getQueueName());
+ FSLeafQueue queue = queueMgr.getLeafQueue(application.getQueue()
+ .getQueueName());
queue.removeApp(application);
// Remove from our data-structure
@@ -658,11 +665,11 @@ public class FairScheduler implements Re
for (ContainerId releasedContainerId : release) {
RMContainer rmContainer = getRMContainer(releasedContainerId);
if (rmContainer == null) {
- RMAuditLogger.logFailure(application.getUser(),
- AuditConstants.RELEASE_CONTAINER,
- "Unauthorized access or invalid container", "FairScheduler",
- "Trying to release container not owned by app or with invalid id",
- application.getApplicationId(), releasedContainerId);
+ RMAuditLogger.logFailure(application.getUser(),
+ AuditConstants.RELEASE_CONTAINER,
+ "Unauthorized access or invalid container", "FairScheduler",
+ "Trying to release container not owned by app or with invalid id",
+ application.getApplicationId(), releasedContainerId);
}
completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(
@@ -675,8 +682,8 @@ public class FairScheduler implements Re
if (!ask.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("allocate: pre-update" +
- " applicationAttemptId=" + appAttemptId +
- " application=" + application.getApplicationId());
+ " applicationAttemptId=" + appAttemptId +
+ " application=" + application.getApplicationId());
}
application.showRequests();
@@ -689,19 +696,17 @@ public class FairScheduler implements Re
if (LOG.isDebugEnabled()) {
LOG.debug("allocate:" +
- " applicationAttemptId=" + appAttemptId +
- " #ask=" + ask.size());
+ " applicationAttemptId=" + appAttemptId +
+ " #ask=" + ask.size());
}
- return new Allocation(
- application.pullNewlyAllocatedContainers(),
+ return new Allocation(application.pullNewlyAllocatedContainers(),
application.getHeadroom());
}
}
/**
- * Process a container which has launched on a node, as reported by the
- * node.
+ * Process a container which has launched on a node, as reported by the node.
*/
private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
// Get the application for the finished container
@@ -757,20 +762,20 @@ public class FairScheduler implements Re
LOG.info("Trying to fulfill reservation for application " +
reservedApplication.getApplicationId() + " on node: " + nm);
- FSQueue queue = queueMgr.getQueue(reservedApplication.getQueueName());
- queue.getQueueSchedulable().assignContainer(node, true);
+ FSLeafQueue queue = queueMgr.getLeafQueue(reservedApplication.getQueueName());
+ queue.assignContainer(node, true);
}
-
// Otherwise, schedule at queue which is furthest below fair share
else {
int assignedContainers = 0;
while (true) {
// At most one task is scheduled each iteration of this loop
- List<FSQueueSchedulable> scheds = getQueueSchedulables();
+ List<FSLeafQueue> scheds = new ArrayList<FSLeafQueue>(
+ queueMgr.getLeafQueues());
Collections.sort(scheds, new SchedulingAlgorithms.FairShareComparator());
boolean assignedContainer = false;
- for (FSQueueSchedulable sched : scheds) {
+ for (FSLeafQueue sched : scheds) {
Resource assigned = sched.assignContainer(node, false);
if (Resources.greaterThan(assigned, Resources.none())) {
eventLog.log("ASSIGN", nm.getHostName(), assigned);
@@ -813,7 +818,7 @@ public class FairScheduler implements Re
@Override
public void handle(SchedulerEvent event) {
- switch(event.getType()) {
+ switch (event.getType()) {
case NODE_ADDED:
if (!(event instanceof NodeAddedSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
@@ -832,8 +837,7 @@ public class FairScheduler implements Re
if (!(event instanceof NodeUpdateSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
- NodeUpdateSchedulerEvent nodeUpdatedEvent =
- (NodeUpdateSchedulerEvent)event;
+ NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
nodeUpdate(nodeUpdatedEvent.getRMNode(),
nodeUpdatedEvent.getNewlyLaunchedContainers(),
nodeUpdatedEvent.getCompletedContainers());
@@ -842,7 +846,7 @@ public class FairScheduler implements Re
if (!(event instanceof AppAddedSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
- AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
+ AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent)event;
String queue = appAddedEvent.getQueue();
// Potentially set queue to username if configured to do so
@@ -867,7 +871,7 @@ public class FairScheduler implements Re
throw new RuntimeException("Unexpected event type: " + event);
}
ContainerExpiredSchedulerEvent containerExpiredEvent =
- (ContainerExpiredSchedulerEvent) event;
+ (ContainerExpiredSchedulerEvent)event;
ContainerId containerId = containerExpiredEvent.getContainerId();
completedContainer(getRMContainer(containerId),
SchedulerUtils.createAbnormalContainerStatus(
@@ -886,8 +890,8 @@ public class FairScheduler implements Re
}
@Override
- public synchronized void
- reinitialize(Configuration conf, RMContext rmContext) throws IOException {
+ public synchronized void reinitialize(Configuration conf, RMContext rmContext)
+ throws IOException {
if (!initialized) {
this.conf = new FairSchedulerConfiguration(conf);
rootMetrics = QueueMetrics.forQueue("root", null, true, conf);
@@ -909,11 +913,10 @@ public class FairScheduler implements Re
try {
queueMgr.initialize();
- }
- catch (Exception e) {
+ } catch (Exception e) {
throw new IOException("Failed to start FairScheduler", e);
}
-
+
Thread updateThread = new Thread(new UpdateThread());
updateThread.setName("FairSchedulerUpdateThread");
updateThread.setDaemon(true);
@@ -925,10 +928,9 @@ public class FairScheduler implements Re
rackLocalityThreshold = this.conf.getLocalityThresholdRack();
preemptionEnabled = this.conf.getPreemptionEnabled();
try {
- queueMgr.reloadAllocs();
+ queueMgr.reloadAllocs();
- }
- catch (Exception e) {
+ } catch (Exception e) {
throw new IOException("Failed to initialize FairScheduler", e);
}
}
@@ -940,8 +942,8 @@ public class FairScheduler implements Re
if (!queueMgr.exists(queueName)) {
return null;
}
- return queueMgr.getQueue(queueName).getQueueSchedulable().getQueueInfo(
- includeChildQueues, recursive);
+ return queueMgr.getQueue(queueName).getQueueInfo(includeChildQueues,
+ recursive);
}
@Override
@@ -953,12 +955,7 @@ public class FairScheduler implements Re
return new ArrayList<QueueUserACLInfo>();
}
- List<QueueUserACLInfo> userAcls = new ArrayList<QueueUserACLInfo>();
-
- for (FSQueue queue : queueMgr.getQueues()) {
- userAcls.addAll(queue.getQueueSchedulable().getQueueUserAclInfo(user));
- }
- return userAcls;
+ return queueMgr.getRootQueue().getQueueUserAclInfo(user);
}
@Override
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1415593&r1=1415592&r2=1415593&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Fri Nov 30 12:05:13 2012
@@ -27,6 +27,7 @@ import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
@@ -52,6 +53,7 @@ import org.xml.sax.SAXException;
/**
* Maintains a list of queues as well as scheduling parameters for each queue,
* such as guaranteed share allocations, from the fair scheduler config file.
+ *
*/
@Private
@Unstable
@@ -59,6 +61,8 @@ public class QueueManager {
public static final Log LOG = LogFactory.getLog(
QueueManager.class.getName());
+ public static final String ROOT_QUEUE = "root";
+
/** Time to wait between checks of the allocation file */
public static final long ALLOC_RELOAD_INTERVAL = 10 * 1000;
@@ -76,7 +80,10 @@ public class QueueManager {
// used) or a String to specify an absolute path (if
// mapred.fairscheduler.allocation.file is used).
- private Map<String, FSQueue> queues = new HashMap<String, FSQueue>();
+ private final Collection<FSLeafQueue> leafQueues =
+ new CopyOnWriteArrayList<FSLeafQueue>();
+ private final Map<String, FSQueue> queues = new HashMap<String, FSQueue>();
+ private FSParentQueue rootQueue;
private volatile QueueManagerInfo info = new QueueManagerInfo();
@@ -87,10 +94,17 @@ public class QueueManager {
public QueueManager(FairScheduler scheduler) {
this.scheduler = scheduler;
}
+
+ public FSParentQueue getRootQueue() {
+ return rootQueue;
+ }
public void initialize() throws IOException, SAXException,
AllocationConfigurationException, ParserConfigurationException {
FairSchedulerConfiguration conf = scheduler.getConf();
+ rootQueue = new FSParentQueue("root", this, scheduler, null);
+ queues.put(rootQueue.getName(), rootQueue);
+
this.allocFile = conf.getAllocationFile();
if (allocFile == null) {
// No allocation file specified in jobconf. Use the default allocation
@@ -106,21 +120,106 @@ public class QueueManager {
lastSuccessfulReload = scheduler.getClock().getTime();
lastReloadAttempt = scheduler.getClock().getTime();
// Create the default queue
- getQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
+ getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
}
-
+
/**
- * Get a queue by name, creating it if necessary
- */
- public FSQueue getQueue(String name) {
+ * Get a queue by name, creating it if necessary. If the queue
+ * is not or can not be a leaf queue, i.e. it already exists as a parent queue,
+ * or one of the parents in its name is already a leaf queue, null is returned.
+ *
+ * The root part of the name is optional, so a queue underneath the root
+ * named "queue1" could be referred to as just "queue1", and a queue named
+ * "queue2" underneath a parent named "parent1" that is underneath the root
+ * could be referred to as just "parent1.queue2".
+ */
+ public FSLeafQueue getLeafQueue(String name) {
+ if (!name.startsWith(ROOT_QUEUE + ".")) {
+ name = ROOT_QUEUE + "." + name;
+ }
synchronized (queues) {
FSQueue queue = queues.get(name);
if (queue == null) {
- queue = new FSQueue(scheduler, name);
- queue.setSchedulingMode(info.defaultSchedulingMode);
- queues.put(name, queue);
+ FSLeafQueue leafQueue = createLeafQueue(name);
+ if (leafQueue == null) {
+ return null;
+ }
+ leafQueue.setSchedulingMode(info.defaultSchedulingMode);
+ queue = leafQueue;
+ } else if (queue instanceof FSParentQueue) {
+ return null;
+ }
+ return (FSLeafQueue)queue;
+ }
+ }
+
+ /**
+ * Creates a leaf queue and places it in the tree. Creates any
+ * parents that don't already exist.
+ * @param name dotted queue name; the visible caller (getLeafQueue) passes it already prefixed with "root."
+ * @return
+ * the created queue, if successful. null if not allowed (one of the parent
+ * queues in the queue name is already a leaf queue)
+ */
+ private FSLeafQueue createLeafQueue(String name) {
+ List<String> newQueueNames = new ArrayList<String>();
+ newQueueNames.add(name); // index 0 is always the leaf's own name
+ int sepIndex = name.length(); // begin the backwards search for '.' at the end of the name
+ FSParentQueue parent = null;
+
+ // Move up the queue tree until we reach one that exists.
+ while (sepIndex != -1) {
+ sepIndex = name.lastIndexOf('.', sepIndex-1); // next separator to the left
+ FSQueue queue;
+ String curName = null;
+ curName = name.substring(0, sepIndex); // NOTE(review): throws StringIndexOutOfBoundsException when sepIndex is -1, i.e. no ancestor was found in queues
+ queue = queues.get(curName);
+
+ if (queue == null) {
+ newQueueNames.add(curName); // missing ancestor: remember it so it can be created below
+ } else {
+ if (queue instanceof FSParentQueue) {
+ parent = (FSParentQueue)queue;
+ break;
+ } else {
+ return null; // an ancestor in the name already exists and is not a parent queue
+ }
}
- return queue;
+ }
+
+ // At this point, parent refers to the deepest existing parent of the
+ // queue to create.
+ // Now that we know everything worked out, make all the queues
+ // and add them to the map.
+ FSLeafQueue leafQueue = null;
+ for (int i = newQueueNames.size()-1; i >= 0; i--) { // walk from the shallowest missing ancestor down to the leaf
+ String queueName = newQueueNames.get(i);
+ if (i == 0) {
+ // First name added was the leaf queue
+ leafQueue = new FSLeafQueue(name, this, scheduler, parent);
+ parent.addChildQueue(leafQueue);
+ queues.put(leafQueue.getName(), leafQueue);
+ leafQueues.add(leafQueue); // leaf queues are additionally tracked in their own collection
+ } else {
+ FSParentQueue newParent = new FSParentQueue(queueName, this, scheduler, parent);
+ parent.addChildQueue(newParent);
+ queues.put(newParent.getName(), newParent);
+ parent = newParent; // the next, deeper queue hangs off the one just created
+ }
+ }
+
+ return leafQueue;
+ }
+
+ /**
+ * Gets a queue by name.
+ */
+ public FSQueue getQueue(String name) {
+ if (!name.startsWith(ROOT_QUEUE + ".") && !name.equals(ROOT_QUEUE)) {
+ name = ROOT_QUEUE + "." + name;
+ }
+ synchronized (queues) {
+ return queues.get(name);
}
}
@@ -136,8 +235,8 @@ public class QueueManager {
/**
* Get the queue for a given AppSchedulable.
*/
- public FSQueue getQueueForApp(AppSchedulable app) {
- return getQueue(app.getApp().getQueueName());
+ public FSLeafQueue getQueueForApp(AppSchedulable app) {
+ return getLeafQueue(app.getApp().getQueueName());
}
/**
@@ -237,54 +336,9 @@ public class QueueManager {
Element element = (Element)node;
if ("queue".equals(element.getTagName()) ||
"pool".equals(element.getTagName())) {
- String queueName = element.getAttribute("name");
- Map<QueueACL, AccessControlList> acls =
- new HashMap<QueueACL, AccessControlList>();
- queueNamesInAllocFile.add(queueName);
- NodeList fields = element.getChildNodes();
- for (int j = 0; j < fields.getLength(); j++) {
- Node fieldNode = fields.item(j);
- if (!(fieldNode instanceof Element))
- continue;
- Element field = (Element) fieldNode;
- if ("minResources".equals(field.getTagName())) {
- String text = ((Text)field.getFirstChild()).getData().trim();
- int val = Integer.parseInt(text);
- minQueueResources.put(queueName, Resources.createResource(val));
- } else if ("maxResources".equals(field.getTagName())) {
- String text = ((Text)field.getFirstChild()).getData().trim();
- int val = Integer.parseInt(text);
- maxQueueResources.put(queueName, Resources.createResource(val));
- } else if ("maxRunningApps".equals(field.getTagName())) {
- String text = ((Text)field.getFirstChild()).getData().trim();
- int val = Integer.parseInt(text);
- queueMaxApps.put(queueName, val);
- } else if ("weight".equals(field.getTagName())) {
- String text = ((Text)field.getFirstChild()).getData().trim();
- double val = Double.parseDouble(text);
- queueWeights.put(queueName, val);
- } else if ("minSharePreemptionTimeout".equals(field.getTagName())) {
- String text = ((Text)field.getFirstChild()).getData().trim();
- long val = Long.parseLong(text) * 1000L;
- minSharePreemptionTimeouts.put(queueName, val);
- } else if ("schedulingMode".equals(field.getTagName())) {
- String text = ((Text)field.getFirstChild()).getData().trim();
- queueModes.put(queueName, parseSchedulingMode(text));
- } else if ("aclSubmitApps".equals(field.getTagName())) {
- String text = ((Text)field.getFirstChild()).getData().trim();
- acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
- } else if ("aclAdministerApps".equals(field.getTagName())) {
- String text = ((Text)field.getFirstChild()).getData().trim();
- acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
- }
- }
- queueAcls.put(queueName, acls);
- if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName)
- && Resources.lessThan(maxQueueResources.get(queueName),
- minQueueResources.get(queueName))) {
- LOG.warn(String.format("Queue %s has max resources %d less than min resources %d",
- queueName, maxQueueResources.get(queueName), minQueueResources.get(queueName)));
- }
+ loadQueue("root", element, minQueueResources, maxQueueResources, queueMaxApps,
+ userMaxApps, queueWeights, queueModes, minSharePreemptionTimeouts,
+ queueAcls, queueNamesInAllocFile);
} else if ("user".equals(element.getTagName())) {
String userName = element.getAttribute("name");
NodeList fields = element.getChildNodes();
@@ -331,7 +385,7 @@ public class QueueManager {
queueMaxAppsDefault, defaultSchedulingMode, minSharePreemptionTimeouts,
queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout);
for (String name: queueNamesInAllocFile) {
- FSQueue queue = getQueue(name);
+ FSLeafQueue queue = getLeafQueue(name);
if (queueModes.containsKey(name)) {
queue.setSchedulingMode(queueModes.get(name));
} else {
@@ -340,6 +394,75 @@ public class QueueManager {
}
}
}
+
+ /**
+ * Loads a queue, and recursively any nested queue/pool elements, from a queue element in the configuration file
+ */
+ private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources,
+ Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps,
+ Map<String, Integer> userMaxApps, Map<String, Double> queueWeights, // userMaxApps is only forwarded to nested calls
+ Map<String, SchedulingMode> queueModes, Map<String, Long> minSharePreemptionTimeouts,
+ Map<String, Map<QueueACL, AccessControlList>> queueAcls, List<String> queueNamesInAllocFile)
+ throws AllocationConfigurationException {
+ String queueName = parentName + "." + element.getAttribute("name"); // every setting below is keyed by this dotted name
+ Map<QueueACL, AccessControlList> acls =
+ new HashMap<QueueACL, AccessControlList>();
+ NodeList fields = element.getChildNodes();
+ boolean isLeaf = true; // cleared as soon as a nested queue/pool element is found
+
+ for (int j = 0; j < fields.getLength(); j++) {
+ Node fieldNode = fields.item(j);
+ if (!(fieldNode instanceof Element))
+ continue;
+ Element field = (Element) fieldNode;
+ if ("minResources".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ int val = Integer.parseInt(text);
+ minQueueResources.put(queueName, Resources.createResource(val));
+ } else if ("maxResources".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ int val = Integer.parseInt(text);
+ maxQueueResources.put(queueName, Resources.createResource(val));
+ } else if ("maxRunningApps".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ int val = Integer.parseInt(text);
+ queueMaxApps.put(queueName, val);
+ } else if ("weight".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ double val = Double.parseDouble(text);
+ queueWeights.put(queueName, val);
+ } else if ("minSharePreemptionTimeout".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ long val = Long.parseLong(text) * 1000L;
+ minSharePreemptionTimeouts.put(queueName, val);
+ } else if ("schedulingMode".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ queueModes.put(queueName, parseSchedulingMode(text));
+ } else if ("aclSubmitApps".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
+ } else if ("aclAdministerApps".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
+ } else if ("queue".equals(field.getTagName()) || // exact match; endsWith also accepted suffixes such as "ue"
+ "pool".equals(field.getTagName())) {
+ loadQueue(queueName, field, minQueueResources, maxQueueResources, queueMaxApps,
+ userMaxApps, queueWeights, queueModes, minSharePreemptionTimeouts,
+ queueAcls, queueNamesInAllocFile);
+ isLeaf = false;
+ }
+ }
+ if (isLeaf) {
+ queueNamesInAllocFile.add(queueName); // only queues without nested queue/pool elements are recorded here
+ }
+ queueAcls.put(queueName, acls);
+ if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName)
+ && Resources.lessThan(maxQueueResources.get(queueName),
+ minQueueResources.get(queueName))) {
+ LOG.warn(String.format("Queue %s has max resources %s less than min resources %s", // %s: the values are Resource objects, which %d rejects
+ queueName, maxQueueResources.get(queueName), minQueueResources.get(queueName)));
+ }
+ }
private SchedulingMode parseSchedulingMode(String text)
throws AllocationConfigurationException {
@@ -384,9 +507,9 @@ public class QueueManager {
/**
* Get a collection of all queues
*/
- public Collection<FSQueue> getQueues() {
+ public Collection<FSLeafQueue> getLeafQueues() {
synchronized (queues) {
- return new ArrayList<FSQueue>(queues.values());
+ return leafQueues;
}
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java?rev=1415593&r1=1415592&r2=1415593&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java Fri Nov 30 12:05:13 2012
@@ -92,12 +92,6 @@ abstract class Schedulable {
public abstract void updateDemand();
/**
- * Distribute the fair share assigned to this Schedulable among its
- * children (used in queues where the internal scheduler is fair sharing).
- */
- public abstract void redistributeShare();
-
- /**
* Assign a container on this node if possible, and return the amount of
* resources assigned. If {@code reserved} is true, it means a reservation
* already exists on this node, and the schedulable should fulfill that
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java?rev=1415593&r1=1415592&r2=1415593&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java Fri Nov 30 12:05:13 2012
@@ -23,7 +23,7 @@ import java.util.Collection;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
public class FairSchedulerInfo {
@@ -32,9 +32,9 @@ public class FairSchedulerInfo {
public FairSchedulerInfo(FairScheduler fs) {
scheduler = fs;
- Collection<FSQueue> queues = fs.getQueueManager().getQueues();
+ Collection<FSLeafQueue> queues = fs.getQueueManager().getLeafQueues();
queueInfos = new ArrayList<FairSchedulerQueueInfo>();
- for (FSQueue queue : queues) {
+ for (FSLeafQueue queue : queues) {
queueInfos.add(new FairSchedulerQueueInfo(queue, fs));
}
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java?rev=1415593&r1=1415592&r2=1415593&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java Fri Nov 30 12:05:13 2012
@@ -22,9 +22,8 @@ import java.util.Collection;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueueSchedulable;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AppSchedulable;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueueManager;
@@ -49,17 +48,16 @@ public class FairSchedulerQueueInfo {
private String queueName;
- public FairSchedulerQueueInfo(FSQueue queue, FairScheduler scheduler) {
- Collection<FSSchedulerApp> apps = queue.getApplications();
- for (FSSchedulerApp app : apps) {
- if (app.isPending()) {
+ public FairSchedulerQueueInfo(FSLeafQueue queue, FairScheduler scheduler) {
+ Collection<AppSchedulable> apps = queue.getAppSchedulables();
+ for (AppSchedulable app : apps) {
+ if (app.getApp().isPending()) {
numPendingApps++;
} else {
numActiveApps++;
}
}
- FSQueueSchedulable schedulable = queue.getQueueSchedulable();
QueueManager manager = scheduler.getQueueManager();
queueName = queue.getName();
@@ -67,11 +65,11 @@ public class FairSchedulerQueueInfo {
Resource clusterMax = scheduler.getClusterCapacity();
clusterMaxMem = clusterMax.getMemory();
- usedResources = schedulable.getResourceUsage();
+ usedResources = queue.getResourceUsage();
fractionUsed = (float)usedResources.getMemory() / clusterMaxMem;
- fairShare = schedulable.getFairShare().getMemory();
- minResources = schedulable.getMinShare();
+ fairShare = queue.getFairShare().getMemory();
+ minResources = queue.getMinShare();
minShare = minResources.getMemory();
maxResources = scheduler.getQueueManager().getMaxResources(queueName);
if (maxResources.getMemory() > clusterMaxMem) {
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java?rev=1415593&r1=1415592&r2=1415593&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FakeSchedulable.java Fri Nov 30 12:05:13 2012
@@ -108,8 +108,5 @@ public class FakeSchedulable extends Sch
}
@Override
- public void redistributeShare() {}
-
- @Override
public void updateDemand() {}
}