You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by su...@apache.org on 2012/11/30 20:58:44 UTC
svn commit: r1415815 [3/5] - in
/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project: ./
hadoop-yarn/dev-support/
hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/
hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-a...
Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1415815&r1=1415814&r2=1415815&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Fri Nov 30 19:58:09 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -50,7 +51,7 @@ import org.apache.hadoop.yarn.factory.pr
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
@@ -75,6 +76,25 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+/**
+ * A scheduler that schedules resources between a set of queues. The scheduler
+ * keeps track of the resources used by each queue, and attempts to maintain
+ * fairness by scheduling tasks at queues whose allocations are farthest below
+ * an ideal fair distribution.
+ *
+ * The fair scheduler supports hierarchical queues. All queues descend from a
+ * queue named "root". Available resources are distributed among the children
+ * of the root queue in the typical fair scheduling fashion. Then, the children
+ * distribute the resources assigned to them to their children in the same
+ * fashion. Applications may only be scheduled on leaf queues. Queues can be
+ * specified as children of other queues by placing them as sub-elements of their
+ * parents in the fair scheduler configuration file.
+ *
+ * A queue's name starts with the names of its parents, with periods as
+ * separators. So a queue named "queue1" under the root queue would be
+ * referred to as "root.queue1", and a queue named "queue2" under a queue
+ * named "parent1" would be referred to as "root.parent1.queue2".
+ */
@LimitedPrivate("yarn")
@Unstable
@SuppressWarnings("unchecked")
@@ -94,7 +114,7 @@ public class FairScheduler implements Re
protected long UPDATE_INTERVAL = 500;
// Whether to use username in place of "default" queue name
- private boolean userAsDefaultQueue = false;
+ private volatile boolean userAsDefaultQueue = false;
private final static List<Container> EMPTY_CONTAINER_LIST =
new ArrayList<Container>();
@@ -105,23 +125,22 @@ public class FairScheduler implements Re
// Aggregate metrics
QueueMetrics rootMetrics;
- //Time when we last updated preemption vars
+ // Time when we last updated preemption vars
protected long lastPreemptionUpdateTime;
- //Time we last ran preemptTasksIfNecessary
+ // Time we last ran preemptTasksIfNecessary
private long lastPreemptCheckTime;
-
// This stores per-application scheduling information, indexed by
// attempt ID's for fast lookup.
- protected Map<ApplicationAttemptId, FSSchedulerApp> applications
- = new HashMap<ApplicationAttemptId, FSSchedulerApp>();
+ protected Map<ApplicationAttemptId, FSSchedulerApp> applications =
+ new HashMap<ApplicationAttemptId, FSSchedulerApp>();
// Nodes in the cluster, indexed by NodeId
- private Map<NodeId, FSSchedulerNode> nodes =
+ private Map<NodeId, FSSchedulerNode> nodes =
new ConcurrentHashMap<NodeId, FSSchedulerNode>();
// Aggregate capacity of the cluster
- private Resource clusterCapacity =
+ private Resource clusterCapacity =
RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
// How often tasks are preempted (must be longer than a couple
@@ -131,31 +150,28 @@ public class FairScheduler implements Re
protected boolean preemptionEnabled;
protected boolean sizeBasedWeight; // Give larger weights to larger jobs
protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
- protected double nodeLocalityThreshold; // Cluster threshold for node locality
- protected double rackLocalityThreshold; // Cluster threshold for rack locality
- private FairSchedulerEventLog eventLog; // Machine-readable event log
- protected boolean assignMultiple; // Allocate multiple containers per heartbeat
+ protected double nodeLocalityThreshold; // Cluster threshold for node locality
+ protected double rackLocalityThreshold; // Cluster threshold for rack locality
+ private FairSchedulerEventLog eventLog; // Machine-readable event log
+ protected boolean assignMultiple; // Allocate multiple containers per
+ // heartbeat
protected int maxAssign; // Max containers to assign per heartbeat
-
+
+ public FairScheduler() {
+ clock = new SystemClock();
+ queueMgr = new QueueManager(this);
+ }
public FairSchedulerConfiguration getConf() {
- return this.conf;
+ return conf;
}
public QueueManager getQueueManager() {
- return this.queueMgr;
- }
-
- public List<FSQueueSchedulable> getQueueSchedulables() {
- List<FSQueueSchedulable> scheds = new ArrayList<FSQueueSchedulable>();
- for (FSQueue queue: queueMgr.getQueues()) {
- scheds.add(queue.getQueueSchedulable());
- }
- return scheds;
+ return queueMgr;
}
private RMContainer getRMContainer(ContainerId containerId) {
- FSSchedulerApp application =
+ FSSchedulerApp application =
applications.get(containerId.getApplicationAttemptId());
return (application == null) ? null : application.getRMContainer(containerId);
}
@@ -166,7 +182,7 @@ public class FairScheduler implements Re
*/
private class UpdateThread implements Runnable {
public void run() {
- while (initialized) {
+ while (true) {
try {
Thread.sleep(UPDATE_INTERVAL);
update();
@@ -179,40 +195,28 @@ public class FairScheduler implements Re
}
/**
- * Recompute the internal variables used by the scheduler - per-job weights,
- * fair shares, deficits, minimum slot allocations, and amount of used and
- * required resources per job.
- */
- protected void update() {
- synchronized (this) {
- queueMgr.reloadAllocsIfNecessary(); // Relaod alloc file
- updateRunnability(); // Set job runnability based on user/queue limits
- updatePreemptionVariables(); // Determine if any queues merit preemption
-
- // Update demands of apps and queues
- for (FSQueue queue: queueMgr.getQueues()) {
- queue.getQueueSchedulable().updateDemand();
- }
-
- // Compute fair shares based on updated demands
- List<FSQueueSchedulable> queueScheds = this.getQueueSchedulables();
- SchedulingAlgorithms.computeFairShares(
- queueScheds, clusterCapacity);
-
- // Update queue metrics for this queue
- for (FSQueueSchedulable sched : queueScheds) {
- sched.getMetrics().setAvailableResourcesToQueue(sched.getFairShare());
- }
-
- // Use the computed shares to assign shares within each queue
- for (FSQueue queue: queueMgr.getQueues()) {
- queue.getQueueSchedulable().redistributeShare();
- }
-
- // Update recorded capacity of root queue (child queues are updated
- // when fair share is calculated).
- rootMetrics.setAvailableResourcesToQueue(clusterCapacity);
- }
+ * Recompute the internal variables used by the scheduler - per-job weights,
+ * fair shares, deficits, minimum slot allocations, and amount of used and
+ * required resources per job.
+ */
+ protected synchronized void update() {
+ queueMgr.reloadAllocsIfNecessary(); // Reload alloc file
+ updateRunnability(); // Set job runnability based on user/queue limits
+ updatePreemptionVariables(); // Determine if any queues merit preemption
+
+ FSQueue rootQueue = queueMgr.getRootQueue();
+
+ // Recursively update demands for all queues
+ rootQueue.updateDemand();
+
+ rootQueue.setFairShare(clusterCapacity);
+ // Recursively compute fair shares for all queues
+ // and update metrics
+ rootQueue.recomputeFairShares();
+
+ // Update recorded capacity of root queue (child queues are updated
+ // when fair share is calculated).
+ rootMetrics.setAvailableResourcesToQueue(clusterCapacity);
}
/**
@@ -223,7 +227,7 @@ public class FairScheduler implements Re
private void updatePreemptionVariables() {
long now = clock.getTime();
lastPreemptionUpdateTime = now;
- for (FSQueueSchedulable sched: getQueueSchedulables()) {
+ for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
if (!isStarvedForMinShare(sched)) {
sched.setLastTimeAtMinShare(now);
}
@@ -236,16 +240,16 @@ public class FairScheduler implements Re
/**
* Is a queue below its min share for the given task type?
*/
- boolean isStarvedForMinShare(FSQueueSchedulable sched) {
+ boolean isStarvedForMinShare(FSLeafQueue sched) {
Resource desiredShare = Resources.min(sched.getMinShare(), sched.getDemand());
return Resources.lessThan(sched.getResourceUsage(), desiredShare);
}
/**
- * Is a queue being starved for fair share for the given task type?
- * This is defined as being below half its fair share.
+ * Is a queue being starved for fair share for the given task type? This is
+ * defined as being below half its fair share.
*/
- boolean isStarvedForFairShare(FSQueueSchedulable sched) {
+ boolean isStarvedForFairShare(FSLeafQueue sched) {
Resource desiredFairShare = Resources.max(
Resources.multiply(sched.getFairShare(), .5), sched.getDemand());
return Resources.lessThan(sched.getResourceUsage(), desiredFairShare);
@@ -253,53 +257,55 @@ public class FairScheduler implements Re
/**
* Check for queues that need tasks preempted, either because they have been
- * below their guaranteed share for minSharePreemptionTimeout or they
- * have been below half their fair share for the fairSharePreemptionTimeout.
- * If such queues exist, compute how many tasks of each type need to be
- * preempted and then select the right ones using preemptTasks.
- *
- * This method computes and logs the number of tasks we want to preempt even
- * if preemption is disabled, for debugging purposes.
+ * below their guaranteed share for minSharePreemptionTimeout or they have
+ * been below half their fair share for the fairSharePreemptionTimeout. If
+ * such queues exist, compute how many tasks of each type need to be preempted
+ * and then select the right ones using preemptTasks.
*/
- protected void preemptTasksIfNecessary() {
- if (!preemptionEnabled)
+ protected synchronized void preemptTasksIfNecessary() {
+ if (!preemptionEnabled) {
return;
+ }
long curTime = clock.getTime();
- if (curTime - lastPreemptCheckTime < preemptionInterval)
+ if (curTime - lastPreemptCheckTime < preemptionInterval) {
return;
+ }
lastPreemptCheckTime = curTime;
Resource resToPreempt = Resources.none();
- for (FSQueueSchedulable sched: getQueueSchedulables()) {
+ for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime));
}
if (Resources.greaterThan(resToPreempt, Resources.none())) {
- preemptResources(getQueueSchedulables(), resToPreempt);
+ preemptResources(queueMgr.getLeafQueues(), resToPreempt);
}
}
/**
- * Preempt a quantity of resources from a list of QueueSchedulables.
- * The policy for this is to pick apps from queues that are over their fair
- * share, but make sure that no queue is placed below its fair share in the
- * process. We further prioritize preemption by choosing containers with
- * lowest priority to preempt.
+ * Preempt a quantity of resources from a list of QueueSchedulables. The
+ * policy for this is to pick apps from queues that are over their fair share,
+ * but make sure that no queue is placed below its fair share in the process.
+ * We further prioritize preemption by choosing containers with lowest
+ * priority to preempt.
*/
- protected void preemptResources(List<FSQueueSchedulable> scheds, Resource toPreempt) {
- if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none()))
+ protected void preemptResources(Collection<FSLeafQueue> scheds,
+ Resource toPreempt) {
+ if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none())) {
return;
+ }
Map<RMContainer, FSSchedulerApp> apps =
new HashMap<RMContainer, FSSchedulerApp>();
- Map<RMContainer, FSQueueSchedulable> queues = new HashMap<RMContainer, FSQueueSchedulable>();
+ Map<RMContainer, FSLeafQueue> queues =
+ new HashMap<RMContainer, FSLeafQueue>();
// Collect running containers from over-scheduled queues
List<RMContainer> runningContainers = new ArrayList<RMContainer>();
- for (FSQueueSchedulable sched: scheds) {
+ for (FSLeafQueue sched : scheds) {
if (Resources.greaterThan(sched.getResourceUsage(), sched.getFairShare())) {
- for (AppSchedulable as: sched.getAppSchedulables()) {
+ for (AppSchedulable as : sched.getAppSchedulables()) {
for (RMContainer c : as.getApp().getLiveContainers()) {
runningContainers.add(c);
apps.put(c, as.getApp());
@@ -319,18 +325,18 @@ public class FairScheduler implements Re
// Scan down the sorted list of task statuses until we've killed enough
// tasks, making sure we don't kill too many from any queue
- for (RMContainer container: runningContainers) {
- FSQueueSchedulable sched = queues.get(container);
+ for (RMContainer container : runningContainers) {
+ FSLeafQueue sched = queues.get(container);
if (Resources.greaterThan(sched.getResourceUsage(), sched.getFairShare())) {
LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
"res=" + container.getContainer().getResource() +
- ") from queue " + sched.getQueue().getName());
+ ") from queue " + sched.getName());
ContainerStatus status = SchedulerUtils.createAbnormalContainerStatus(
container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
// TODO: Not sure if this ever actually adds this to the list of cleanup
// containers on the RMNode (see SchedulerNode.releaseContainer()).
- this.completedContainer(container, status, RMContainerEventType.KILL);
+ completedContainer(container, status, RMContainerEventType.KILL);
toPreempt = Resources.subtract(toPreempt,
container.getContainer().getResource());
@@ -346,12 +352,12 @@ public class FairScheduler implements Re
* If the queue has been below its min share for at least its preemption
* timeout, it should preempt the difference between its current share and
* this min share. If it has been below half its fair share for at least the
- * fairSharePreemptionTimeout, it should preempt enough tasks to get up to
- * its full fair share. If both conditions hold, we preempt the max of the
- * two amounts (this shouldn't happen unless someone sets the timeouts to
- * be identical for some reason).
+ * fairSharePreemptionTimeout, it should preempt enough tasks to get up to its
+ * full fair share. If both conditions hold, we preempt the max of the two
+ * amounts (this shouldn't happen unless someone sets the timeouts to be
+ * identical for some reason).
*/
- protected Resource resToPreempt(FSQueueSchedulable sched, long curTime) {
+ protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
String queue = sched.getName();
long minShareTimeout = queueMgr.getMinSharePreemptionTimeout(queue);
long fairShareTimeout = queueMgr.getFairSharePreemptionTimeout();
@@ -360,7 +366,7 @@ public class FairScheduler implements Re
if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
Resource target = Resources.min(sched.getMinShare(), sched.getDemand());
resDueToMinShare = Resources.max(Resources.none(),
- Resources.subtract(target, sched.getResourceUsage()));
+ Resources.subtract(target, sched.getResourceUsage()));
}
if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
Resource target = Resources.min(sched.getFairShare(), sched.getDemand());
@@ -378,15 +384,15 @@ public class FairScheduler implements Re
}
/**
- * This updates the runnability of all apps based on whether or not
- * any users/queues have exceeded their capacity.
+ * This updates the runnability of all apps based on whether or not any
+ * users/queues have exceeded their capacity.
*/
private void updateRunnability() {
List<AppSchedulable> apps = new ArrayList<AppSchedulable>();
// Start by marking everything as not runnable
- for (FSQueue p: queueMgr.getQueues()) {
- for (AppSchedulable a: p.getQueueSchedulable().getAppSchedulables()) {
+ for (FSLeafQueue leafQueue : queueMgr.getLeafQueues()) {
+ for (AppSchedulable a : leafQueue.getAppSchedulables()) {
a.setRunnable(false);
apps.add(a);
}
@@ -398,7 +404,7 @@ public class FairScheduler implements Re
Map<String, Integer> userApps = new HashMap<String, Integer>();
Map<String, Integer> queueApps = new HashMap<String, Integer>();
- for (AppSchedulable app: apps) {
+ for (AppSchedulable app : apps) {
String user = app.getApp().getUser();
String queue = app.getApp().getQueueName();
int userCount = userApps.containsKey(user) ? userApps.get(user) : 0;
@@ -413,10 +419,11 @@ public class FairScheduler implements Re
}
public RMContainerTokenSecretManager getContainerTokenSecretManager() {
- return this.rmContext.getContainerTokenSecretManager();
+ return rmContext.getContainerTokenSecretManager();
}
- public double getAppWeight(AppSchedulable app) {
+ // synchronized for sizeBasedWeight
+ public synchronized double getAppWeight(AppSchedulable app) {
if (!app.getRunnable()) {
// Job won't launch tasks, but don't return 0 to avoid division errors
return 1.0;
@@ -437,28 +444,28 @@ public class FairScheduler implements Re
@Override
public Resource getMinimumResourceCapability() {
- return this.minimumAllocation;
+ return minimumAllocation;
}
@Override
public Resource getMaximumResourceCapability() {
- return this.maximumAllocation;
+ return maximumAllocation;
}
public double getNodeLocalityThreshold() {
- return this.nodeLocalityThreshold;
+ return nodeLocalityThreshold;
}
public double getRackLocalityThreshold() {
- return this.rackLocalityThreshold;
+ return rackLocalityThreshold;
}
public Resource getClusterCapacity() {
- return this.clusterCapacity;
+ return clusterCapacity;
}
public Clock getClock() {
- return this.clock;
+ return clock;
}
protected void setClock(Clock clock) {
@@ -470,22 +477,25 @@ public class FairScheduler implements Re
}
/**
- * Add a new application to the scheduler, with a given id, queue name,
- * and user. This will accept a new app even if the user or queue is above
+ * Add a new application to the scheduler, with a given id, queue name, and
+ * user. This will accept a new app even if the user or queue is above
* configured limits, but the app will not be marked as runnable.
*/
- protected synchronized void
- addApplication(ApplicationAttemptId applicationAttemptId,
- String queueName, String user) {
+ protected synchronized void addApplication(
+ ApplicationAttemptId applicationAttemptId, String queueName, String user) {
- FSQueue queue = this.queueMgr.getQueue(queueName);
+ FSLeafQueue queue = queueMgr.getLeafQueue(queueName);
+ if (queue == null) {
+ // queue is not an existing or creatable leaf queue
+ queue = queueMgr.getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
+ }
FSSchedulerApp schedulerApp =
new FSSchedulerApp(applicationAttemptId, user,
- queue.getQueueSchedulable(), new ActiveUsersManager(this.getRootQueueMetrics()),
- rmContext, null);
-
- // Inforce ACLs
+ queue, new ActiveUsersManager(getRootQueueMetrics()),
+ rmContext);
+
+ // Enforce ACLs
UserGroupInformation userUgi;
try {
userUgi = UserGroupInformation.getCurrentUser();
@@ -494,8 +504,8 @@ public class FairScheduler implements Re
return;
}
- List<QueueUserACLInfo> info = queue.getQueueSchedulable().getQueueUserAclInfo(
- userUgi); // Always a signleton list
+ // Always a singleton list
+ List<QueueUserACLInfo> info = queue.getQueueUserAclInfo(userUgi);
if (!info.get(0).getUserAcls().contains(QueueACL.SUBMIT_APPLICATIONS)) {
LOG.info("User " + userUgi.getUserName() +
" cannot submit" + " applications to queue " + queue.getName());
@@ -503,14 +513,13 @@ public class FairScheduler implements Re
}
queue.addApp(schedulerApp);
- queue.getQueueSchedulable().getMetrics().submitApp(user,
- applicationAttemptId.getAttemptId());
+ queue.getMetrics().submitApp(user, applicationAttemptId.getAttemptId());
rootMetrics.submitApp(user, applicationAttemptId.getAttemptId());
applications.put(applicationAttemptId, schedulerApp);
LOG.info("Application Submission: " + applicationAttemptId +
- ", user: " + user +
+ ", user: "+ user +
", currently active: " + applications.size());
rmContext.getDispatcher().getEventHandler().handle(
@@ -537,10 +546,10 @@ public class FairScheduler implements Re
SchedulerUtils.createAbnormalContainerStatus(
rmContainer.getContainerId(),
SchedulerUtils.COMPLETED_APPLICATION),
- RMContainerEventType.KILL);
+ RMContainerEventType.KILL);
}
- // Release all reserved containers
+ // Release all reserved containers
for (RMContainer rmContainer : application.getReservedContainers()) {
completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(
@@ -553,8 +562,9 @@ public class FairScheduler implements Re
application.stop(rmAppAttemptFinalState);
// Inform the queue
- FSQueue queue = this.queueMgr.getQueue(application.getQueue().getQueueName());
- queue.removeJob(application);
+ FSLeafQueue queue = queueMgr.getLeafQueue(application.getQueue()
+ .getQueueName());
+ queue.removeApp(application);
// Remove from our data-structure
applications.remove(applicationAttemptId);
@@ -600,7 +610,7 @@ public class FairScheduler implements Re
}
private synchronized void addNode(RMNode node) {
- this.nodes.put(node.getNodeID(), new FSSchedulerNode(node));
+ nodes.put(node.getNodeID(), new FSSchedulerNode(node));
Resources.addTo(clusterCapacity, node.getTotalCapability());
LOG.info("Added node " + node.getNodeAddress() +
@@ -608,7 +618,7 @@ public class FairScheduler implements Re
}
private synchronized void removeNode(RMNode rmNode) {
- FSSchedulerNode node = this.nodes.get(rmNode.getNodeID());
+ FSSchedulerNode node = nodes.get(rmNode.getNodeID());
Resources.subtractFrom(clusterCapacity, rmNode.getTotalCapability());
// Remove running containers
@@ -631,7 +641,7 @@ public class FairScheduler implements Re
RMContainerEventType.KILL);
}
- this.nodes.remove(rmNode.getNodeID());
+ nodes.remove(rmNode.getNodeID());
LOG.info("Removed node " + rmNode.getNodeAddress() +
" cluster capacity: " + clusterCapacity);
}
@@ -655,11 +665,11 @@ public class FairScheduler implements Re
for (ContainerId releasedContainerId : release) {
RMContainer rmContainer = getRMContainer(releasedContainerId);
if (rmContainer == null) {
- RMAuditLogger.logFailure(application.getUser(),
- AuditConstants.RELEASE_CONTAINER,
- "Unauthorized access or invalid container", "FairScheduler",
- "Trying to release container not owned by app or with invalid id",
- application.getApplicationId(), releasedContainerId);
+ RMAuditLogger.logFailure(application.getUser(),
+ AuditConstants.RELEASE_CONTAINER,
+ "Unauthorized access or invalid container", "FairScheduler",
+ "Trying to release container not owned by app or with invalid id",
+ application.getApplicationId(), releasedContainerId);
}
completedContainer(rmContainer,
SchedulerUtils.createAbnormalContainerStatus(
@@ -669,13 +679,11 @@ public class FairScheduler implements Re
}
synchronized (application) {
-
if (!ask.isEmpty()) {
-
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("allocate: pre-update" +
- " applicationAttemptId=" + appAttemptId +
- " application=" + application.getApplicationId());
+ " applicationAttemptId=" + appAttemptId +
+ " application=" + application.getApplicationId());
}
application.showRequests();
@@ -686,21 +694,19 @@ public class FairScheduler implements Re
application.showRequests();
}
- if(LOG.isDebugEnabled()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug("allocate:" +
- " applicationAttemptId=" + appAttemptId +
- " #ask=" + ask.size());
+ " applicationAttemptId=" + appAttemptId +
+ " #ask=" + ask.size());
}
- return new Allocation(
- application.pullNewlyAllocatedContainers(),
+ return new Allocation(application.pullNewlyAllocatedContainers(),
application.getHeadroom());
}
}
/**
- * Process a container which has launched on a node, as reported by the
- * node.
+ * Process a container which has launched on a node, as reported by the node.
*/
private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
// Get the application for the finished container
@@ -722,7 +728,9 @@ public class FairScheduler implements Re
private synchronized void nodeUpdate(RMNode nm,
List<ContainerStatus> newlyLaunchedContainers,
List<ContainerStatus> completedContainers) {
- LOG.info("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity);
+ }
eventLog.log("HEARTBEAT", nm.getHostName());
FSSchedulerNode node = nodes.get(nm.getNodeID());
@@ -754,20 +762,20 @@ public class FairScheduler implements Re
LOG.info("Trying to fulfill reservation for application " +
reservedApplication.getApplicationId() + " on node: " + nm);
- FSQueue queue = queueMgr.getQueue(reservedApplication.getQueueName());
- queue.getQueueSchedulable().assignContainer(node, true);
+ FSLeafQueue queue = queueMgr.getLeafQueue(reservedApplication.getQueueName());
+ queue.assignContainer(node, true);
}
-
// Otherwise, schedule at queue which is furthest below fair share
else {
int assignedContainers = 0;
while (true) {
// At most one task is scheduled each iteration of this loop
- List<FSQueueSchedulable> scheds = this.getQueueSchedulables();
+ List<FSLeafQueue> scheds = new ArrayList<FSLeafQueue>(
+ queueMgr.getLeafQueues());
Collections.sort(scheds, new SchedulingAlgorithms.FairShareComparator());
boolean assignedContainer = false;
- for (FSQueueSchedulable sched : scheds) {
+ for (FSLeafQueue sched : scheds) {
Resource assigned = sched.assignContainer(node, false);
if (Resources.greaterThan(assigned, Resources.none())) {
eventLog.log("ASSIGN", nm.getHostName(), assigned);
@@ -796,11 +804,11 @@ public class FairScheduler implements Re
@Override
public SchedulerAppReport getSchedulerAppInfo(
ApplicationAttemptId appAttemptId) {
- if (!this.applications.containsKey(appAttemptId)) {
+ if (!applications.containsKey(appAttemptId)) {
LOG.error("Request for appInfo of unknown attempt" + appAttemptId);
return null;
}
- return new SchedulerAppReport(this.applications.get(appAttemptId));
+ return new SchedulerAppReport(applications.get(appAttemptId));
}
@Override
@@ -810,43 +818,35 @@ public class FairScheduler implements Re
@Override
public void handle(SchedulerEvent event) {
- switch(event.getType()) {
+ switch (event.getType()) {
case NODE_ADDED:
- {
if (!(event instanceof NodeAddedSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
addNode(nodeAddedEvent.getAddedRMNode());
- }
- break;
+ break;
case NODE_REMOVED:
- {
if (!(event instanceof NodeRemovedSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event;
removeNode(nodeRemovedEvent.getRemovedRMNode());
- }
- break;
+ break;
case NODE_UPDATE:
- {
if (!(event instanceof NodeUpdateSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
- NodeUpdateSchedulerEvent nodeUpdatedEvent =
- (NodeUpdateSchedulerEvent)event;
- this.nodeUpdate(nodeUpdatedEvent.getRMNode(),
+ NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
+ nodeUpdate(nodeUpdatedEvent.getRMNode(),
nodeUpdatedEvent.getNewlyLaunchedContainers(),
nodeUpdatedEvent.getCompletedContainers());
- }
- break;
+ break;
case APP_ADDED:
- {
if (!(event instanceof AppAddedSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
- AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
+ AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent)event;
String queue = appAddedEvent.getQueue();
// Potentially set queue to username if configured to do so
@@ -857,33 +857,28 @@ public class FairScheduler implements Re
addApplication(appAddedEvent.getApplicationAttemptId(), queue,
appAddedEvent.getUser());
- }
- break;
+ break;
case APP_REMOVED:
- {
if (!(event instanceof AppRemovedSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
- this.removeApplication(appRemovedEvent.getApplicationAttemptID(),
+ removeApplication(appRemovedEvent.getApplicationAttemptID(),
appRemovedEvent.getFinalAttemptState());
- }
- break;
+ break;
case CONTAINER_EXPIRED:
- {
if (!(event instanceof ContainerExpiredSchedulerEvent)) {
throw new RuntimeException("Unexpected event type: " + event);
}
ContainerExpiredSchedulerEvent containerExpiredEvent =
- (ContainerExpiredSchedulerEvent) event;
+ (ContainerExpiredSchedulerEvent)event;
ContainerId containerId = containerExpiredEvent.getContainerId();
completedContainer(getRMContainer(containerId),
SchedulerUtils.createAbnormalContainerStatus(
containerId,
SchedulerUtils.EXPIRED_CONTAINER),
RMContainerEventType.EXPIRE);
- }
- break;
+ break;
default:
LOG.error("Unknown event arrived at FairScheduler: " + event.toString());
}
@@ -895,13 +890,12 @@ public class FairScheduler implements Re
}
@Override
- public synchronized void
- reinitialize(Configuration conf, RMContext rmContext) throws IOException {
- if (!this.initialized) {
+ public synchronized void reinitialize(Configuration conf, RMContext rmContext)
+ throws IOException {
+ if (!initialized) {
this.conf = new FairSchedulerConfiguration(conf);
- this.rootMetrics = QueueMetrics.forQueue("root", null, true, conf);
+ rootMetrics = QueueMetrics.forQueue("root", null, true, conf);
this.rmContext = rmContext;
- this.clock = new SystemClock();
this.eventLog = new FairSchedulerEventLog();
eventLog.init(this.conf);
minimumAllocation = this.conf.getMinimumMemoryAllocation();
@@ -913,21 +907,20 @@ public class FairScheduler implements Re
assignMultiple = this.conf.getAssignMultiple();
maxAssign = this.conf.getMaxAssign();
- Thread updateThread = new Thread(new UpdateThread());
- updateThread.start();
-
initialized = true;
sizeBasedWeight = this.conf.getSizeBasedWeight();
- queueMgr = new QueueManager(this);
-
try {
queueMgr.initialize();
- }
- catch (Exception e) {
+ } catch (Exception e) {
throw new IOException("Failed to start FairScheduler", e);
}
+
+ Thread updateThread = new Thread(new UpdateThread());
+ updateThread.setName("FairSchedulerUpdateThread");
+ updateThread.setDaemon(true);
+ updateThread.start();
} else {
this.conf = new FairSchedulerConfiguration(conf);
userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
@@ -935,10 +928,9 @@ public class FairScheduler implements Re
rackLocalityThreshold = this.conf.getLocalityThresholdRack();
preemptionEnabled = this.conf.getPreemptionEnabled();
try {
- queueMgr.reloadAllocs();
+ queueMgr.reloadAllocs();
- }
- catch (Exception e) {
+ } catch (Exception e) {
throw new IOException("Failed to initialize FairScheduler", e);
}
}
@@ -950,8 +942,8 @@ public class FairScheduler implements Re
if (!queueMgr.exists(queueName)) {
return null;
}
- return queueMgr.getQueue(queueName).getQueueSchedulable().getQueueInfo(
- includeChildQueues, recursive);
+ return queueMgr.getQueue(queueName).getQueueInfo(includeChildQueues,
+ recursive);
}
@Override
@@ -963,17 +955,12 @@ public class FairScheduler implements Re
return new ArrayList<QueueUserACLInfo>();
}
- List<QueueUserACLInfo> userAcls = new ArrayList<QueueUserACLInfo>();
-
- for (FSQueue queue : queueMgr.getQueues()) {
- userAcls.addAll(queue.getQueueSchedulable().getQueueUserAclInfo(user));
- }
- return userAcls;
+ return queueMgr.getRootQueue().getQueueUserAclInfo(user);
}
@Override
public int getNumClusterNodes() {
- return this.nodes.size();
+ return nodes.size();
}
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java?rev=1415815&r1=1415814&r2=1415815&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java Fri Nov 30 19:58:09 2012
@@ -18,7 +18,7 @@ public class FairSchedulerConfiguration
/** Whether to use the user name as the queue name (instead of "default") if
* the request does not specify a queue. */
protected static final String USER_AS_DEFAULT_QUEUE = CONF_PREFIX + "user-as-default-queue";
- protected static final boolean DEFAULT_USER_AS_DEFAULT_QUEUE = false;
+ protected static final boolean DEFAULT_USER_AS_DEFAULT_QUEUE = true;
protected static final String LOCALITY_THRESHOLD = CONF_PREFIX + "locality.threshold";
protected static final float DEFAULT_LOCALITY_THRESHOLD = -1.0f;
Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java?rev=1415815&r1=1415814&r2=1415815&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java Fri Nov 30 19:58:09 2012
@@ -77,12 +77,11 @@ class FairSchedulerEventLog {
boolean init(FairSchedulerConfiguration conf) {
try {
logDir = conf.getEventlogDir();
- Path logDirPath = new Path(logDir);
- FileSystem fs = logDirPath.getFileSystem(conf);
- if (!fs.exists(logDirPath)) {
- if (!fs.mkdirs(logDirPath)) {
+ File logDirFile = new File(logDir);
+ if (!logDirFile.exists()) {
+ if (!logDirFile.mkdirs()) {
throw new IOException(
- "Mkdirs failed to create " + logDirPath.toString());
+ "Mkdirs failed to create " + logDirFile.toString());
}
}
String username = System.getProperty("user.name");
@@ -142,4 +141,8 @@ class FairSchedulerEventLog {
synchronized boolean isEnabled() {
return !logDisabled;
}
+
+ public String getLogFile() {
+ return logFile;
+ }
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1415815&r1=1415814&r2=1415815&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Fri Nov 30 19:58:09 2012
@@ -24,10 +24,10 @@ import java.net.URL;
import java.net.URLConnection;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
@@ -53,6 +53,7 @@ import org.xml.sax.SAXException;
/**
* Maintains a list of queues as well as scheduling parameters for each queue,
* such as guaranteed share allocations, from the fair scheduler config file.
+ *
*/
@Private
@Unstable
@@ -60,6 +61,8 @@ public class QueueManager {
public static final Log LOG = LogFactory.getLog(
QueueManager.class.getName());
+ public static final String ROOT_QUEUE = "root";
+
/** Time to wait between checks of the allocation file */
public static final long ALLOC_RELOAD_INTERVAL = 10 * 1000;
@@ -71,89 +74,37 @@ public class QueueManager {
private final FairScheduler scheduler;
- // Minimum resource allocation for each queue
- private Map<String, Resource> minQueueResources = new HashMap<String, Resource>();
- // Maximum amount of resources per queue
- private Map<String, Resource> maxQueueResources = new HashMap<String, Resource>();
- // Sharing weights for each queue
- private Map<String, Double> queueWeights = new HashMap<String, Double>();
-
- // Max concurrent running applications for each queue and for each user; in addition,
- // for users that have no max specified, we use the userMaxJobsDefault.
- private Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
- private Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
- private int userMaxAppsDefault = Integer.MAX_VALUE;
- private int queueMaxAppsDefault = Integer.MAX_VALUE;
-
- // ACL's for each queue. Only specifies non-default ACL's from configuration.
- private Map<String, Map<QueueACL, AccessControlList>> queueAcls =
- new HashMap<String, Map<QueueACL, AccessControlList>>();
-
- // Min share preemption timeout for each queue in seconds. If a job in the queue
- // waits this long without receiving its guaranteed share, it is allowed to
- // preempt other jobs' tasks.
- private Map<String, Long> minSharePreemptionTimeouts =
- new HashMap<String, Long>();
-
- // Default min share preemption timeout for queues where it is not set
- // explicitly.
- private long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
-
- // Preemption timeout for jobs below fair share in seconds. If a job remains
- // below half its fair share for this long, it is allowed to preempt tasks.
- private long fairSharePreemptionTimeout = Long.MAX_VALUE;
-
- SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR;
-
private Object allocFile; // Path to XML file containing allocations. This
// is either a URL to specify a classpath resource
// (if the fair-scheduler.xml on the classpath is
// used) or a String to specify an absolute path (if
// mapred.fairscheduler.allocation.file is used).
- private Map<String, FSQueue> queues = new HashMap<String, FSQueue>();
+ private final Collection<FSLeafQueue> leafQueues =
+ new CopyOnWriteArrayList<FSLeafQueue>();
+ private final Map<String, FSQueue> queues = new HashMap<String, FSQueue>();
+ private FSParentQueue rootQueue;
+ private volatile QueueManagerInfo info = new QueueManagerInfo();
+
private long lastReloadAttempt; // Last time we tried to reload the queues file
private long lastSuccessfulReload; // Last time we successfully reloaded queues
private boolean lastReloadAttemptFailed = false;
- // Monitor object for minQueueResources
- private Object minQueueResourcesMO = new Object();
-
- //Monitor object for maxQueueResources
- private Object maxQueueResourcesMO = new Object();
-
- //Monitor object for queueMaxApps
- private Object queueMaxAppsMO = new Object();
-
- //Monitor object for userMaxApps
- private Object userMaxAppsMO = new Object();
-
- //Monitor object for queueWeights
- private Object queueWeightsMO = new Object();
-
- //Monitor object for minSharePreemptionTimeouts
- private Object minSharePreemptionTimeoutsMO = new Object();
-
- //Monitor object for queueAcls
- private Object queueAclsMO = new Object();
-
- //Monitor object for userMaxAppsDefault
- private Object userMaxAppsDefaultMO = new Object();
-
- //Monitor object for queueMaxAppsDefault
- private Object queueMaxAppsDefaultMO = new Object();
-
- //Monitor object for defaultSchedulingMode
- private Object defaultSchedulingModeMO = new Object();
-
public QueueManager(FairScheduler scheduler) {
this.scheduler = scheduler;
}
+
+ public FSParentQueue getRootQueue() {
+ return rootQueue;
+ }
public void initialize() throws IOException, SAXException,
AllocationConfigurationException, ParserConfigurationException {
FairSchedulerConfiguration conf = scheduler.getConf();
+ rootQueue = new FSParentQueue("root", this, scheduler, null);
+ queues.put(rootQueue.getName(), rootQueue);
+
this.allocFile = conf.getAllocationFile();
if (allocFile == null) {
// No allocation file specified in jobconf. Use the default allocation
@@ -169,23 +120,106 @@ public class QueueManager {
lastSuccessfulReload = scheduler.getClock().getTime();
lastReloadAttempt = scheduler.getClock().getTime();
// Create the default queue
- getQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
+ getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
}
-
+
/**
- * Get a queue by name, creating it if necessary
- */
- public FSQueue getQueue(String name) {
+ * Get a queue by name, creating it if necessary. If the queue
+ * is not or can not be a leaf queue, i.e. it already exists as a parent queue,
+ * or one of the parents in its name is already a leaf queue, null is returned.
+ *
+ * The root part of the name is optional, so a queue underneath the root
+ * named "queue1" could be referred to as just "queue1", and a queue named
+ * "queue2" underneath a parent named "parent1" that is underneath the root
+ * could be referred to as just "parent1.queue2".
+ */
+ public FSLeafQueue getLeafQueue(String name) {
+ if (!name.startsWith(ROOT_QUEUE + ".")) {
+ name = ROOT_QUEUE + "." + name;
+ }
synchronized (queues) {
FSQueue queue = queues.get(name);
if (queue == null) {
- queue = new FSQueue(scheduler, name);
- synchronized (defaultSchedulingModeMO){
- queue.setSchedulingMode(defaultSchedulingMode);
+ FSLeafQueue leafQueue = createLeafQueue(name);
+ if (leafQueue == null) {
+ return null;
}
- queues.put(name, queue);
+ leafQueue.setSchedulingMode(info.defaultSchedulingMode);
+ queue = leafQueue;
+ } else if (queue instanceof FSParentQueue) {
+ return null;
+ }
+ return (FSLeafQueue)queue;
+ }
+ }
+
+ /**
+ * Creates a leaf queue and places it in the tree. Creates any
+ * parents that don't already exist.
+ *
+ * Performs no locking of its own; the visible caller, getLeafQueue, invokes
+ * it while holding the monitor of the queues map.
+ *
+ * @param name
+ * full dotted name of the leaf queue to create (getLeafQueue prepends
+ * "root." when it is missing before calling this method)
+ * @return
+ * the created queue, if successful. null if not allowed (one of the parent
+ * queues in the queue name is already a leaf queue)
+ */
+ private FSLeafQueue createLeafQueue(String name) {
+ // Names of the queues that have to be created: the leaf first, followed by
+ // each missing ancestor from deepest to shallowest.
+ List<String> newQueueNames = new ArrayList<String>();
+ newQueueNames.add(name);
+ int sepIndex = name.length();
+ FSParentQueue parent = null;
+
+ // Move up the queue tree until we reach one that exists.
+ // Each pass drops the last ".segment" from the name and looks the remaining
+ // prefix up in the queues map.
+ // NOTE(review): when no earlier '.' exists, lastIndexOf returns -1 and
+ // substring(0, -1) throws StringIndexOutOfBoundsException before the loop
+ // condition is re-checked. The walk therefore relies on an ancestor
+ // (presumably the root queue registered by initialize()) being found first.
+ while (sepIndex != -1) {
+ sepIndex = name.lastIndexOf('.', sepIndex-1);
+ FSQueue queue;
+ String curName = null;
+ curName = name.substring(0, sepIndex);
+ queue = queues.get(curName);
+
+ if (queue == null) {
+ newQueueNames.add(curName);
+ } else {
+ if (queue instanceof FSParentQueue) {
+ parent = (FSParentQueue)queue;
+ break;
+ } else {
+ // An existing ancestor is not a parent queue, so nothing is created.
+ return null;
+ }
+ }
+ }
+
+ // At this point, parent refers to the deepest existing parent of the
+ // queue to create.
+ // Now that we know everything worked out, make all the queues
+ // and add them to the map.
+ // The list is walked backwards so that each missing ancestor is created
+ // before the queue that will be attached beneath it.
+ FSLeafQueue leafQueue = null;
+ for (int i = newQueueNames.size()-1; i >= 0; i--) {
+ String queueName = newQueueNames.get(i);
+ if (i == 0) {
+ // First name added was the leaf queue
+ leafQueue = new FSLeafQueue(name, this, scheduler, parent);
+ parent.addChildQueue(leafQueue);
+ queues.put(leafQueue.getName(), leafQueue);
+ leafQueues.add(leafQueue);
+ } else {
+ FSParentQueue newParent = new FSParentQueue(queueName, this, scheduler, parent);
+ parent.addChildQueue(newParent);
+ queues.put(newParent.getName(), newParent);
+ parent = newParent;
}
- return queue;
+ }
+
+ return leafQueue;
+ }
+
+ /**
+ * Gets a queue by name.
+ */
+ public FSQueue getQueue(String name) {
+ if (!name.startsWith(ROOT_QUEUE + ".") && !name.equals(ROOT_QUEUE)) {
+ name = ROOT_QUEUE + "." + name;
+ }
+ synchronized (queues) {
+ return queues.get(name);
}
}
@@ -201,8 +235,8 @@ public class QueueManager {
/**
* Get the queue for a given AppSchedulable.
*/
- public FSQueue getQueueForApp(AppSchedulable app) {
- return this.getQueue(app.getApp().getQueueName());
+ public FSLeafQueue getQueueForApp(AppSchedulable app) {
+ return getLeafQueue(app.getApp().getQueueName());
}
/**
@@ -272,6 +306,8 @@ public class QueueManager {
new HashMap<String, Map<QueueACL, AccessControlList>>();
int userMaxAppsDefault = Integer.MAX_VALUE;
int queueMaxAppsDefault = Integer.MAX_VALUE;
+ long fairSharePreemptionTimeout = Long.MAX_VALUE;
+ long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR;
// Remember all queue names so we can display them on web UI, etc.
@@ -300,54 +336,9 @@ public class QueueManager {
Element element = (Element)node;
if ("queue".equals(element.getTagName()) ||
"pool".equals(element.getTagName())) {
- String queueName = element.getAttribute("name");
- Map<QueueACL, AccessControlList> acls =
- new HashMap<QueueACL, AccessControlList>();
- queueNamesInAllocFile.add(queueName);
- NodeList fields = element.getChildNodes();
- for (int j = 0; j < fields.getLength(); j++) {
- Node fieldNode = fields.item(j);
- if (!(fieldNode instanceof Element))
- continue;
- Element field = (Element) fieldNode;
- if ("minResources".equals(field.getTagName())) {
- String text = ((Text)field.getFirstChild()).getData().trim();
- int val = Integer.parseInt(text);
- minQueueResources.put(queueName, Resources.createResource(val));
- } else if ("maxResources".equals(field.getTagName())) {
- String text = ((Text)field.getFirstChild()).getData().trim();
- int val = Integer.parseInt(text);
- maxQueueResources.put(queueName, Resources.createResource(val));
- } else if ("maxRunningApps".equals(field.getTagName())) {
- String text = ((Text)field.getFirstChild()).getData().trim();
- int val = Integer.parseInt(text);
- queueMaxApps.put(queueName, val);
- } else if ("weight".equals(field.getTagName())) {
- String text = ((Text)field.getFirstChild()).getData().trim();
- double val = Double.parseDouble(text);
- queueWeights.put(queueName, val);
- } else if ("minSharePreemptionTimeout".equals(field.getTagName())) {
- String text = ((Text)field.getFirstChild()).getData().trim();
- long val = Long.parseLong(text) * 1000L;
- minSharePreemptionTimeouts.put(queueName, val);
- } else if ("schedulingMode".equals(field.getTagName())) {
- String text = ((Text)field.getFirstChild()).getData().trim();
- queueModes.put(queueName, parseSchedulingMode(text));
- } else if ("aclSubmitApps".equals(field.getTagName())) {
- String text = ((Text)field.getFirstChild()).getData().trim();
- acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
- } else if ("aclAdministerApps".equals(field.getTagName())) {
- String text = ((Text)field.getFirstChild()).getData().trim();
- acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
- }
- }
- queueAcls.put(queueName, acls);
- if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName)
- && Resources.lessThan(maxQueueResources.get(queueName),
- minQueueResources.get(queueName))) {
- LOG.warn(String.format("Queue %s has max resources %d less than min resources %d",
- queueName, maxQueueResources.get(queueName), minQueueResources.get(queueName)));
- }
+ loadQueue("root", element, minQueueResources, maxQueueResources, queueMaxApps,
+ userMaxApps, queueWeights, queueModes, minSharePreemptionTimeouts,
+ queueAcls, queueNamesInAllocFile);
} else if ("user".equals(element.getTagName())) {
String userName = element.getAttribute("name");
NodeList fields = element.getChildNodes();
@@ -388,19 +379,13 @@ public class QueueManager {
// Commit the reload; also create any queue defined in the alloc file
// if it does not already exist, so it can be displayed on the web UI.
- synchronized(this) {
- setMinResources(minQueueResources);
- setMaxResources(maxQueueResources);
- setQueueMaxApps(queueMaxApps);
- setUserMaxApps(userMaxApps);
- setQueueWeights(queueWeights);
- setUserMaxAppsDefault(userMaxAppsDefault);
- setQueueMaxAppsDefault(queueMaxAppsDefault);
- setDefaultSchedulingMode(defaultSchedulingMode);
- setMinSharePreemptionTimeouts(minSharePreemptionTimeouts);
- setQueueAcls(queueAcls);
+ synchronized (this) {
+ info = new QueueManagerInfo(minQueueResources, maxQueueResources,
+ queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault,
+ queueMaxAppsDefault, defaultSchedulingMode, minSharePreemptionTimeouts,
+ queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout);
for (String name: queueNamesInAllocFile) {
- FSQueue queue = getQueue(name);
+ FSLeafQueue queue = getLeafQueue(name);
if (queueModes.containsKey(name)) {
queue.setSchedulingMode(queueModes.get(name));
} else {
@@ -409,6 +394,75 @@ public class QueueManager {
}
}
}
+
+  /**
+   * Loads one queue element from the allocation file, recursing into any
+   * nested queue elements.
+   *
+   * Every setting found is recorded in the supplied maps under the queue's
+   * full name, which is {@code parentName + "."} followed by the element's
+   * "name" attribute. An element that contains no nested "queue" or "pool"
+   * element is treated as a leaf, and only leaf names are appended to
+   * {@code queueNamesInAllocFile}.
+   *
+   * @param parentName full name of the enclosing queue
+   * @param element the "queue" or "pool" element to read
+   * @param minQueueResources receives the "minResources" value
+   * @param maxQueueResources receives the "maxResources" value
+   * @param queueMaxApps receives the "maxRunningApps" value
+   * @param userMaxApps not modified here; only handed on to nested calls
+   * @param queueWeights receives the "weight" value
+   * @param queueModes receives the parsed "schedulingMode" value
+   * @param minSharePreemptionTimeouts receives the
+   *        "minSharePreemptionTimeout" value multiplied by 1000
+   * @param queueAcls receives the ACL map built for this queue (always put,
+   *        even when empty)
+   * @param queueNamesInAllocFile receives the full names of leaf queues
+   * @throws AllocationConfigurationException propagated from
+   *         parseSchedulingMode or from a nested queue element
+   */
+  private void loadQueue(String parentName, Element element,
+      Map<String, Resource> minQueueResources,
+      Map<String, Resource> maxQueueResources,
+      Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
+      Map<String, Double> queueWeights, Map<String, SchedulingMode> queueModes,
+      Map<String, Long> minSharePreemptionTimeouts,
+      Map<String, Map<QueueACL, AccessControlList>> queueAcls,
+      List<String> queueNamesInAllocFile)
+      throws AllocationConfigurationException {
+    String queueName = parentName + "." + element.getAttribute("name");
+    Map<QueueACL, AccessControlList> acls =
+        new HashMap<QueueACL, AccessControlList>();
+    NodeList fields = element.getChildNodes();
+    boolean isLeaf = true;
+
+    for (int j = 0; j < fields.getLength(); j++) {
+      Node fieldNode = fields.item(j);
+      if (!(fieldNode instanceof Element)) {
+        continue;
+      }
+      Element field = (Element) fieldNode;
+      String tagName = field.getTagName();
+      if ("minResources".equals(tagName)) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        int val = Integer.parseInt(text);
+        minQueueResources.put(queueName, Resources.createResource(val));
+      } else if ("maxResources".equals(tagName)) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        int val = Integer.parseInt(text);
+        maxQueueResources.put(queueName, Resources.createResource(val));
+      } else if ("maxRunningApps".equals(tagName)) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        int val = Integer.parseInt(text);
+        queueMaxApps.put(queueName, val);
+      } else if ("weight".equals(tagName)) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        double val = Double.parseDouble(text);
+        queueWeights.put(queueName, val);
+      } else if ("minSharePreemptionTimeout".equals(tagName)) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        // The file value is multiplied by 1000 before it is stored.
+        long val = Long.parseLong(text) * 1000L;
+        minSharePreemptionTimeouts.put(queueName, val);
+      } else if ("schedulingMode".equals(tagName)) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        queueModes.put(queueName, parseSchedulingMode(text));
+      } else if ("aclSubmitApps".equals(tagName)) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
+      } else if ("aclAdministerApps".equals(tagName)) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
+      } else if ("queue".equals(tagName) || "pool".equals(tagName)) {
+        // Exact match is required: endsWith() would also accept tags such as
+        // "e" or "ue" and wrongly treat them as nested queues.
+        loadQueue(queueName, field, minQueueResources, maxQueueResources,
+            queueMaxApps, userMaxApps, queueWeights, queueModes,
+            minSharePreemptionTimeouts, queueAcls, queueNamesInAllocFile);
+        isLeaf = false;
+      }
+    }
+    if (isLeaf) {
+      queueNamesInAllocFile.add(queueName);
+    }
+    queueAcls.put(queueName, acls);
+    if (maxQueueResources.containsKey(queueName)
+        && minQueueResources.containsKey(queueName)
+        && Resources.lessThan(maxQueueResources.get(queueName),
+            minQueueResources.get(queueName))) {
+      // Resource is not an integral type, so it must be formatted with %s;
+      // %d would throw IllegalFormatConversionException here.
+      LOG.warn(String.format(
+          "Queue %s has max resources %s less than min resources %s",
+          queueName, maxQueueResources.get(queueName),
+          minQueueResources.get(queueName)));
+    }
+  }
private SchedulingMode parseSchedulingMode(String text)
throws AllocationConfigurationException {
@@ -428,181 +482,87 @@ public class QueueManager {
* @return the cap set on this queue, or 0 if not set.
*/
public Resource getMinResources(String queue) {
- synchronized(minQueueResourcesMO) {
- if (minQueueResources.containsKey(queue)) {
- return minQueueResources.get(queue);
- } else{
- return Resources.createResource(0);
- }
+ Resource minQueueResource = info.minQueueResources.get(queue);
+ if (minQueueResource != null) {
+ return minQueueResource;
+ } else {
+ return Resources.createResource(0);
}
}
- private void setMinResources(Map<String, Resource> resources) {
- synchronized(minQueueResourcesMO) {
- minQueueResources = resources;
- }
- }
/**
* Get the maximum resource allocation for the given queue.
* @return the cap set on this queue, or Integer.MAX_VALUE if not set.
*/
- public Resource getMaxResources(String queueName) {
- synchronized (maxQueueResourcesMO) {
- if (maxQueueResources.containsKey(queueName)) {
- return maxQueueResources.get(queueName);
- } else {
- return Resources.createResource(Integer.MAX_VALUE);
- }
- }
- }
- private void setMaxResources(Map<String, Resource> resources) {
- synchronized(maxQueueResourcesMO) {
- maxQueueResources = resources;
+ public Resource getMaxResources(String queueName) {
+ Resource maxQueueResource = info.maxQueueResources.get(queueName);
+ if (maxQueueResource != null) {
+ return maxQueueResource;
+ } else {
+ return Resources.createResource(Integer.MAX_VALUE);
}
}
-
- /**
- * Add an app in the appropriate queue
- */
- public synchronized void addApp(FSSchedulerApp app) {
- getQueue(app.getQueueName()).addApp(app);
- }
-
- /**
- * Remove an app
- */
- public synchronized void removeJob(FSSchedulerApp app) {
- getQueue(app.getQueueName()).removeJob(app);
- }
/**
* Get a collection of all queues
*/
- public Collection<FSQueue> getQueues() {
+ public Collection<FSLeafQueue> getLeafQueues() {
synchronized (queues) {
- return queues.values();
+ return leafQueues;
}
}
- /**
- * Get all queue names that have been seen either in the allocation file or in
- * a submitted app.
- */
- public synchronized Collection<String> getQueueNames() {
- List<String> list = new ArrayList<String>();
- for (FSQueue queue: getQueues()) {
- list.add(queue.getName());
- }
- Collections.sort(list);
- return list;
- }
-
public int getUserMaxApps(String user) {
- synchronized (userMaxAppsMO) {
- if (userMaxApps.containsKey(user)) {
- return userMaxApps.get(user);
- } else {
- return getUserMaxAppsDefault();
- }
+ // save current info in case it gets changed under us
+ QueueManagerInfo info = this.info;
+ if (info.userMaxApps.containsKey(user)) {
+ return info.userMaxApps.get(user);
+ } else {
+ return info.userMaxAppsDefault;
}
}
- private void setUserMaxApps(Map<String, Integer> userApps) {
- synchronized (userMaxAppsMO) {
- userMaxApps = userApps;
- }
- }
-
- private int getUserMaxAppsDefault() {
- synchronized (userMaxAppsDefaultMO){
- return userMaxAppsDefault;
- }
- }
-
- private void setUserMaxAppsDefault(int userMaxApps) {
- synchronized (userMaxAppsDefaultMO){
- userMaxAppsDefault = userMaxApps;
- }
- }
-
public int getQueueMaxApps(String queue) {
- synchronized (queueMaxAppsMO) {
- if (queueMaxApps.containsKey(queue)) {
- return queueMaxApps.get(queue);
- } else {
- return getQueueMaxAppsDefault();
- }
- }
- }
-
- private void setQueueMaxApps(Map<String, Integer> queueApps) {
- synchronized (queueMaxAppsMO) {
- queueMaxApps = queueApps;
- }
- }
-
- private int getQueueMaxAppsDefault(){
- synchronized(queueMaxAppsDefaultMO) {
- return queueMaxAppsDefault;
- }
- }
-
- private void setQueueMaxAppsDefault(int queueMaxApps){
- synchronized(queueMaxAppsDefaultMO) {
- queueMaxAppsDefault = queueMaxApps;
+ // save current info in case it gets changed under us
+ QueueManagerInfo info = this.info;
+ if (info.queueMaxApps.containsKey(queue)) {
+ return info.queueMaxApps.get(queue);
+ } else {
+ return info.queueMaxAppsDefault;
}
}
- private void setDefaultSchedulingMode(SchedulingMode schedulingMode){
- synchronized(defaultSchedulingModeMO) {
- defaultSchedulingMode = schedulingMode;
- }
- }
-
public double getQueueWeight(String queue) {
- synchronized (queueWeightsMO) {
- if (queueWeights.containsKey(queue)) {
- return queueWeights.get(queue);
- } else {
- return 1.0;
- }
+ Double weight = info.queueWeights.get(queue);
+ if (weight != null) {
+ return weight;
+ } else {
+ return 1.0;
}
}
- private void setQueueWeights(Map<String, Double> weights) {
- synchronized (queueWeightsMO) {
- queueWeights = weights;
- }
- }
/**
- * Get a queue's min share preemption timeout, in milliseconds. This is the
- * time after which jobs in the queue may kill other queues' tasks if they
- * are below their min share.
- */
+ * Get a queue's min share preemption timeout, in milliseconds. This is the
+ * time after which jobs in the queue may kill other queues' tasks if they
+ * are below their min share.
+ */
public long getMinSharePreemptionTimeout(String queueName) {
- synchronized (minSharePreemptionTimeoutsMO) {
- if (minSharePreemptionTimeouts.containsKey(queueName)) {
- return minSharePreemptionTimeouts.get(queueName);
- }
+ // save current info in case it gets changed under us
+ QueueManagerInfo info = this.info;
+ if (info.minSharePreemptionTimeouts.containsKey(queueName)) {
+ return info.minSharePreemptionTimeouts.get(queueName);
}
- return defaultMinSharePreemptionTimeout;
+ return info.defaultMinSharePreemptionTimeout;
}
- private void setMinSharePreemptionTimeouts(
- Map<String, Long> sharePreemptionTimeouts){
- synchronized (minSharePreemptionTimeoutsMO) {
- minSharePreemptionTimeouts = sharePreemptionTimeouts;
- }
- }
-
/**
* Get the fair share preemption, in milliseconds. This is the time
* after which any job may kill other jobs' tasks if it is below half
* its fair share.
*/
public long getFairSharePreemptionTimeout() {
- return fairSharePreemptionTimeout;
+ return info.fairSharePreemptionTimeout;
}
/**
@@ -611,10 +571,9 @@ public class QueueManager {
*/
public Map<QueueACL, AccessControlList> getQueueAcls(String queue) {
HashMap<QueueACL, AccessControlList> out = new HashMap<QueueACL, AccessControlList>();
- synchronized (queueAclsMO) {
- if (queueAcls.containsKey(queue)) {
- out.putAll(queueAcls.get(queue));
- }
+ Map<QueueACL, AccessControlList> queueAcl = info.queueAcls.get(queue);
+ if (queueAcl != null) {
+ out.putAll(queueAcl);
}
if (!out.containsKey(QueueACL.ADMINISTER_QUEUE)) {
out.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList("*"));
@@ -625,9 +584,74 @@ public class QueueManager {
return out;
}
- private void setQueueAcls(Map<String, Map<QueueACL, AccessControlList>> queue) {
- synchronized (queueAclsMO) {
- queueAcls = queue;
+ static class QueueManagerInfo {
+ // Minimum resource allocation for each queue
+ public final Map<String, Resource> minQueueResources;
+ // Maximum amount of resources per queue
+ public final Map<String, Resource> maxQueueResources;
+ // Sharing weights for each queue
+ public final Map<String, Double> queueWeights;
+
+ // Max concurrent running applications for each queue and for each user; in addition,
+ // for queues and users that have no max specified, we use queueMaxAppsDefault and
+ // userMaxAppsDefault respectively.
+ public final Map<String, Integer> queueMaxApps;
+ public final Map<String, Integer> userMaxApps;
+ public final int userMaxAppsDefault;
+ public final int queueMaxAppsDefault;
+
+ // ACL's for each queue. Only specifies non-default ACL's from configuration.
+ public final Map<String, Map<QueueACL, AccessControlList>> queueAcls;
+
+ // Min share preemption timeout for each queue, held in milliseconds (the value
+ // read from the allocation file is multiplied by 1000 when it is loaded). If a
+ // job in the queue waits this long without receiving its guaranteed share, it
+ // is allowed to preempt other jobs' tasks.
+ public final Map<String, Long> minSharePreemptionTimeouts;
+
+ // Default min share preemption timeout for queues where it is not set
+ // explicitly.
+ public final long defaultMinSharePreemptionTimeout;
+
+ // Preemption timeout for jobs below fair share in seconds. If a job remains
+ // below half its fair share for this long, it is allowed to preempt tasks.
+ public final long fairSharePreemptionTimeout;
+
+ public final SchedulingMode defaultSchedulingMode;
+
+ public QueueManagerInfo(Map<String, Resource> minQueueResources,
+ Map<String, Resource> maxQueueResources,
+ Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
+ Map<String, Double> queueWeights, int userMaxAppsDefault,
+ int queueMaxAppsDefault, SchedulingMode defaultSchedulingMode,
+ Map<String, Long> minSharePreemptionTimeouts,
+ Map<String, Map<QueueACL, AccessControlList>> queueAcls,
+ long fairSharePreemptionTimeout, long defaultMinSharePreemptionTimeout) {
+ this.minQueueResources = minQueueResources;
+ this.maxQueueResources = maxQueueResources;
+ this.queueMaxApps = queueMaxApps;
+ this.userMaxApps = userMaxApps;
+ this.queueWeights = queueWeights;
+ this.userMaxAppsDefault = userMaxAppsDefault;
+ this.queueMaxAppsDefault = queueMaxAppsDefault;
+ this.defaultSchedulingMode = defaultSchedulingMode;
+ this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
+ this.queueAcls = queueAcls;
+ this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
+ this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout;
+ }
+
+ public QueueManagerInfo() {
+ minQueueResources = new HashMap<String, Resource>();
+ maxQueueResources = new HashMap<String, Resource>();
+ queueWeights = new HashMap<String, Double>();
+ queueMaxApps = new HashMap<String, Integer>();
+ userMaxApps = new HashMap<String, Integer>();
+ userMaxAppsDefault = Integer.MAX_VALUE;
+ queueMaxAppsDefault = Integer.MAX_VALUE;
+ queueAcls = new HashMap<String, Map<QueueACL, AccessControlList>>();
+ minSharePreemptionTimeouts = new HashMap<String, Long>();
+ defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
+ fairSharePreemptionTimeout = Long.MAX_VALUE;
+ defaultSchedulingMode = SchedulingMode.FAIR;
}
}
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java?rev=1415815&r1=1415814&r2=1415815&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java Fri Nov 30 19:58:09 2012
@@ -92,12 +92,6 @@ abstract class Schedulable {
public abstract void updateDemand();
/**
- * Distribute the fair share assigned to this Schedulable among its
- * children (used in queues where the internal scheduler is fair sharing).
- */
- public abstract void redistributeShare();
-
- /**
* Assign a container on this node if possible, and return the amount of
* resources assigned. If {@code reserved} is true, it means a reservation
* already exists on this node, and the schedulable should fulfill that
Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java?rev=1415815&r1=1415814&r2=1415815&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java Fri Nov 30 19:58:09 2012
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configurab
/**
* A pluggable object for altering the weights of apps in the fair scheduler,
- * which is used for example by {@link NewJobWeightBooster} to give higher
+ * which is used for example by {@link NewAppWeightBooster} to give higher
* weight to new jobs so that short jobs finish faster.
*
* May implement {@link Configurable} to access configuration parameters.
Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1415815&r1=1415814&r2=1415815&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Fri Nov 30 19:58:09 2012
@@ -56,7 +56,7 @@ import org.apache.hadoop.yarn.factory.pr
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
@@ -292,7 +292,7 @@ public class FifoScheduler implements Re
// TODO: Fix store
FiCaSchedulerApp schedulerApp =
new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager,
- this.rmContext, null);
+ this.rmContext);
applications.put(appAttemptId, schedulerApp);
metrics.submitApp(user, appAttemptId.getAttemptId());
LOG.info("Application Submission: " + appAttemptId.getApplicationId() +
@@ -763,13 +763,7 @@ public class FifoScheduler implements Re
@Override
public void recover(RMState state) {
- // TODO fix recovery
-// for (Map.Entry<ApplicationId, ApplicationInfo> entry: state.getStoredApplications().entrySet()) {
-// ApplicationId appId = entry.getKey();
-// ApplicationInfo appInfo = entry.getValue();
-// SchedulerApp app = applications.get(appId);
-// app.allocate(appInfo.getContainers());
-// }
+ // Recovery from RMState is not implemented for this scheduler.
}
@Override
Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java?rev=1415815&r1=1415814&r2=1415815&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java Fri Nov 30 19:58:09 2012
@@ -20,12 +20,13 @@ package org.apache.hadoop.yarn.server.re
import static org.apache.hadoop.yarn.util.StringHelper.join;
import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_STATE;
-import static org.apache.hadoop.yarn.webapp.view.JQueryUI._PROGRESSBAR;
-import static org.apache.hadoop.yarn.webapp.view.JQueryUI._PROGRESSBAR_VALUE;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.C_PROGRESSBAR;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.C_PROGRESSBAR_VALUE;
import java.util.Collection;
import java.util.HashSet;
+import org.apache.commons.lang.StringEscapeUtils;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
@@ -72,41 +73,50 @@ class AppsBlock extends HtmlBlock {
reqAppStates.add(RMAppState.valueOf(stateString));
}
}
+ StringBuilder appsTableData = new StringBuilder("[\n");
for (RMApp app : list.apps.values()) {
if (reqAppStates != null && !reqAppStates.contains(app.getState())) {
continue;
}
AppInfo appInfo = new AppInfo(app, true);
String percent = String.format("%.1f", appInfo.getProgress());
- String startTime = Times.format(appInfo.getStartTime());
- String finishTime = Times.format(appInfo.getFinishTime());
- tbody.
- tr().
- td().
- br().$title(appInfo.getAppIdNum())._(). // for sorting
- a(url("app", appInfo.getAppId()), appInfo.getAppId())._().
- td(appInfo.getUser()).
- td(appInfo.getName()).
- td(appInfo.getQueue()).
- td().
- br().$title(String.valueOf(appInfo.getStartTime()))._().
- _(startTime)._().
- td().
- br().$title(String.valueOf(appInfo.getFinishTime()))._().
- _(finishTime)._().
- td(appInfo.getState()).
- td(appInfo.getFinalStatus()).
- td().
- br().$title(percent)._(). // for sorting
- div(_PROGRESSBAR).
- $title(join(percent, '%')). // tooltip
- div(_PROGRESSBAR_VALUE).
- $style(join("width:", percent, '%'))._()._()._().
- td().
- a(!appInfo.isTrackingUrlReady()?
- "#" : appInfo.getTrackingUrlPretty(), appInfo.getTrackingUI())._()._();
+ // AppID numerical value is parsed by parseHadoopID in yarn.dt.plugins.js
+ appsTableData.append("[\"<a href='")
+ .append(url("app", appInfo.getAppId())).append("'>")
+ .append(appInfo.getAppId()).append("</a>\",\"")
+ .append(StringEscapeUtils.escapeHtml(appInfo.getUser()))
+ .append("\",\"")
+ .append(StringEscapeUtils.escapeHtml(appInfo.getName()))
+ .append("\",\"")
+ .append(StringEscapeUtils.escapeHtml(appInfo.getQueue()))
+ .append("\",\"")
+ .append(appInfo.getStartTime()).append("\",\"")
+ .append(appInfo.getFinishTime()).append("\",\"")
+ .append(appInfo.getState()).append("\",\"")
+ .append(appInfo.getFinalStatus()).append("\",\"")
+ // Progress bar
+ .append("<br title='").append(percent)
+ .append("'> <div class='").append(C_PROGRESSBAR).append("' title='")
+ .append(join(percent, '%')).append("'> ").append("<div class='")
+ .append(C_PROGRESSBAR_VALUE).append("' style='")
+ .append(join("width:", percent, '%')).append("'> </div> </div>")
+ .append("\",\"<a href='");
+
+ String trackingURL =
+ !appInfo.isTrackingUrlReady()? "#" : appInfo.getTrackingUrlPretty();
+
+ appsTableData.append(trackingURL).append("'>")
+ .append(appInfo.getTrackingUI()).append("</a>\"],\n");
+
if (list.rendering != Render.HTML && ++i >= 20) break;
}
+ if(appsTableData.charAt(appsTableData.length() - 2) == ',') {
+ appsTableData.delete(appsTableData.length()-2, appsTableData.length()-1);
+ }
+ appsTableData.append("]");
+ html.script().$type("text/javascript").
+ _("var appsTableData=" + appsTableData)._();
+
tbody._()._();
if (list.rendering == Render.JS_ARRAY) {
Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NavBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NavBlock.java?rev=1415815&r1=1415814&r2=1415815&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NavBlock.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NavBlock.java Fri Nov 30 19:58:09 2012
@@ -50,7 +50,6 @@ public class NavBlock extends HtmlBlock
li().a("/conf", "Configuration")._().
li().a("/logs", "Local logs")._().
li().a("/stacks", "Server stacks")._().
- li().a("/metrics", "Server metrics")._()._()._().
- div("#themeswitcher")._();
+ li().a("/metrics", "Server metrics")._()._()._();
}
}
Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmView.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmView.java?rev=1415815&r1=1415814&r2=1415815&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmView.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmView.java Fri Nov 30 19:58:09 2012
@@ -47,7 +47,6 @@ public class RmView extends TwoColumnLay
protected void commonPreHead(Page.HTML<_> html) {
set(ACCORDION_ID, "nav");
set(initID(ACCORDION, "nav"), "{autoHeight:false, active:0}");
- set(THEMESWITCHER_ID, "themeswitcher");
}
@Override
@@ -63,13 +62,23 @@ public class RmView extends TwoColumnLay
private String appsTableInit() {
AppsList list = getInstance(AppsList.class);
// id, user, name, queue, starttime, finishtime, state, status, progress, ui
- StringBuilder init = tableInit().
- append(", aoColumns:[{sType:'title-numeric'}, null, null, null, ").
- append("{sType:'title-numeric'}, {sType:'title-numeric'} , null, ").
- append("null,{sType:'title-numeric', bSearchable:false}, null]");
+ StringBuilder init = tableInit()
+ .append(", 'aaData': appsTableData")
+ .append(", bDeferRender: true")
+ .append(", bProcessing: true")
- // Sort by id upon page load
- init.append(", aaSorting: [[0, 'desc']]");
+ .append("\n, aoColumnDefs: [\n")
+ .append("{'sType':'numeric', 'aTargets': [0]")
+ .append(", 'mRender': parseHadoopID }")
+
+ .append("\n, {'sType':'numeric', 'aTargets': [4, 5]")
+ .append(", 'mRender': renderHadoopDate }")
+
+ .append("\n, {'sType':'numeric', bSearchable:false, 'aTargets': [8]")
+ .append(", 'mRender': parseHadoopProgress }]")
+
+ // Sort by id upon page load
+ .append(", aaSorting: [[0, 'desc']]");
String rows = $("rowlimit");
int rowLimit = rows.isEmpty() ? MAX_DISPLAY_ROWS : Integer.parseInt(rows);
Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java?rev=1415815&r1=1415814&r2=1415815&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java Fri Nov 30 19:58:09 2012
@@ -23,7 +23,7 @@ import java.util.Collection;
import java.util.List;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
public class FairSchedulerInfo {
@@ -32,9 +32,9 @@ public class FairSchedulerInfo {
public FairSchedulerInfo(FairScheduler fs) {
scheduler = fs;
- Collection<FSQueue> queues = fs.getQueueManager().getQueues();
+ Collection<FSLeafQueue> queues = fs.getQueueManager().getLeafQueues();
queueInfos = new ArrayList<FairSchedulerQueueInfo>();
- for (FSQueue queue : queues) {
+ for (FSLeafQueue queue : queues) {
queueInfos.add(new FairSchedulerQueueInfo(queue, fs));
}
}