You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by su...@apache.org on 2012/11/30 20:58:44 UTC

svn commit: r1415815 [3/5] - in /hadoop/common/branches/branch-trunk-win/hadoop-yarn-project: ./ hadoop-yarn/dev-support/ hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/ hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-a...

Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1415815&r1=1415814&r2=1415815&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Fri Nov 30 19:58:09 2012
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.re
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -50,7 +51,7 @@ import org.apache.hadoop.yarn.factory.pr
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
@@ -75,6 +76,25 @@ import org.apache.hadoop.yarn.server.res
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
 
+/**
+ * A scheduler that schedules resources between a set of queues. The scheduler
+ * keeps track of the resources used by each queue, and attempts to maintain
+ * fairness by scheduling tasks at queues whose allocations are farthest below
+ * an ideal fair distribution.
+ * 
+ * The fair scheduler supports hierarchical queues. All queues descend from a
+ * queue named "root". Available resources are distributed among the children
+ * of the root queue in the typical fair scheduling fashion. Then, the children
+ * distribute the resources assigned to them to their children in the same
+ * fashion.  Applications may only be scheduled on leaf queues. Queues can be
+ * specified as children of other queues by placing them as sub-elements of their
+ * parents in the fair scheduler configuration file.
+ * 
+ * A queue's name starts with the names of its parents, with periods as
+ * separators.  So a queue named "queue1" under the root queue would be
+ * referred to as "root.queue1", and a queue named "queue2" under a queue
+ * named "parent1" would be referred to as "root.parent1.queue2".
+ */
 @LimitedPrivate("yarn")
 @Unstable
 @SuppressWarnings("unchecked")
@@ -94,7 +114,7 @@ public class FairScheduler implements Re
   protected long UPDATE_INTERVAL = 500;
 
   // Whether to use username in place of "default" queue name
-  private boolean userAsDefaultQueue = false;
+  private volatile boolean userAsDefaultQueue = false;
 
   private final static List<Container> EMPTY_CONTAINER_LIST =
       new ArrayList<Container>();
@@ -105,23 +125,22 @@ public class FairScheduler implements Re
   // Aggregate metrics
   QueueMetrics rootMetrics;
 
-  //Time when we last updated preemption vars
+  // Time when we last updated preemption vars
   protected long lastPreemptionUpdateTime;
-  //Time we last ran preemptTasksIfNecessary
+  // Time we last ran preemptTasksIfNecessary
   private long lastPreemptCheckTime;
 
-
   // This stores per-application scheduling information, indexed by
   // attempt ID's for fast lookup.
-  protected Map<ApplicationAttemptId, FSSchedulerApp> applications
-  = new HashMap<ApplicationAttemptId, FSSchedulerApp>();
+  protected Map<ApplicationAttemptId, FSSchedulerApp> applications = 
+      new HashMap<ApplicationAttemptId, FSSchedulerApp>();
 
   // Nodes in the cluster, indexed by NodeId
-  private Map<NodeId, FSSchedulerNode> nodes =
+  private Map<NodeId, FSSchedulerNode> nodes = 
       new ConcurrentHashMap<NodeId, FSSchedulerNode>();
 
   // Aggregate capacity of the cluster
-  private Resource clusterCapacity =
+  private Resource clusterCapacity = 
       RecordFactoryProvider.getRecordFactory(null).newRecordInstance(Resource.class);
 
   // How often tasks are preempted (must be longer than a couple
@@ -131,31 +150,28 @@ public class FairScheduler implements Re
   protected boolean preemptionEnabled;
   protected boolean sizeBasedWeight; // Give larger weights to larger jobs
   protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
-  protected double nodeLocalityThreshold;   // Cluster threshold for node locality
-  protected double rackLocalityThreshold;   // Cluster threshold for rack locality
-  private FairSchedulerEventLog eventLog;   // Machine-readable event log
-  protected boolean assignMultiple; // Allocate multiple containers per heartbeat
+  protected double nodeLocalityThreshold; // Cluster threshold for node locality
+  protected double rackLocalityThreshold; // Cluster threshold for rack locality
+  private FairSchedulerEventLog eventLog; // Machine-readable event log
+  protected boolean assignMultiple; // Allocate multiple containers per
+                                    // heartbeat
   protected int maxAssign; // Max containers to assign per heartbeat
-
+  
+  public FairScheduler() {
+    clock = new SystemClock();
+    queueMgr = new QueueManager(this);
+  }
 
   public FairSchedulerConfiguration getConf() {
-    return this.conf;
+    return conf;
   }
 
   public QueueManager getQueueManager() {
-    return this.queueMgr;
-  }
-
-  public List<FSQueueSchedulable> getQueueSchedulables() {
-    List<FSQueueSchedulable> scheds = new ArrayList<FSQueueSchedulable>();
-    for (FSQueue queue: queueMgr.getQueues()) {
-      scheds.add(queue.getQueueSchedulable());
-    }
-    return scheds;
+    return queueMgr;
   }
 
   private RMContainer getRMContainer(ContainerId containerId) {
-    FSSchedulerApp application =
+    FSSchedulerApp application = 
         applications.get(containerId.getApplicationAttemptId());
     return (application == null) ? null : application.getRMContainer(containerId);
   }
@@ -166,7 +182,7 @@ public class FairScheduler implements Re
    */
   private class UpdateThread implements Runnable {
     public void run() {
-      while (initialized) {
+      while (true) {
         try {
           Thread.sleep(UPDATE_INTERVAL);
           update();
@@ -179,40 +195,28 @@ public class FairScheduler implements Re
   }
 
   /**
-  * Recompute the internal variables used by the scheduler - per-job weights,
-  * fair shares, deficits, minimum slot allocations, and amount of used and
-  * required resources per job.
-  */
-  protected void update() {
-    synchronized (this) {
-      queueMgr.reloadAllocsIfNecessary(); // Relaod alloc file
-      updateRunnability(); // Set job runnability based on user/queue limits
-      updatePreemptionVariables(); // Determine if any queues merit preemption
-
-      // Update demands of apps and queues
-      for (FSQueue queue: queueMgr.getQueues()) {
-        queue.getQueueSchedulable().updateDemand();
-      }
-
-      // Compute fair shares based on updated demands
-      List<FSQueueSchedulable> queueScheds = this.getQueueSchedulables();
-      SchedulingAlgorithms.computeFairShares(
-          queueScheds, clusterCapacity);
-
-      // Update queue metrics for this queue
-      for (FSQueueSchedulable sched : queueScheds) {
-        sched.getMetrics().setAvailableResourcesToQueue(sched.getFairShare());
-      }
-
-      // Use the computed shares to assign shares within each queue
-      for (FSQueue queue: queueMgr.getQueues()) {
-        queue.getQueueSchedulable().redistributeShare();
-      }
-
-      // Update recorded capacity of root queue (child queues are updated
-      // when fair share is calculated).
-      rootMetrics.setAvailableResourcesToQueue(clusterCapacity);
-    }
+   * Recompute the internal variables used by the scheduler - per-job weights,
+   * fair shares, deficits, minimum slot allocations, and amount of used and
+   * required resources per job.
+   */
+  protected synchronized void update() {
+    queueMgr.reloadAllocsIfNecessary(); // Reload alloc file
+    updateRunnability(); // Set job runnability based on user/queue limits
+    updatePreemptionVariables(); // Determine if any queues merit preemption
+
+    FSQueue rootQueue = queueMgr.getRootQueue();
+
+    // Recursively update demands for all queues
+    rootQueue.updateDemand();
+
+    rootQueue.setFairShare(clusterCapacity);
+    // Recursively compute fair shares for all queues
+    // and update metrics
+    rootQueue.recomputeFairShares();
+
+    // Update recorded capacity of root queue (child queues are updated
+    // when fair share is calculated).
+    rootMetrics.setAvailableResourcesToQueue(clusterCapacity);
   }
 
   /**
@@ -223,7 +227,7 @@ public class FairScheduler implements Re
   private void updatePreemptionVariables() {
     long now = clock.getTime();
     lastPreemptionUpdateTime = now;
-    for (FSQueueSchedulable sched: getQueueSchedulables()) {
+    for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
       if (!isStarvedForMinShare(sched)) {
         sched.setLastTimeAtMinShare(now);
       }
@@ -236,16 +240,16 @@ public class FairScheduler implements Re
   /**
    * Is a queue below its min share for the given task type?
    */
-  boolean isStarvedForMinShare(FSQueueSchedulable sched) {
+  boolean isStarvedForMinShare(FSLeafQueue sched) {
     Resource desiredShare = Resources.min(sched.getMinShare(), sched.getDemand());
     return Resources.lessThan(sched.getResourceUsage(), desiredShare);
   }
 
   /**
-   * Is a queue being starved for fair share for the given task type?
-   * This is defined as being below half its fair share.
+   * Is a queue being starved for fair share for the given task type? This is
+   * defined as being below half its fair share.
    */
-  boolean isStarvedForFairShare(FSQueueSchedulable sched) {
+  boolean isStarvedForFairShare(FSLeafQueue sched) {
     Resource desiredFairShare = Resources.max(
         Resources.multiply(sched.getFairShare(), .5), sched.getDemand());
     return Resources.lessThan(sched.getResourceUsage(), desiredFairShare);
@@ -253,53 +257,55 @@ public class FairScheduler implements Re
 
   /**
    * Check for queues that need tasks preempted, either because they have been
-   * below their guaranteed share for minSharePreemptionTimeout or they
-   * have been below half their fair share for the fairSharePreemptionTimeout.
-   * If such queues exist, compute how many tasks of each type need to be
-   * preempted and then select the right ones using preemptTasks.
-   *
-   * This method computes and logs the number of tasks we want to preempt even
-   * if preemption is disabled, for debugging purposes.
+   * below their guaranteed share for minSharePreemptionTimeout or they have
+   * been below half their fair share for the fairSharePreemptionTimeout. If
+   * such queues exist, compute how many tasks of each type need to be preempted
+   * and then select the right ones using preemptTasks.
    */
-  protected void preemptTasksIfNecessary() {
-    if (!preemptionEnabled)
+  protected synchronized void preemptTasksIfNecessary() {
+    if (!preemptionEnabled) {
       return;
+    }
 
     long curTime = clock.getTime();
-    if (curTime - lastPreemptCheckTime < preemptionInterval)
+    if (curTime - lastPreemptCheckTime < preemptionInterval) {
       return;
+    }
     lastPreemptCheckTime = curTime;
 
     Resource resToPreempt = Resources.none();
 
-    for (FSQueueSchedulable sched: getQueueSchedulables()) {
+    for (FSLeafQueue sched : queueMgr.getLeafQueues()) {
       resToPreempt = Resources.add(resToPreempt, resToPreempt(sched, curTime));
     }
     if (Resources.greaterThan(resToPreempt, Resources.none())) {
-      preemptResources(getQueueSchedulables(), resToPreempt);
+      preemptResources(queueMgr.getLeafQueues(), resToPreempt);
     }
   }
 
   /**
-   * Preempt a quantity of resources from a list of QueueSchedulables.
-   * The policy for this is to pick apps from queues that are over their fair
-   * share, but make sure that no queue is placed below its fair share in the
-   * process. We further prioritize preemption by choosing containers with
-   * lowest priority to preempt.
+   * Preempt a quantity of resources from a collection of leaf queues. The
+   * policy for this is to pick apps from queues that are over their fair share,
+   * but make sure that no queue is placed below its fair share in the process.
+   * We further prioritize preemption by choosing containers with lowest
+   * priority to preempt.
    */
-  protected void preemptResources(List<FSQueueSchedulable> scheds, Resource toPreempt) {
-    if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none()))
+  protected void preemptResources(Collection<FSLeafQueue> scheds,
+      Resource toPreempt) {
+    if (scheds.isEmpty() || Resources.equals(toPreempt, Resources.none())) {
       return;
+    }
 
     Map<RMContainer, FSSchedulerApp> apps = 
         new HashMap<RMContainer, FSSchedulerApp>();
-    Map<RMContainer, FSQueueSchedulable> queues = new HashMap<RMContainer, FSQueueSchedulable>();
+    Map<RMContainer, FSLeafQueue> queues = 
+        new HashMap<RMContainer, FSLeafQueue>();
 
     // Collect running containers from over-scheduled queues
     List<RMContainer> runningContainers = new ArrayList<RMContainer>();
-    for (FSQueueSchedulable sched: scheds) {
+    for (FSLeafQueue sched : scheds) {
       if (Resources.greaterThan(sched.getResourceUsage(), sched.getFairShare())) {
-        for (AppSchedulable as: sched.getAppSchedulables()) {
+        for (AppSchedulable as : sched.getAppSchedulables()) {
           for (RMContainer c : as.getApp().getLiveContainers()) {
             runningContainers.add(c);
             apps.put(c, as.getApp());
@@ -319,18 +325,18 @@ public class FairScheduler implements Re
 
     // Scan down the sorted list of task statuses until we've killed enough
     // tasks, making sure we don't kill too many from any queue
-    for (RMContainer container: runningContainers) {
-     FSQueueSchedulable sched = queues.get(container);
+    for (RMContainer container : runningContainers) {
+      FSLeafQueue sched = queues.get(container);
       if (Resources.greaterThan(sched.getResourceUsage(), sched.getFairShare())) {
         LOG.info("Preempting container (prio=" + container.getContainer().getPriority() +
             "res=" + container.getContainer().getResource() +
-            ") from queue " + sched.getQueue().getName());
+            ") from queue " + sched.getName());
         ContainerStatus status = SchedulerUtils.createAbnormalContainerStatus(
             container.getContainerId(), SchedulerUtils.PREEMPTED_CONTAINER);
 
         // TODO: Not sure if this ever actually adds this to the list of cleanup
         // containers on the RMNode (see SchedulerNode.releaseContainer()).
-        this.completedContainer(container, status, RMContainerEventType.KILL);
+        completedContainer(container, status, RMContainerEventType.KILL);
 
         toPreempt = Resources.subtract(toPreempt,
             container.getContainer().getResource());
@@ -346,12 +352,12 @@ public class FairScheduler implements Re
    * If the queue has been below its min share for at least its preemption
    * timeout, it should preempt the difference between its current share and
    * this min share. If it has been below half its fair share for at least the
-   * fairSharePreemptionTimeout, it should preempt enough tasks to get up to
-   * its full fair share. If both conditions hold, we preempt the max of the
-   * two amounts (this shouldn't happen unless someone sets the timeouts to
-   * be identical for some reason).
+   * fairSharePreemptionTimeout, it should preempt enough tasks to get up to its
+   * full fair share. If both conditions hold, we preempt the max of the two
+   * amounts (this shouldn't happen unless someone sets the timeouts to be
+   * identical for some reason).
    */
-  protected Resource resToPreempt(FSQueueSchedulable sched, long curTime) {
+  protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
     String queue = sched.getName();
     long minShareTimeout = queueMgr.getMinSharePreemptionTimeout(queue);
     long fairShareTimeout = queueMgr.getFairSharePreemptionTimeout();
@@ -360,7 +366,7 @@ public class FairScheduler implements Re
     if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
       Resource target = Resources.min(sched.getMinShare(), sched.getDemand());
       resDueToMinShare = Resources.max(Resources.none(),
-                            Resources.subtract(target, sched.getResourceUsage()));
+          Resources.subtract(target, sched.getResourceUsage()));
     }
     if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
       Resource target = Resources.min(sched.getFairShare(), sched.getDemand());
@@ -378,15 +384,15 @@ public class FairScheduler implements Re
   }
 
   /**
-   * This updates the runnability of all apps based on whether or not
-   * any users/queues have exceeded their capacity.
+   * This updates the runnability of all apps based on whether or not any
+   * users/queues have exceeded their capacity.
    */
   private void updateRunnability() {
     List<AppSchedulable> apps = new ArrayList<AppSchedulable>();
 
     // Start by marking everything as not runnable
-    for (FSQueue p: queueMgr.getQueues()) {
-      for (AppSchedulable a: p.getQueueSchedulable().getAppSchedulables()) {
+    for (FSLeafQueue leafQueue : queueMgr.getLeafQueues()) {
+      for (AppSchedulable a : leafQueue.getAppSchedulables()) {
         a.setRunnable(false);
         apps.add(a);
       }
@@ -398,7 +404,7 @@ public class FairScheduler implements Re
     Map<String, Integer> userApps = new HashMap<String, Integer>();
     Map<String, Integer> queueApps = new HashMap<String, Integer>();
 
-    for (AppSchedulable app: apps) {
+    for (AppSchedulable app : apps) {
       String user = app.getApp().getUser();
       String queue = app.getApp().getQueueName();
       int userCount = userApps.containsKey(user) ? userApps.get(user) : 0;
@@ -413,10 +419,11 @@ public class FairScheduler implements Re
   }
 
   public RMContainerTokenSecretManager getContainerTokenSecretManager() {
-    return this.rmContext.getContainerTokenSecretManager();
+    return rmContext.getContainerTokenSecretManager();
   }
 
-  public double getAppWeight(AppSchedulable app) {
+  // synchronized for sizeBasedWeight
+  public synchronized double getAppWeight(AppSchedulable app) {
     if (!app.getRunnable()) {
       // Job won't launch tasks, but don't return 0 to avoid division errors
       return 1.0;
@@ -437,28 +444,28 @@ public class FairScheduler implements Re
 
   @Override
   public Resource getMinimumResourceCapability() {
-    return this.minimumAllocation;
+    return minimumAllocation;
   }
 
   @Override
   public Resource getMaximumResourceCapability() {
-    return this.maximumAllocation;
+    return maximumAllocation;
   }
 
   public double getNodeLocalityThreshold() {
-    return this.nodeLocalityThreshold;
+    return nodeLocalityThreshold;
   }
 
   public double getRackLocalityThreshold() {
-    return this.rackLocalityThreshold;
+    return rackLocalityThreshold;
   }
 
   public Resource getClusterCapacity() {
-    return this.clusterCapacity;
+    return clusterCapacity;
   }
 
   public Clock getClock() {
-    return this.clock;
+    return clock;
   }
 
   protected void setClock(Clock clock) {
@@ -470,22 +477,25 @@ public class FairScheduler implements Re
   }
 
   /**
-   * Add a new application to the scheduler, with a given id, queue name,
-   * and user. This will accept a new app even if the user or queue is above
+   * Add a new application to the scheduler, with a given id, queue name, and
+   * user. This will accept a new app even if the user or queue is above
    * configured limits, but the app will not be marked as runnable.
    */
-  protected synchronized void
-  addApplication(ApplicationAttemptId applicationAttemptId,
-      String queueName, String user) {
+  protected synchronized void addApplication(
+      ApplicationAttemptId applicationAttemptId, String queueName, String user) {
 
-    FSQueue queue = this.queueMgr.getQueue(queueName);
+    FSLeafQueue queue = queueMgr.getLeafQueue(queueName);
+    if (queue == null) {
+      // queue is not an existing or createable leaf queue
+      queue = queueMgr.getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
+    }
 
     FSSchedulerApp schedulerApp =
         new FSSchedulerApp(applicationAttemptId, user,
-            queue.getQueueSchedulable(), new ActiveUsersManager(this.getRootQueueMetrics()),
-            rmContext, null);
-
-    // Inforce ACLs
+            queue, new ActiveUsersManager(getRootQueueMetrics()),
+            rmContext);
+    
+    // Enforce ACLs
     UserGroupInformation userUgi;
     try {
       userUgi = UserGroupInformation.getCurrentUser();
@@ -494,8 +504,8 @@ public class FairScheduler implements Re
       return;
     }
 
-    List<QueueUserACLInfo> info = queue.getQueueSchedulable().getQueueUserAclInfo(
-        userUgi); // Always a signleton list
+    // Always a singleton list
+    List<QueueUserACLInfo> info = queue.getQueueUserAclInfo(userUgi);
     if (!info.get(0).getUserAcls().contains(QueueACL.SUBMIT_APPLICATIONS)) {
       LOG.info("User " + userUgi.getUserName() +
           " cannot submit" + " applications to queue " + queue.getName());
@@ -503,14 +513,13 @@ public class FairScheduler implements Re
     }
 
     queue.addApp(schedulerApp);
-    queue.getQueueSchedulable().getMetrics().submitApp(user,
-    		applicationAttemptId.getAttemptId());
+    queue.getMetrics().submitApp(user, applicationAttemptId.getAttemptId());
     rootMetrics.submitApp(user, applicationAttemptId.getAttemptId());
 
     applications.put(applicationAttemptId, schedulerApp);
 
     LOG.info("Application Submission: " + applicationAttemptId +
-        ", user: " + user +
+        ", user: "+ user +
         ", currently active: " + applications.size());
 
     rmContext.getDispatcher().getEventHandler().handle(
@@ -537,10 +546,10 @@ public class FairScheduler implements Re
           SchedulerUtils.createAbnormalContainerStatus(
               rmContainer.getContainerId(),
               SchedulerUtils.COMPLETED_APPLICATION),
-          RMContainerEventType.KILL);
+              RMContainerEventType.KILL);
     }
 
-     // Release all reserved containers
+    // Release all reserved containers
     for (RMContainer rmContainer : application.getReservedContainers()) {
       completedContainer(rmContainer,
           SchedulerUtils.createAbnormalContainerStatus(
@@ -553,8 +562,9 @@ public class FairScheduler implements Re
     application.stop(rmAppAttemptFinalState);
 
     // Inform the queue
-    FSQueue queue = this.queueMgr.getQueue(application.getQueue().getQueueName());
-    queue.removeJob(application);
+    FSLeafQueue queue = queueMgr.getLeafQueue(application.getQueue()
+        .getQueueName());
+    queue.removeApp(application);
 
     // Remove from our data-structure
     applications.remove(applicationAttemptId);
@@ -600,7 +610,7 @@ public class FairScheduler implements Re
   }
 
   private synchronized void addNode(RMNode node) {
-    this.nodes.put(node.getNodeID(), new FSSchedulerNode(node));
+    nodes.put(node.getNodeID(), new FSSchedulerNode(node));
     Resources.addTo(clusterCapacity, node.getTotalCapability());
 
     LOG.info("Added node " + node.getNodeAddress() +
@@ -608,7 +618,7 @@ public class FairScheduler implements Re
   }
 
   private synchronized void removeNode(RMNode rmNode) {
-    FSSchedulerNode node = this.nodes.get(rmNode.getNodeID());
+    FSSchedulerNode node = nodes.get(rmNode.getNodeID());
     Resources.subtractFrom(clusterCapacity, rmNode.getTotalCapability());
 
     // Remove running containers
@@ -631,7 +641,7 @@ public class FairScheduler implements Re
           RMContainerEventType.KILL);
     }
 
-    this.nodes.remove(rmNode.getNodeID());
+    nodes.remove(rmNode.getNodeID());
     LOG.info("Removed node " + rmNode.getNodeAddress() +
         " cluster capacity: " + clusterCapacity);
   }
@@ -655,11 +665,11 @@ public class FairScheduler implements Re
     for (ContainerId releasedContainerId : release) {
       RMContainer rmContainer = getRMContainer(releasedContainerId);
       if (rmContainer == null) {
-         RMAuditLogger.logFailure(application.getUser(),
-             AuditConstants.RELEASE_CONTAINER,
-             "Unauthorized access or invalid container", "FairScheduler",
-             "Trying to release container not owned by app or with invalid id",
-             application.getApplicationId(), releasedContainerId);
+        RMAuditLogger.logFailure(application.getUser(),
+            AuditConstants.RELEASE_CONTAINER,
+            "Unauthorized access or invalid container", "FairScheduler",
+            "Trying to release container not owned by app or with invalid id",
+            application.getApplicationId(), releasedContainerId);
       }
       completedContainer(rmContainer,
           SchedulerUtils.createAbnormalContainerStatus(
@@ -669,13 +679,11 @@ public class FairScheduler implements Re
     }
 
     synchronized (application) {
-
       if (!ask.isEmpty()) {
-
-        if(LOG.isDebugEnabled()) {
+        if (LOG.isDebugEnabled()) {
           LOG.debug("allocate: pre-update" +
-            " applicationAttemptId=" + appAttemptId +
-            " application=" + application.getApplicationId());
+              " applicationAttemptId=" + appAttemptId +
+              " application=" + application.getApplicationId());
         }
         application.showRequests();
 
@@ -686,21 +694,19 @@ public class FairScheduler implements Re
         application.showRequests();
       }
 
-      if(LOG.isDebugEnabled()) {
+      if (LOG.isDebugEnabled()) {
         LOG.debug("allocate:" +
-          " applicationAttemptId=" + appAttemptId +
-          " #ask=" + ask.size());
+            " applicationAttemptId=" + appAttemptId +
+            " #ask=" + ask.size());
       }
 
-      return new Allocation(
-          application.pullNewlyAllocatedContainers(),
+      return new Allocation(application.pullNewlyAllocatedContainers(),
           application.getHeadroom());
     }
   }
 
   /**
-   * Process a container which has launched on a node, as reported by the
-   * node.
+   * Process a container which has launched on a node, as reported by the node.
    */
   private void containerLaunchedOnNode(ContainerId containerId, FSSchedulerNode node) {
     // Get the application for the finished container
@@ -722,7 +728,9 @@ public class FairScheduler implements Re
   private synchronized void nodeUpdate(RMNode nm,
       List<ContainerStatus> newlyLaunchedContainers,
       List<ContainerStatus> completedContainers) {
-    LOG.info("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("nodeUpdate: " + nm + " cluster capacity: " + clusterCapacity);
+    }
     eventLog.log("HEARTBEAT", nm.getHostName());
     FSSchedulerNode node = nodes.get(nm.getNodeID());
 
@@ -754,20 +762,20 @@ public class FairScheduler implements Re
       LOG.info("Trying to fulfill reservation for application " +
           reservedApplication.getApplicationId() + " on node: " + nm);
 
-      FSQueue queue = queueMgr.getQueue(reservedApplication.getQueueName());
-      queue.getQueueSchedulable().assignContainer(node, true);
+      FSLeafQueue queue = queueMgr.getLeafQueue(reservedApplication.getQueueName());
+      queue.assignContainer(node, true);
     }
 
-
     // Otherwise, schedule at queue which is furthest below fair share
     else {
       int assignedContainers = 0;
       while (true) {
         // At most one task is scheduled each iteration of this loop
-        List<FSQueueSchedulable> scheds = this.getQueueSchedulables();
+        List<FSLeafQueue> scheds = new ArrayList<FSLeafQueue>(
+            queueMgr.getLeafQueues());
         Collections.sort(scheds, new SchedulingAlgorithms.FairShareComparator());
         boolean assignedContainer = false;
-        for (FSQueueSchedulable sched : scheds) {
+        for (FSLeafQueue sched : scheds) {
           Resource assigned = sched.assignContainer(node, false);
           if (Resources.greaterThan(assigned, Resources.none())) {
             eventLog.log("ASSIGN", nm.getHostName(), assigned);
@@ -796,11 +804,11 @@ public class FairScheduler implements Re
   @Override
   public SchedulerAppReport getSchedulerAppInfo(
       ApplicationAttemptId appAttemptId) {
-    if (!this.applications.containsKey(appAttemptId)) {
+    if (!applications.containsKey(appAttemptId)) {
       LOG.error("Request for appInfo of unknown attempt" + appAttemptId);
       return null;
     }
-    return new SchedulerAppReport(this.applications.get(appAttemptId));
+    return new SchedulerAppReport(applications.get(appAttemptId));
   }
 
   @Override
@@ -810,43 +818,35 @@ public class FairScheduler implements Re
 
   @Override
   public void handle(SchedulerEvent event) {
-    switch(event.getType()) {
+    switch (event.getType()) {
     case NODE_ADDED:
-    {
       if (!(event instanceof NodeAddedSchedulerEvent)) {
         throw new RuntimeException("Unexpected event type: " + event);
       }
       NodeAddedSchedulerEvent nodeAddedEvent = (NodeAddedSchedulerEvent)event;
       addNode(nodeAddedEvent.getAddedRMNode());
-    }
-    break;
+      break;
     case NODE_REMOVED:
-    {
       if (!(event instanceof NodeRemovedSchedulerEvent)) {
         throw new RuntimeException("Unexpected event type: " + event);
       }
       NodeRemovedSchedulerEvent nodeRemovedEvent = (NodeRemovedSchedulerEvent)event;
       removeNode(nodeRemovedEvent.getRemovedRMNode());
-    }
-    break;
+      break;
     case NODE_UPDATE:
-    {
       if (!(event instanceof NodeUpdateSchedulerEvent)) {
         throw new RuntimeException("Unexpected event type: " + event);
       }
-      NodeUpdateSchedulerEvent nodeUpdatedEvent =
-      (NodeUpdateSchedulerEvent)event;
-      this.nodeUpdate(nodeUpdatedEvent.getRMNode(),
+      NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event;
+      nodeUpdate(nodeUpdatedEvent.getRMNode(),
           nodeUpdatedEvent.getNewlyLaunchedContainers(),
           nodeUpdatedEvent.getCompletedContainers());
-    }
-    break;
+      break;
     case APP_ADDED:
-    {
       if (!(event instanceof AppAddedSchedulerEvent)) {
         throw new RuntimeException("Unexpected event type: " + event);
       }
-      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
+      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent)event;
       String queue = appAddedEvent.getQueue();
 
       // Potentially set queue to username if configured to do so
@@ -857,33 +857,28 @@ public class FairScheduler implements Re
 
       addApplication(appAddedEvent.getApplicationAttemptId(), queue,
           appAddedEvent.getUser());
-    }
-    break;
+      break;
     case APP_REMOVED:
-    {
       if (!(event instanceof AppRemovedSchedulerEvent)) {
         throw new RuntimeException("Unexpected event type: " + event);
       }
       AppRemovedSchedulerEvent appRemovedEvent = (AppRemovedSchedulerEvent)event;
-      this.removeApplication(appRemovedEvent.getApplicationAttemptID(),
+      removeApplication(appRemovedEvent.getApplicationAttemptID(),
           appRemovedEvent.getFinalAttemptState());
-    }
-    break;
+      break;
     case CONTAINER_EXPIRED:
-    {
       if (!(event instanceof ContainerExpiredSchedulerEvent)) {
         throw new RuntimeException("Unexpected event type: " + event);
       }
       ContainerExpiredSchedulerEvent containerExpiredEvent =
-          (ContainerExpiredSchedulerEvent) event;
+          (ContainerExpiredSchedulerEvent)event;
       ContainerId containerId = containerExpiredEvent.getContainerId();
       completedContainer(getRMContainer(containerId),
           SchedulerUtils.createAbnormalContainerStatus(
               containerId,
               SchedulerUtils.EXPIRED_CONTAINER),
           RMContainerEventType.EXPIRE);
-    }
-    break;
+      break;
     default:
       LOG.error("Unknown event arrived at FairScheduler: " + event.toString());
     }
@@ -895,13 +890,12 @@ public class FairScheduler implements Re
   }
 
   @Override
-  public synchronized void
-      reinitialize(Configuration conf, RMContext rmContext) throws IOException {
-    if (!this.initialized) {
+  public synchronized void reinitialize(Configuration conf, RMContext rmContext)
+      throws IOException {
+    if (!initialized) {
       this.conf = new FairSchedulerConfiguration(conf);
-      this.rootMetrics = QueueMetrics.forQueue("root", null, true, conf);
+      rootMetrics = QueueMetrics.forQueue("root", null, true, conf);
       this.rmContext = rmContext;
-      this.clock = new SystemClock();
       this.eventLog = new FairSchedulerEventLog();
       eventLog.init(this.conf);
       minimumAllocation = this.conf.getMinimumMemoryAllocation();
@@ -913,21 +907,20 @@ public class FairScheduler implements Re
       assignMultiple = this.conf.getAssignMultiple();
       maxAssign = this.conf.getMaxAssign();
 
-      Thread updateThread = new Thread(new UpdateThread());
-      updateThread.start();
-
       initialized = true;
 
       sizeBasedWeight = this.conf.getSizeBasedWeight();
 
-      queueMgr = new QueueManager(this);
-
       try {
         queueMgr.initialize();
-      }
-      catch (Exception e) {
+      } catch (Exception e) {
         throw new IOException("Failed to start FairScheduler", e);
       }
+
+      Thread updateThread = new Thread(new UpdateThread());
+      updateThread.setName("FairSchedulerUpdateThread");
+      updateThread.setDaemon(true);
+      updateThread.start();
     } else {
       this.conf = new FairSchedulerConfiguration(conf);
       userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
@@ -935,10 +928,9 @@ public class FairScheduler implements Re
       rackLocalityThreshold = this.conf.getLocalityThresholdRack();
       preemptionEnabled = this.conf.getPreemptionEnabled();
       try {
-       queueMgr.reloadAllocs();
+        queueMgr.reloadAllocs();
 
-      }
-      catch (Exception e) {
+      } catch (Exception e) {
         throw new IOException("Failed to initialize FairScheduler", e);
       }
     }
@@ -950,8 +942,8 @@ public class FairScheduler implements Re
     if (!queueMgr.exists(queueName)) {
       return null;
     }
-    return queueMgr.getQueue(queueName).getQueueSchedulable().getQueueInfo(
-        includeChildQueues, recursive);
+    return queueMgr.getQueue(queueName).getQueueInfo(includeChildQueues,
+        recursive);
   }
 
   @Override
@@ -963,17 +955,12 @@ public class FairScheduler implements Re
       return new ArrayList<QueueUserACLInfo>();
     }
 
-    List<QueueUserACLInfo> userAcls = new ArrayList<QueueUserACLInfo>();
-
-    for (FSQueue queue : queueMgr.getQueues()) {
-      userAcls.addAll(queue.getQueueSchedulable().getQueueUserAclInfo(user));
-    }
-    return userAcls;
+    return queueMgr.getRootQueue().getQueueUserAclInfo(user);
   }
 
   @Override
   public int getNumClusterNodes() {
-    return this.nodes.size();
+    return nodes.size();
   }
 
 }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java?rev=1415815&r1=1415814&r2=1415815&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java Fri Nov 30 19:58:09 2012
@@ -18,7 +18,7 @@ public class FairSchedulerConfiguration 
   /** Whether to use the user name as the queue name (instead of "default") if
    * the request does not specify a queue. */
   protected static final String  USER_AS_DEFAULT_QUEUE = CONF_PREFIX + "user-as-default-queue";
-  protected static final boolean DEFAULT_USER_AS_DEFAULT_QUEUE = false;
+  protected static final boolean DEFAULT_USER_AS_DEFAULT_QUEUE = true;
 
   protected static final String LOCALITY_THRESHOLD = CONF_PREFIX + "locality.threshold";
   protected static final float  DEFAULT_LOCALITY_THRESHOLD = -1.0f;

Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java?rev=1415815&r1=1415814&r2=1415815&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java Fri Nov 30 19:58:09 2012
@@ -77,12 +77,11 @@ class FairSchedulerEventLog {
   boolean init(FairSchedulerConfiguration conf) {
     try {
       logDir = conf.getEventlogDir();
-      Path logDirPath = new Path(logDir);
-      FileSystem fs = logDirPath.getFileSystem(conf);
-      if (!fs.exists(logDirPath)) {
-        if (!fs.mkdirs(logDirPath)) {
+      File logDirFile = new File(logDir);
+      if (!logDirFile.exists()) {
+        if (!logDirFile.mkdirs()) {
           throw new IOException(
-              "Mkdirs failed to create " + logDirPath.toString());
+              "Mkdirs failed to create " + logDirFile.toString());
         }
       }
       String username = System.getProperty("user.name");
@@ -142,4 +141,8 @@ class FairSchedulerEventLog {
   synchronized boolean isEnabled() {
     return !logDisabled;
   }
+  
+  public String getLogFile() {
+    return logFile;
+  }
 }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1415815&r1=1415814&r2=1415815&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Fri Nov 30 19:58:09 2012
@@ -24,10 +24,10 @@ import java.net.URL;
 import java.net.URLConnection;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
@@ -53,6 +53,7 @@ import org.xml.sax.SAXException;
 /**
  * Maintains a list of queues as well as scheduling parameters for each queue,
  * such as guaranteed share allocations, from the fair scheduler config file.
+ * 
  */
 @Private
 @Unstable
@@ -60,6 +61,8 @@ public class QueueManager {
   public static final Log LOG = LogFactory.getLog(
     QueueManager.class.getName());
 
+  public static final String ROOT_QUEUE = "root";
+  
   /** Time to wait between checks of the allocation file */
   public static final long ALLOC_RELOAD_INTERVAL = 10 * 1000;
 
@@ -71,89 +74,37 @@ public class QueueManager {
 
   private final FairScheduler scheduler;
 
-  // Minimum resource allocation for each queue
-  private Map<String, Resource> minQueueResources = new HashMap<String, Resource>();
-  // Maximum amount of resources per queue
-  private Map<String, Resource> maxQueueResources = new HashMap<String, Resource>();
-  // Sharing weights for each queue
-  private Map<String, Double> queueWeights = new HashMap<String, Double>();
-
-  // Max concurrent running applications for each queue and for each user; in addition,
-  // for users that have no max specified, we use the userMaxJobsDefault.
-  private Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
-  private Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
-  private int userMaxAppsDefault = Integer.MAX_VALUE;
-  private int queueMaxAppsDefault = Integer.MAX_VALUE;
-
-  // ACL's for each queue. Only specifies non-default ACL's from configuration.
-  private Map<String, Map<QueueACL, AccessControlList>> queueAcls =
-      new HashMap<String, Map<QueueACL, AccessControlList>>();
-
-  // Min share preemption timeout for each queue in seconds. If a job in the queue
-  // waits this long without receiving its guaranteed share, it is allowed to
-  // preempt other jobs' tasks.
-  private Map<String, Long> minSharePreemptionTimeouts =
-    new HashMap<String, Long>();
-
-  // Default min share preemption timeout for queues where it is not set
-  // explicitly.
-  private long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
-
-  // Preemption timeout for jobs below fair share in seconds. If a job remains
-  // below half its fair share for this long, it is allowed to preempt tasks.
-  private long fairSharePreemptionTimeout = Long.MAX_VALUE;
-
-  SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR;
-
   private Object allocFile; // Path to XML file containing allocations. This
                             // is either a URL to specify a classpath resource
                             // (if the fair-scheduler.xml on the classpath is
                             // used) or a String to specify an absolute path (if
                             // mapred.fairscheduler.allocation.file is used).
 
-  private Map<String, FSQueue> queues = new HashMap<String, FSQueue>();
+  private final Collection<FSLeafQueue> leafQueues = 
+      new CopyOnWriteArrayList<FSLeafQueue>();
+  private final Map<String, FSQueue> queues = new HashMap<String, FSQueue>();
+  private FSParentQueue rootQueue;
 
+  private volatile QueueManagerInfo info = new QueueManagerInfo();
+  
   private long lastReloadAttempt; // Last time we tried to reload the queues file
   private long lastSuccessfulReload; // Last time we successfully reloaded queues
   private boolean lastReloadAttemptFailed = false;
   
-  // Monitor object for minQueueResources
-  private Object minQueueResourcesMO = new Object();
-  
-  //Monitor object for maxQueueResources
-  private Object maxQueueResourcesMO = new Object();
-  
-  //Monitor object for queueMaxApps
-  private Object queueMaxAppsMO = new Object();
-  
-  //Monitor object for userMaxApps
-  private Object userMaxAppsMO = new Object();
-  
-  //Monitor object for queueWeights
-  private Object queueWeightsMO = new Object();
-  
-  //Monitor object for minSharePreemptionTimeouts
-  private Object minSharePreemptionTimeoutsMO = new Object();
-  
-  //Monitor object for queueAcls
-  private Object queueAclsMO = new Object();
-  
-  //Monitor object for userMaxAppsDefault
-  private Object userMaxAppsDefaultMO = new Object();
-  
-  //Monitor object for queueMaxAppsDefault
-  private Object queueMaxAppsDefaultMO = new Object();
-  
-  //Monitor object for defaultSchedulingMode
-  private Object defaultSchedulingModeMO = new Object();
-  
   public QueueManager(FairScheduler scheduler) {
     this.scheduler = scheduler;
   }
+  
+  public FSParentQueue getRootQueue() {
+    return rootQueue;
+  }
 
   public void initialize() throws IOException, SAXException,
       AllocationConfigurationException, ParserConfigurationException {
     FairSchedulerConfiguration conf = scheduler.getConf();
+    rootQueue = new FSParentQueue("root", this, scheduler, null);
+    queues.put(rootQueue.getName(), rootQueue);
+    
     this.allocFile = conf.getAllocationFile();
     if (allocFile == null) {
       // No allocation file specified in jobconf. Use the default allocation
@@ -169,23 +120,106 @@ public class QueueManager {
     lastSuccessfulReload = scheduler.getClock().getTime();
     lastReloadAttempt = scheduler.getClock().getTime();
     // Create the default queue
-    getQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
+    getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
   }
-
+  
   /**
-   * Get a queue by name, creating it if necessary
-   */
-  public FSQueue getQueue(String name) {
+   * Get a queue by name, creating it if necessary.  If the queue
+   * is not or can not be a leaf queue, i.e. it already exists as a parent queue,
+   * or one of the parents in its name is already a leaf queue, null is returned.
+   * 
+   * The root part of the name is optional, so a queue underneath the root 
+   * named "queue1" could be referred to  as just "queue1", and a queue named
+   * "queue2" underneath a parent named "parent1" that is underneath the root 
+   * could be referred to as just "parent1.queue2".
+   */
+  public FSLeafQueue getLeafQueue(String name) {
+    if (!name.startsWith(ROOT_QUEUE + ".")) {
+      name = ROOT_QUEUE + "." + name;
+    }
     synchronized (queues) {
       FSQueue queue = queues.get(name);
       if (queue == null) {
-        queue = new FSQueue(scheduler, name);
-        synchronized (defaultSchedulingModeMO){
-          queue.setSchedulingMode(defaultSchedulingMode);
+        FSLeafQueue leafQueue = createLeafQueue(name);
+        if (leafQueue == null) {
+          return null;
         }
-        queues.put(name, queue);
+        leafQueue.setSchedulingMode(info.defaultSchedulingMode);
+        queue = leafQueue;
+      } else if (queue instanceof FSParentQueue) {
+        return null;
+      }
+      return (FSLeafQueue)queue;
+    }
+  }
+  
+  /**
+   * Creates a leaf queue and places it in the tree. Creates any
+   * parents that don't already exist.
+   * 
+   * @return
+   *    the created queue, if successful. null if not allowed (one of the parent
+   *    queues in the queue name is already a leaf queue)
+   */
+  private FSLeafQueue createLeafQueue(String name) {
+    List<String> newQueueNames = new ArrayList<String>();
+    newQueueNames.add(name);
+    int sepIndex = name.length();
+    FSParentQueue parent = null;
+
+    // Move up the queue tree until we reach one that exists.
+    while (sepIndex != -1) {
+      sepIndex = name.lastIndexOf('.', sepIndex-1);
+      FSQueue queue;
+      String curName = null;
+      curName = name.substring(0, sepIndex);
+      queue = queues.get(curName);
+
+      if (queue == null) {
+        newQueueNames.add(curName);
+      } else {
+        if (queue instanceof FSParentQueue) {
+          parent = (FSParentQueue)queue;
+          break;
+        } else {
+          return null;
+        }
+      }
+    }
+    
+    // At this point, parent refers to the deepest existing parent of the
+    // queue to create.
+    // Now that we know everything worked out, make all the queues
+    // and add them to the map.
+    FSLeafQueue leafQueue = null;
+    for (int i = newQueueNames.size()-1; i >= 0; i--) {
+      String queueName = newQueueNames.get(i);
+      if (i == 0) {
+        // First name added was the leaf queue
+        leafQueue = new FSLeafQueue(name, this, scheduler, parent);
+        parent.addChildQueue(leafQueue);
+        queues.put(leafQueue.getName(), leafQueue);
+        leafQueues.add(leafQueue);
+      } else {
+        FSParentQueue newParent = new FSParentQueue(queueName, this, scheduler, parent);
+        parent.addChildQueue(newParent);
+        queues.put(newParent.getName(), newParent);
+        parent = newParent;
       }
-      return queue;
+    }
+    
+    return leafQueue;
+  }
+
+  /**
+   * Gets a queue by name.
+   */
+  public FSQueue getQueue(String name) {
+    if (!name.startsWith(ROOT_QUEUE + ".") && !name.equals(ROOT_QUEUE)) {
+      name = ROOT_QUEUE + "." + name;
+    }
+    synchronized (queues) {
+      return queues.get(name);
     }
   }
 
@@ -201,8 +235,8 @@ public class QueueManager {
   /**
    * Get the queue for a given AppSchedulable.
    */
-  public FSQueue getQueueForApp(AppSchedulable app) {
-    return this.getQueue(app.getApp().getQueueName());
+  public FSLeafQueue getQueueForApp(AppSchedulable app) {
+    return getLeafQueue(app.getApp().getQueueName());
   }
 
   /**
@@ -272,6 +306,8 @@ public class QueueManager {
         new HashMap<String, Map<QueueACL, AccessControlList>>();
     int userMaxAppsDefault = Integer.MAX_VALUE;
     int queueMaxAppsDefault = Integer.MAX_VALUE;
+    long fairSharePreemptionTimeout = Long.MAX_VALUE;
+    long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
     SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR;
 
     // Remember all queue names so we can display them on web UI, etc.
@@ -300,54 +336,9 @@ public class QueueManager {
       Element element = (Element)node;
       if ("queue".equals(element.getTagName()) ||
     	  "pool".equals(element.getTagName())) {
-        String queueName = element.getAttribute("name");
-        Map<QueueACL, AccessControlList> acls =
-            new HashMap<QueueACL, AccessControlList>();
-        queueNamesInAllocFile.add(queueName);
-        NodeList fields = element.getChildNodes();
-        for (int j = 0; j < fields.getLength(); j++) {
-          Node fieldNode = fields.item(j);
-          if (!(fieldNode instanceof Element))
-            continue;
-          Element field = (Element) fieldNode;
-          if ("minResources".equals(field.getTagName())) {
-            String text = ((Text)field.getFirstChild()).getData().trim();
-            int val = Integer.parseInt(text);
-            minQueueResources.put(queueName, Resources.createResource(val));
-          } else if ("maxResources".equals(field.getTagName())) {
-            String text = ((Text)field.getFirstChild()).getData().trim();
-            int val = Integer.parseInt(text);
-            maxQueueResources.put(queueName, Resources.createResource(val));
-          } else if ("maxRunningApps".equals(field.getTagName())) {
-            String text = ((Text)field.getFirstChild()).getData().trim();
-            int val = Integer.parseInt(text);
-            queueMaxApps.put(queueName, val);
-          } else if ("weight".equals(field.getTagName())) {
-            String text = ((Text)field.getFirstChild()).getData().trim();
-            double val = Double.parseDouble(text);
-            queueWeights.put(queueName, val);
-          } else if ("minSharePreemptionTimeout".equals(field.getTagName())) {
-            String text = ((Text)field.getFirstChild()).getData().trim();
-            long val = Long.parseLong(text) * 1000L;
-            minSharePreemptionTimeouts.put(queueName, val);
-          } else if ("schedulingMode".equals(field.getTagName())) {
-            String text = ((Text)field.getFirstChild()).getData().trim();
-            queueModes.put(queueName, parseSchedulingMode(text));
-          } else if ("aclSubmitApps".equals(field.getTagName())) {
-            String text = ((Text)field.getFirstChild()).getData().trim();
-            acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
-          } else if ("aclAdministerApps".equals(field.getTagName())) {
-            String text = ((Text)field.getFirstChild()).getData().trim();
-            acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
-          }
-        }
-        queueAcls.put(queueName, acls);
-        if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName)
-            && Resources.lessThan(maxQueueResources.get(queueName),
-                minQueueResources.get(queueName))) {
-          LOG.warn(String.format("Queue %s has max resources %d less than min resources %d",
-              queueName, maxQueueResources.get(queueName), minQueueResources.get(queueName)));
-        }
+        loadQueue("root", element, minQueueResources, maxQueueResources, queueMaxApps,
+            userMaxApps, queueWeights, queueModes, minSharePreemptionTimeouts,
+            queueAcls, queueNamesInAllocFile);
       } else if ("user".equals(element.getTagName())) {
         String userName = element.getAttribute("name");
         NodeList fields = element.getChildNodes();
@@ -388,19 +379,13 @@ public class QueueManager {
 
     // Commit the reload; also create any queue defined in the alloc file
     // if it does not already exist, so it can be displayed on the web UI.
-    synchronized(this) {
-      setMinResources(minQueueResources);
-      setMaxResources(maxQueueResources);
-      setQueueMaxApps(queueMaxApps);
-      setUserMaxApps(userMaxApps);
-      setQueueWeights(queueWeights);
-      setUserMaxAppsDefault(userMaxAppsDefault);
-      setQueueMaxAppsDefault(queueMaxAppsDefault);
-      setDefaultSchedulingMode(defaultSchedulingMode);
-      setMinSharePreemptionTimeouts(minSharePreemptionTimeouts);
-      setQueueAcls(queueAcls);
+    synchronized (this) {
+      info = new QueueManagerInfo(minQueueResources, maxQueueResources,
+          queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault,
+          queueMaxAppsDefault, defaultSchedulingMode, minSharePreemptionTimeouts,
+          queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout);
       for (String name: queueNamesInAllocFile) {
-        FSQueue queue = getQueue(name);
+        FSLeafQueue queue = getLeafQueue(name);
         if (queueModes.containsKey(name)) {
           queue.setSchedulingMode(queueModes.get(name));
         } else {
@@ -409,6 +394,75 @@ public class QueueManager {
       }
     }
   }
+  
+  /**
+   * Loads a queue, and recursively any nested queues, from a "queue" or "pool"
+   * element of the allocation file.  Settings are recorded in the supplied maps
+   * under the name parentName + "." + the element's "name" attribute; only
+   * queues with no nested queue elements are added to queueNamesInAllocFile.
+   */
+  private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources,
+      Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps,
+      Map<String, Integer> userMaxApps, Map<String, Double> queueWeights,
+      Map<String, SchedulingMode> queueModes, Map<String, Long> minSharePreemptionTimeouts,
+      Map<String, Map<QueueACL, AccessControlList>> queueAcls, List<String> queueNamesInAllocFile)
+      throws AllocationConfigurationException {
+    String queueName = parentName + "." + element.getAttribute("name");
+    Map<QueueACL, AccessControlList> acls = new HashMap<QueueACL, AccessControlList>();
+    NodeList fields = element.getChildNodes();
+    boolean isLeaf = true;
+
+    for (int j = 0; j < fields.getLength(); j++) {
+      Node fieldNode = fields.item(j);
+      if (!(fieldNode instanceof Element))
+        continue;
+      Element field = (Element) fieldNode;
+      if ("minResources".equals(field.getTagName())) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        minQueueResources.put(queueName, Resources.createResource(Integer.parseInt(text)));
+      } else if ("maxResources".equals(field.getTagName())) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        maxQueueResources.put(queueName, Resources.createResource(Integer.parseInt(text)));
+      } else if ("maxRunningApps".equals(field.getTagName())) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        queueMaxApps.put(queueName, Integer.parseInt(text));
+      } else if ("weight".equals(field.getTagName())) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        queueWeights.put(queueName, Double.parseDouble(text));
+      } else if ("minSharePreemptionTimeout".equals(field.getTagName())) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        long val = Long.parseLong(text) * 1000L;
+        minSharePreemptionTimeouts.put(queueName, val);
+      } else if ("schedulingMode".equals(field.getTagName())) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        queueModes.put(queueName, parseSchedulingMode(text));
+      } else if ("aclSubmitApps".equals(field.getTagName())) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
+      } else if ("aclAdministerApps".equals(field.getTagName())) {
+        String text = ((Text)field.getFirstChild()).getData().trim();
+        acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
+      } else if ("queue".equals(field.getTagName()) ||
+          "pool".equals(field.getTagName())) {
+        // Exact tag match only; a nested queue makes this queue a parent.
+        loadQueue(queueName, field, minQueueResources, maxQueueResources, queueMaxApps,
+            userMaxApps, queueWeights, queueModes, minSharePreemptionTimeouts,
+            queueAcls, queueNamesInAllocFile);
+        isLeaf = false;
+      }
+    }
+    if (isLeaf) {
+      queueNamesInAllocFile.add(queueName);
+    }
+    queueAcls.put(queueName, acls);
+    if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName)
+        && Resources.lessThan(maxQueueResources.get(queueName),
+            minQueueResources.get(queueName))) {
+      // %s, not %d: the arguments are Resource objects, which %d rejects at runtime.
+      LOG.warn(String.format("Queue %s has max resources %s less than min resources %s",
+          queueName, maxQueueResources.get(queueName), minQueueResources.get(queueName)));
+    }
+  }
 
   private SchedulingMode parseSchedulingMode(String text)
       throws AllocationConfigurationException {
@@ -428,181 +482,87 @@ public class QueueManager {
    * @return the cap set on this queue, or 0 if not set.
    */
   public Resource getMinResources(String queue) {
-    synchronized(minQueueResourcesMO) {
-      if (minQueueResources.containsKey(queue)) {
-        return minQueueResources.get(queue);
-      } else{
-        return Resources.createResource(0);
-      }
+    Resource minQueueResource = info.minQueueResources.get(queue);
+    if (minQueueResource != null) {
+      return minQueueResource;
+    } else {
+      return Resources.createResource(0);
     }
   }
 
-  private void setMinResources(Map<String, Resource> resources) {
-    synchronized(minQueueResourcesMO) {
-      minQueueResources = resources;
-    }
-  }
   /**
    * Get the maximum resource allocation for the given queue.
    * @return the cap set on this queue, or Integer.MAX_VALUE if not set.
    */
-  public Resource getMaxResources(String queueName) {
-    synchronized (maxQueueResourcesMO) {
-      if (maxQueueResources.containsKey(queueName)) {
-        return maxQueueResources.get(queueName);
-      } else {
-        return Resources.createResource(Integer.MAX_VALUE);
-      }
-    }
-  }
 
-  private void setMaxResources(Map<String, Resource> resources) {
-    synchronized(maxQueueResourcesMO) {
-      maxQueueResources = resources;
+  public Resource getMaxResources(String queueName) {
+    Resource maxQueueResource = info.maxQueueResources.get(queueName);
+    if (maxQueueResource != null) {
+      return maxQueueResource;
+    } else {
+      return Resources.createResource(Integer.MAX_VALUE);
     }
   }
-  
-  /**
-   * Add an app in the appropriate queue
-   */
-  public synchronized void addApp(FSSchedulerApp app) {
-    getQueue(app.getQueueName()).addApp(app);
-  }
-
-  /**
-   * Remove an app
-   */
-  public synchronized void removeJob(FSSchedulerApp app) {
-    getQueue(app.getQueueName()).removeJob(app);
-  }
 
   /**
    * Get a collection of all queues
    */
-  public Collection<FSQueue> getQueues() {
+  public Collection<FSLeafQueue> getLeafQueues() {
     synchronized (queues) {
-      return queues.values();
+      return leafQueues;
     }
   }
 
-  /**
-   * Get all queue names that have been seen either in the allocation file or in
-   * a submitted app.
-   */
-  public synchronized Collection<String> getQueueNames() {
-    List<String> list = new ArrayList<String>();
-    for (FSQueue queue: getQueues()) {
-      list.add(queue.getName());
-    }
-    Collections.sort(list);
-    return list;
-  }
-
   public int getUserMaxApps(String user) {
-    synchronized (userMaxAppsMO) {
-      if (userMaxApps.containsKey(user)) {
-        return userMaxApps.get(user);
-      } else {
-        return getUserMaxAppsDefault();
-      }
+    // save current info in case it gets changed under us
+    QueueManagerInfo info = this.info;
+    if (info.userMaxApps.containsKey(user)) {
+      return info.userMaxApps.get(user);
+    } else {
+      return info.userMaxAppsDefault;
     }
   }
 
-  private void setUserMaxApps(Map<String, Integer> userApps) {
-    synchronized (userMaxAppsMO) {
-      userMaxApps = userApps;
-    }
-  }
-  
-  private int getUserMaxAppsDefault() {
-    synchronized (userMaxAppsDefaultMO){
-      return userMaxAppsDefault;
-    }
-  }
-  
-  private void setUserMaxAppsDefault(int userMaxApps) {
-    synchronized (userMaxAppsDefaultMO){
-      userMaxAppsDefault = userMaxApps;
-    }
-  }
-  
   public int getQueueMaxApps(String queue) {
-    synchronized (queueMaxAppsMO) {
-      if (queueMaxApps.containsKey(queue)) {
-        return queueMaxApps.get(queue);
-      } else {
-        return getQueueMaxAppsDefault();
-      }
-    }
-  }
-  
-  private void setQueueMaxApps(Map<String, Integer> queueApps) {
-    synchronized (queueMaxAppsMO) {
-      queueMaxApps = queueApps;
-    }
-  }
-  
-  private int getQueueMaxAppsDefault(){
-    synchronized(queueMaxAppsDefaultMO) {
-      return queueMaxAppsDefault;
-    }
-  }
-  
-  private void setQueueMaxAppsDefault(int queueMaxApps){
-    synchronized(queueMaxAppsDefaultMO) {
-      queueMaxAppsDefault = queueMaxApps;
+    // save current info in case it gets changed under us
+    QueueManagerInfo info = this.info;
+    if (info.queueMaxApps.containsKey(queue)) {
+      return info.queueMaxApps.get(queue);
+    } else {
+      return info.queueMaxAppsDefault;
     }
   }
   
-  private void setDefaultSchedulingMode(SchedulingMode schedulingMode){
-    synchronized(defaultSchedulingModeMO) {
-      defaultSchedulingMode = schedulingMode;
-    }
-  }
-
   public double getQueueWeight(String queue) {
-    synchronized (queueWeightsMO) {
-      if (queueWeights.containsKey(queue)) {
-        return queueWeights.get(queue);
-      } else {
-        return 1.0;
-      }
+    Double weight = info.queueWeights.get(queue);
+    if (weight != null) {
+      return weight;
+    } else {
+      return 1.0;
     }
   }
 
-  private void setQueueWeights(Map<String, Double> weights) {
-    synchronized (queueWeightsMO) {
-      queueWeights = weights;
-    }
-  }
   /**
-  * Get a queue's min share preemption timeout, in milliseconds. This is the
-  * time after which jobs in the queue may kill other queues' tasks if they
-  * are below their min share.
-  */
+   * Get a queue's min share preemption timeout, in milliseconds. This is the
+   * time after which jobs in the queue may kill other queues' tasks if they
+   * are below their min share.
+   */
   public long getMinSharePreemptionTimeout(String queueName) {
-    synchronized (minSharePreemptionTimeoutsMO) {
-      if (minSharePreemptionTimeouts.containsKey(queueName)) {
-        return minSharePreemptionTimeouts.get(queueName);
-      }
+    // save current info in case it gets changed under us
+    QueueManagerInfo info = this.info;
+    if (info.minSharePreemptionTimeouts.containsKey(queueName)) {
+      return info.minSharePreemptionTimeouts.get(queueName);
     }
-    return defaultMinSharePreemptionTimeout;
+    return info.defaultMinSharePreemptionTimeout;
   }
   
-  private void setMinSharePreemptionTimeouts(
-      Map<String, Long> sharePreemptionTimeouts){
-    synchronized (minSharePreemptionTimeoutsMO) {
-      minSharePreemptionTimeouts = sharePreemptionTimeouts;
-    }
-  }
-
   /**
    * Get the fair share preemption, in milliseconds. This is the time
    * after which any job may kill other jobs' tasks if it is below half
    * its fair share.
    */
   public long getFairSharePreemptionTimeout() {
-    return fairSharePreemptionTimeout;
+    return info.fairSharePreemptionTimeout;
   }
 
   /**
@@ -611,10 +571,9 @@ public class QueueManager {
    */
   public Map<QueueACL, AccessControlList> getQueueAcls(String queue) {
     HashMap<QueueACL, AccessControlList> out = new HashMap<QueueACL, AccessControlList>();
-    synchronized (queueAclsMO) {
-      if (queueAcls.containsKey(queue)) {
-        out.putAll(queueAcls.get(queue));
-      }
+    Map<QueueACL, AccessControlList> queueAcl = info.queueAcls.get(queue);
+    if (queueAcl != null) {
+      out.putAll(queueAcl);
     }
     if (!out.containsKey(QueueACL.ADMINISTER_QUEUE)) {
       out.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList("*"));
@@ -625,9 +584,74 @@ public class QueueManager {
     return out;
   }
   
-  private void setQueueAcls(Map<String, Map<QueueACL, AccessControlList>> queue) {
-    synchronized (queueAclsMO) {
-      queueAcls = queue;
+  static class QueueManagerInfo {
+    // Minimum resource allocation for each queue
+    public final Map<String, Resource> minQueueResources;
+    // Maximum amount of resources per queue
+    public final Map<String, Resource> maxQueueResources;
+    // Sharing weights for each queue
+    public final Map<String, Double> queueWeights;
+    
+    // Max concurrent running applications for each queue and for each user; in addition,
+    // for users that have no max specified, we use the userMaxAppsDefault.
+    public final Map<String, Integer> queueMaxApps;
+    public final Map<String, Integer> userMaxApps;
+    public final int userMaxAppsDefault;
+    public final int queueMaxAppsDefault;
+
+    // ACLs for each queue. Only specifies non-default ACLs from configuration.
+    public final Map<String, Map<QueueACL, AccessControlList>> queueAcls;
+
+    // Min share preemption timeout for each queue in milliseconds. If a job in the queue
+    // waits this long without receiving its guaranteed share, it is allowed to
+    // preempt other jobs' tasks.
+    public final Map<String, Long> minSharePreemptionTimeouts;
+
+    // Default min share preemption timeout for queues where it is not set
+    // explicitly.
+    public final long defaultMinSharePreemptionTimeout;
+
+    // Preemption timeout for jobs below fair share in milliseconds. If a job remains
+    // below half its fair share for this long, it is allowed to preempt tasks.
+    public final long fairSharePreemptionTimeout;
+
+    public final SchedulingMode defaultSchedulingMode;
+    
+    public QueueManagerInfo(Map<String, Resource> minQueueResources, 
+        Map<String, Resource> maxQueueResources, 
+        Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
+        Map<String, Double> queueWeights, int userMaxAppsDefault,
+        int queueMaxAppsDefault, SchedulingMode defaultSchedulingMode, 
+        Map<String, Long> minSharePreemptionTimeouts, 
+        Map<String, Map<QueueACL, AccessControlList>> queueAcls,
+        long fairSharePreemptionTimeout, long defaultMinSharePreemptionTimeout) {
+      this.minQueueResources = minQueueResources;
+      this.maxQueueResources = maxQueueResources;
+      this.queueMaxApps = queueMaxApps;
+      this.userMaxApps = userMaxApps;
+      this.queueWeights = queueWeights;
+      this.userMaxAppsDefault = userMaxAppsDefault;
+      this.queueMaxAppsDefault = queueMaxAppsDefault;
+      this.defaultSchedulingMode = defaultSchedulingMode;
+      this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
+      this.queueAcls = queueAcls;
+      this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
+      this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout;
+    }
+    
+    public QueueManagerInfo() {
+      minQueueResources = new HashMap<String, Resource>();
+      maxQueueResources = new HashMap<String, Resource>();
+      queueWeights = new HashMap<String, Double>();
+      queueMaxApps = new HashMap<String, Integer>();
+      userMaxApps = new HashMap<String, Integer>();
+      userMaxAppsDefault = Integer.MAX_VALUE;
+      queueMaxAppsDefault = Integer.MAX_VALUE;
+      queueAcls = new HashMap<String, Map<QueueACL, AccessControlList>>();
+      minSharePreemptionTimeouts = new HashMap<String, Long>();
+      defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
+      fairSharePreemptionTimeout = Long.MAX_VALUE;
+      defaultSchedulingMode = SchedulingMode.FAIR;
     }
   }
 }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java?rev=1415815&r1=1415814&r2=1415815&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java Fri Nov 30 19:58:09 2012
@@ -92,12 +92,6 @@ abstract class Schedulable {
   public abstract void updateDemand();
 
   /**
-   * Distribute the fair share assigned to this Schedulable among its
-   * children (used in queues where the internal scheduler is fair sharing).
-   */
-  public abstract void redistributeShare();
-
-  /**
    * Assign a container on this node if possible, and return the amount of
    * resources assigned. If {@code reserved} is true, it means a reservation
    * already exists on this node, and the schedulable should fulfill that

Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java?rev=1415815&r1=1415814&r2=1415815&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java Fri Nov 30 19:58:09 2012
@@ -24,7 +24,7 @@ import org.apache.hadoop.conf.Configurab
 
 /**
  * A pluggable object for altering the weights of apps in the fair scheduler,
- * which is used for example by {@link NewJobWeightBooster} to give higher
+ * which is used for example by {@link NewAppWeightBooster} to give higher
  * weight to new jobs so that short jobs finish faster.
  *
  * May implement {@link Configurable} to access configuration parameters.

Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java?rev=1415815&r1=1415814&r2=1415815&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fifo/FifoScheduler.java Fri Nov 30 19:58:09 2012
@@ -56,7 +56,7 @@ import org.apache.hadoop.yarn.factory.pr
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
 import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
-import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
+import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
 import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEvent;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptEventType;
@@ -292,7 +292,7 @@ public class FifoScheduler implements Re
     // TODO: Fix store
     FiCaSchedulerApp schedulerApp = 
         new FiCaSchedulerApp(appAttemptId, user, DEFAULT_QUEUE, activeUsersManager,
-            this.rmContext, null);
+            this.rmContext);
     applications.put(appAttemptId, schedulerApp);
     metrics.submitApp(user, appAttemptId.getAttemptId());
     LOG.info("Application Submission: " + appAttemptId.getApplicationId() + 
@@ -763,13 +763,7 @@ public class FifoScheduler implements Re
 
   @Override
   public void recover(RMState state) {
-    // TODO fix recovery
-//    for (Map.Entry<ApplicationId, ApplicationInfo> entry: state.getStoredApplications().entrySet()) {
-//      ApplicationId appId = entry.getKey();
-//      ApplicationInfo appInfo = entry.getValue();
-//      SchedulerApp app = applications.get(appId);
-//      app.allocate(appInfo.getContainers());
-//    }
+    // NOT IMPLEMENTED
   }
 
   @Override

Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java?rev=1415815&r1=1415814&r2=1415815&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/AppsBlock.java Fri Nov 30 19:58:09 2012
@@ -20,12 +20,13 @@ package org.apache.hadoop.yarn.server.re
 
 import static org.apache.hadoop.yarn.util.StringHelper.join;
 import static org.apache.hadoop.yarn.webapp.YarnWebParams.APP_STATE;
-import static org.apache.hadoop.yarn.webapp.view.JQueryUI._PROGRESSBAR;
-import static org.apache.hadoop.yarn.webapp.view.JQueryUI._PROGRESSBAR_VALUE;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.C_PROGRESSBAR;
+import static org.apache.hadoop.yarn.webapp.view.JQueryUI.C_PROGRESSBAR_VALUE;
 
 import java.util.Collection;
 import java.util.HashSet;
 
+import org.apache.commons.lang.StringEscapeUtils;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppInfo;
@@ -72,41 +73,50 @@ class AppsBlock extends HtmlBlock {
         reqAppStates.add(RMAppState.valueOf(stateString));
       }
     }
+    StringBuilder appsTableData = new StringBuilder("[\n");
     for (RMApp app : list.apps.values()) {
       if (reqAppStates != null && !reqAppStates.contains(app.getState())) {
         continue;
       }
       AppInfo appInfo = new AppInfo(app, true);
       String percent = String.format("%.1f", appInfo.getProgress());
-      String startTime = Times.format(appInfo.getStartTime());
-      String finishTime = Times.format(appInfo.getFinishTime());
-      tbody.
-        tr().
-          td().
-            br().$title(appInfo.getAppIdNum())._(). // for sorting
-            a(url("app", appInfo.getAppId()), appInfo.getAppId())._().
-          td(appInfo.getUser()).
-          td(appInfo.getName()).
-          td(appInfo.getQueue()).
-          td().
-            br().$title(String.valueOf(appInfo.getStartTime()))._().
-            _(startTime)._().
-          td().
-            br().$title(String.valueOf(appInfo.getFinishTime()))._().
-            _(finishTime)._().
-          td(appInfo.getState()).
-          td(appInfo.getFinalStatus()).
-          td().
-            br().$title(percent)._(). // for sorting
-            div(_PROGRESSBAR).
-              $title(join(percent, '%')). // tooltip
-              div(_PROGRESSBAR_VALUE).
-                $style(join("width:", percent, '%'))._()._()._().
-          td().
-            a(!appInfo.isTrackingUrlReady()?
-              "#" : appInfo.getTrackingUrlPretty(), appInfo.getTrackingUI())._()._();
+      //AppID numerical value parsed by parseHadoopID in yarn.dt.plugins.js
+      appsTableData.append("[\"<a href='")
+      .append(url("app", appInfo.getAppId())).append("'>")
+      .append(appInfo.getAppId()).append("</a>\",\"")
+      .append(StringEscapeUtils.escapeHtml(appInfo.getUser()))
+      .append("\",\"")
+      .append(StringEscapeUtils.escapeHtml(appInfo.getName()))
+      .append("\",\"")
+      .append(StringEscapeUtils.escapeHtml(appInfo.getQueue()))
+      .append("\",\"")
+      .append(appInfo.getStartTime()).append("\",\"")
+      .append(appInfo.getFinishTime()).append("\",\"")
+      .append(appInfo.getState()).append("\",\"")
+      .append(appInfo.getFinalStatus()).append("\",\"")
+      // Progress bar
+      .append("<br title='").append(percent)
+      .append("'> <div class='").append(C_PROGRESSBAR).append("' title='")
+      .append(join(percent, '%')).append("'> ").append("<div class='")
+      .append(C_PROGRESSBAR_VALUE).append("' style='")
+      .append(join("width:", percent, '%')).append("'> </div> </div>")
+      .append("\",\"<a href='");
+
+      String trackingURL =
+        !appInfo.isTrackingUrlReady()? "#" : appInfo.getTrackingUrlPretty();
+      
+      appsTableData.append(trackingURL).append("'>")
+      .append(appInfo.getTrackingUI()).append("</a>\"],\n");
+
       if (list.rendering != Render.HTML && ++i >= 20) break;
     }
+    if(appsTableData.charAt(appsTableData.length() - 2) == ',') {
+      appsTableData.delete(appsTableData.length()-2, appsTableData.length()-1);
+    }
+    appsTableData.append("]");
+    html.script().$type("text/javascript").
+    _("var appsTableData=" + appsTableData)._();
+
     tbody._()._();
 
     if (list.rendering == Render.JS_ARRAY) {

Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NavBlock.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NavBlock.java?rev=1415815&r1=1415814&r2=1415815&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NavBlock.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/NavBlock.java Fri Nov 30 19:58:09 2012
@@ -50,7 +50,6 @@ public class NavBlock extends HtmlBlock 
           li().a("/conf", "Configuration")._().
           li().a("/logs", "Local logs")._().
           li().a("/stacks", "Server stacks")._().
-          li().a("/metrics", "Server metrics")._()._()._().
-      div("#themeswitcher")._();
+          li().a("/metrics", "Server metrics")._()._()._();
   }
 }

Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmView.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmView.java?rev=1415815&r1=1415814&r2=1415815&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmView.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmView.java Fri Nov 30 19:58:09 2012
@@ -47,7 +47,6 @@ public class RmView extends TwoColumnLay
   protected void commonPreHead(Page.HTML<_> html) {
     set(ACCORDION_ID, "nav");
     set(initID(ACCORDION, "nav"), "{autoHeight:false, active:0}");
-    set(THEMESWITCHER_ID, "themeswitcher");
   }
 
   @Override
@@ -63,13 +62,23 @@ public class RmView extends TwoColumnLay
   private String appsTableInit() {
     AppsList list = getInstance(AppsList.class);
     // id, user, name, queue, starttime, finishtime, state, status, progress, ui
-    StringBuilder init = tableInit().
-        append(", aoColumns:[{sType:'title-numeric'}, null, null, null, ").
-        append("{sType:'title-numeric'}, {sType:'title-numeric'} , null, ").
-        append("null,{sType:'title-numeric', bSearchable:false}, null]");
+    StringBuilder init = tableInit()
+      .append(", 'aaData': appsTableData")
+      .append(", bDeferRender: true")
+      .append(", bProcessing: true")
 
-    // Sort by id upon page load
-    init.append(", aaSorting: [[0, 'desc']]");
+      .append("\n, aoColumnDefs: [\n")
+      .append("{'sType':'numeric', 'aTargets': [0]")
+      .append(", 'mRender': parseHadoopID }")
+
+      .append("\n, {'sType':'numeric', 'aTargets': [4, 5]")
+      .append(", 'mRender': renderHadoopDate }")
+
+      .append("\n, {'sType':'numeric', bSearchable:false, 'aTargets': [8]")
+      .append(", 'mRender': parseHadoopProgress }]")
+
+      // Sort by id upon page load
+      .append(", aaSorting: [[0, 'desc']]");
 
     String rows = $("rowlimit");
     int rowLimit = rows.isEmpty() ? MAX_DISPLAY_ROWS : Integer.parseInt(rows);

Modified: hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java?rev=1415815&r1=1415814&r2=1415815&view=diff
==============================================================================
--- hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java (original)
+++ hadoop/common/branches/branch-trunk-win/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerInfo.java Fri Nov 30 19:58:09 2012
@@ -23,7 +23,7 @@ import java.util.Collection;
 import java.util.List;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 
 public class FairSchedulerInfo {
@@ -32,9 +32,9 @@ public class FairSchedulerInfo {
   
   public FairSchedulerInfo(FairScheduler fs) {
     scheduler = fs;
-    Collection<FSQueue> queues = fs.getQueueManager().getQueues();
+    Collection<FSLeafQueue> queues = fs.getQueueManager().getLeafQueues();
     queueInfos = new ArrayList<FairSchedulerQueueInfo>();
-    for (FSQueue queue : queues) {
+    for (FSLeafQueue queue : queues) {
       queueInfos.add(new FairSchedulerQueueInfo(queue, fs));
     }
   }