Posted to mapreduce-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:26:22 UTC

svn commit: r885145 [4/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/ src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/ src/benchmarks/gridmix2/ src/benchmarks/gridmix2/src...

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Sat Nov 28 20:26:01 2009
@@ -20,21 +20,19 @@
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.AbstractQueue.AbstractQueueComparator;
 import org.apache.hadoop.mapred.JobTracker.IllegalStateException;
-import org.apache.hadoop.mapred.TaskTrackerStatus;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * A {@link TaskScheduler} that implements the requirements in HADOOP-3421
@@ -57,218 +55,18 @@
  *  
  */
 class CapacityTaskScheduler extends TaskScheduler {
-  
-  /***********************************************************************
-   * Keeping track of scheduling information for queues
-   * 
-   * We need to maintain scheduling information relevant to a queue (its 
-   * name, capacity, etc), along with information specific to 
-   * each kind of task, Map or Reduce (num of running tasks, pending 
-   * tasks etc). 
-   * 
-   * This scheduling information is used to decide how to allocate
-   * tasks, redistribute capacity, etc.
-   *  
-   * A QueueSchedulingInfo(QSI) object represents scheduling information for
-   * a queue. A TaskSchedulingInfo (TSI) object represents scheduling 
-   * information for a particular kind of task (Map or Reduce).
-   *   
-   **********************************************************************/
-
-  private static class TaskSchedulingInfo {
-    
-    private static final String LIMIT_NORMALIZED_CAPACITY_STRING
-      = "(Capacity is restricted to max limit of %d slots.\n" +
-        "Remaining %d slots will be used by other queues.)\n";
-    /** 
-     * the actual capacity, which depends on how many slots are available
-     * in the cluster at any given time. 
-     */
-    private int capacity = 0;
-    // number of running tasks
-    int numRunningTasks = 0;
-    // number of slots occupied by running tasks
-    int numSlotsOccupied = 0;
-
-    /**
-     * max task limit
-     * This value is the maximum slots that can be used in a
-     * queue at any point of time. So for example assuming above config value
-     * is 100 , not more than 100 tasks would be in the queue at any point of
-     * time, assuming each task takes one slot.
-     */
-    private int maxTaskLimit = -1;
-
-    /**
-     * for each user, we need to keep track of number of slots occupied by
-     * running tasks
-     */
-    Map<String, Integer> numSlotsOccupiedByUser = 
-      new HashMap<String, Integer>();
-
-    /**
-     * reset the variables associated with tasks
-     */
-    void resetTaskVars() {
-      numRunningTasks = 0;
-      numSlotsOccupied = 0;
-      for (String s: numSlotsOccupiedByUser.keySet()) {
-        numSlotsOccupiedByUser.put(s, Integer.valueOf(0));
-      }
-    }
-
-
-    int getMaxTaskLimit() {
-      return maxTaskLimit;
-    }
-
-    void setMaxTaskLimit(int maxTaskCap) {
-      this.maxTaskLimit = maxTaskCap;
-    }
-
-    /**
-     * This method checks for maxTaskLimit and sends minimum of maxTaskLimit and
-     * capacity.
-     * @return
-     */
-    int getCapacity() {
-      return ((maxTaskLimit >= 0) && (maxTaskLimit < capacity)) ? maxTaskLimit :
-        capacity;
-    }
-
-    /**
-     * Mutator method for capacity
-     * @param capacity
-     */
-    void setCapacity(int capacity) {
-        this.capacity = capacity;
-    }
 
+  /** quick way to get qsc object given a queue name */
+  private Map<String, QueueSchedulingContext> queueInfoMap =
+    new HashMap<String, QueueSchedulingContext>();
+
+  // Root-level queue. It has all the
+  // cluster capacity at its disposal.
+  // Queues declared by users are
+  // children of this queue.
+  // The capacity scheduler keeps a handle to the root.
+  private AbstractQueue root = null;
 
-    /**
-     * return information about the tasks
-     */
-    @Override
-    public String toString() {
-      float occupiedSlotsAsPercent =
-          getCapacity() != 0 ?
-            ((float) numSlotsOccupied * 100 / getCapacity()) : 0;
-      StringBuffer sb = new StringBuffer();
-      
-      sb.append("Capacity: " + capacity + " slots\n");
-      //If maxTaskLimit is less than the capacity
-      if (maxTaskLimit >= 0 && maxTaskLimit < capacity) {
-        sb.append(String.format(LIMIT_NORMALIZED_CAPACITY_STRING,   
-                        maxTaskLimit, (capacity-maxTaskLimit)));
-      }
-      if (maxTaskLimit >= 0) {
-        sb.append(String.format("Maximum Slots Limit: %d\n", maxTaskLimit));
-      }
-      sb.append(String.format("Used capacity: %d (%.1f%% of Capacity)\n",
-          Integer.valueOf(numSlotsOccupied), Float
-              .valueOf(occupiedSlotsAsPercent)));
-      sb.append(String.format("Running tasks: %d\n", Integer
-          .valueOf(numRunningTasks)));
-      // include info on active users
-      if (numSlotsOccupied != 0) {
-        sb.append("Active users:\n");
-        for (Map.Entry<String, Integer> entry : numSlotsOccupiedByUser
-            .entrySet()) {
-          if ((entry.getValue() == null) || (entry.getValue().intValue() <= 0)) {
-            // user has no tasks running
-            continue;
-          }
-          sb.append("User '" + entry.getKey() + "': ");
-          int numSlotsOccupiedByThisUser = entry.getValue().intValue();
-          float p =
-              (float) numSlotsOccupiedByThisUser * 100 / numSlotsOccupied;
-          sb.append(String.format("%d (%.1f%% of used capacity)\n", Long
-              .valueOf(numSlotsOccupiedByThisUser), Float.valueOf(p)));
-        }
-      }
-      return sb.toString();
-    }
-  }
-  
-  private static class QueueSchedulingInfo {
-    String queueName;
-
-    /** capacity(%) is set in the config */ 
-    float capacityPercent = 0;
-    
-    /** 
-     * to handle user limits, we need to know how many users have jobs in 
-     * the queue.
-     */  
-    Map<String, Integer> numJobsByUser = new HashMap<String, Integer>();
-      
-    /** min value of user limit (same for all users) */
-    int ulMin;
-    
-    /**
-     * We keep track of the JobQueuesManager only for reporting purposes 
-     * (in toString()). 
-     */
-    private JobQueuesManager jobQueuesManager;
-    
-    /**
-     * We keep a TaskSchedulingInfo object for each kind of task we support
-     */
-    TaskSchedulingInfo mapTSI;
-    TaskSchedulingInfo reduceTSI;
-    
-    public QueueSchedulingInfo(String queueName, float capacityPercent,
-                               int ulMin, JobQueuesManager jobQueuesManager,
-                               int mapCap, int reduceCap) {
-      this.queueName = new String(queueName);
-      this.capacityPercent = capacityPercent;
-      this.ulMin = ulMin;
-      this.jobQueuesManager = jobQueuesManager;
-      this.mapTSI = new TaskSchedulingInfo();
-      this.reduceTSI = new TaskSchedulingInfo();
-      this.mapTSI.setMaxTaskLimit(mapCap);
-      this.reduceTSI.setMaxTaskLimit(reduceCap);
-    }
-    
-    /**
-     * return information about the queue
-     * @return a String representing the information about the queue.
-     */
-    @Override
-    public String toString(){
-      // We print out the queue information first, followed by info
-      // on map and reduce tasks and job info
-      StringBuffer sb = new StringBuffer();
-      sb.append("Queue configuration\n");
-      sb.append("Capacity Percentage: ");
-      sb.append(capacityPercent);
-      sb.append("%\n");
-      sb.append(String.format("User Limit: %d%s\n",ulMin, "%"));
-      sb.append(String.format("Priority Supported: %s\n",
-          (jobQueuesManager.doesQueueSupportPriorities(queueName))?
-              "YES":"NO"));
-      sb.append("-------------\n");
-
-      sb.append("Map tasks\n");
-      sb.append(mapTSI.toString());
-      sb.append("-------------\n");
-      sb.append("Reduce tasks\n");
-      sb.append(reduceTSI.toString());
-      sb.append("-------------\n");
-      
-      sb.append("Job info\n");
-      sb.append(String.format("Number of Waiting Jobs: %d\n", 
-          jobQueuesManager.getWaitingJobCount(queueName)));
-      sb.append(String.format("Number of users who have submitted jobs: %d\n", 
-          numJobsByUser.size()));
-      return sb.toString();
-    }
-  }
-
-  /** quick way to get qsi object given a queue name */
-  private Map<String, QueueSchedulingInfo> queueInfoMap = 
-    new HashMap<String, QueueSchedulingInfo>();
-  
   /**
    * This class captures scheduling information we want to display or log.
    */
@@ -283,16 +81,17 @@
     
     @Override
     public String toString(){
-      // note that we do not call updateQSIObjects() here for performance
+      // note that we do not call updateContextObjects() here for performance
       // reasons. This means that the data we print out may be slightly
       // stale. This data is updated whenever assignTasks() is called
       // If this doesn't happen, the data gets stale. If we see
       // this often, we may need to detect this situation and call 
-      // updateQSIObjects(), or just call it each time. 
+      // updateContextObjects(), or just call it each time.
       return scheduler.getDisplayInfo(queueName);
     }
   }
 
+
   // this class encapsulates the result of a task lookup
   private static class TaskLookupResult {
 
@@ -318,6 +117,7 @@
     }
     
     static TaskLookupResult getTaskFoundResult(Task t) {
+      LOG.debug("Returning task " + t);
       return new TaskLookupResult(t, LookUpStatus.TASK_FOUND);
     }
     static TaskLookupResult getNoTaskFoundResult() {
@@ -337,11 +137,11 @@
     }
   }
 
-  /** 
-   * This class handles the scheduling algorithms. 
-   * The algos are the same for both Map and Reduce tasks. 
+  /**
+   * This class handles the scheduling algorithms.
+   * The algos are the same for both Map and Reduce tasks.
    * There may be slight variations later, in which case we can make this
-   * an abstract base class and have derived classes for Map and Reduce.  
+   * an abstract base class and have derived classes for Map and Reduce.
    */
   private static abstract class TaskSchedulingMgr {
 
@@ -349,69 +149,40 @@
     protected CapacityTaskScheduler scheduler;
     protected TaskType type = null;
 
-    abstract Task obtainNewTask(TaskTrackerStatus taskTracker, 
+    abstract Task obtainNewTask(TaskTrackerStatus taskTracker,
         JobInProgress job) throws IOException;
 
-    int getSlotsOccupied(JobInProgress job) {
-      return (getNumReservedTaskTrackers(job) + getRunningTasks(job)) * 
-             getSlotsPerTask(job);
-    }
-
     abstract int getClusterCapacity();
-    abstract int getSlotsPerTask(JobInProgress job);
-    abstract int getRunningTasks(JobInProgress job);
-    abstract int getPendingTasks(JobInProgress job);
-    abstract TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi);
-    abstract int getNumReservedTaskTrackers(JobInProgress job);
-    
+    abstract TaskSchedulingContext getTSC(
+      QueueSchedulingContext qsc);
     /**
      * To check if job has a speculative task on the particular tracker.
-     * 
+     *
      * @param job job to check for speculative tasks.
      * @param tts task tracker on which speculative task would run.
      * @return true if there is a speculative task to run on the tracker.
      */
-    abstract boolean hasSpeculativeTask(JobInProgress job, 
+    abstract boolean hasSpeculativeTask(JobInProgress job,
         TaskTrackerStatus tts);
 
     /**
-     * Check if the given job has sufficient reserved tasktrackers for all its
-     * pending tasks.
-     * 
-     * @param job job to check for sufficient reserved tasktrackers 
-     * @return <code>true</code> if the job has reserved tasktrackers,
-     *         else <code>false</code>
-     */
-    boolean hasSufficientReservedTaskTrackers(JobInProgress job) {
-      return getNumReservedTaskTrackers(job) >= getPendingTasks(job);
-    }
-    
-    /**
-     * List of QSIs for assigning tasks.
-     * Queues are ordered by a ratio of (# of running tasks)/capacity, which
-     * indicates how much 'free space' the queue has, or how much it is over
-     * capacity. This ordered list is iterated over, when assigning tasks.
-     */  
-    private List<QueueSchedulingInfo> qsiForAssigningTasks = 
-      new ArrayList<QueueSchedulingInfo>();
-
-    /**
      * Comparator to sort queues.
-     * For maps, we need to sort on QueueSchedulingInfo.mapTSI. For 
-     * reducers, we use reduceTSI. So we'll need separate comparators.  
-     */ 
-    private static abstract class QueueComparator 
-      implements Comparator<QueueSchedulingInfo> {
-      abstract TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi);
-      public int compare(QueueSchedulingInfo q1, QueueSchedulingInfo q2) {
-        TaskSchedulingInfo t1 = getTSI(q1);
-        TaskSchedulingInfo t2 = getTSI(q2);
+     * For maps, we need to sort on QueueSchedulingContext.mapTSC. For
+     * reducers, we use reduceTSC. So we'll need separate comparators.
+     */
+    private static abstract class QueueComparator
+      implements Comparator<AbstractQueue> {
+      abstract TaskSchedulingContext getTSC(
+        QueueSchedulingContext qsi);
+      public int compare(AbstractQueue q1, AbstractQueue q2) {
+        TaskSchedulingContext t1 = getTSC(q1.getQueueSchedulingContext());
+        TaskSchedulingContext t2 = getTSC(q2.getQueueSchedulingContext());
         // look at how much capacity they've filled. Treat a queue with
         // capacity=0 equivalent to a queue running at capacity
         double r1 = (0 == t1.getCapacity())? 1.0f:
-          (double)t1.numSlotsOccupied/(double) t1.getCapacity();
+          (double) t1.getNumSlotsOccupied() /(double) t1.getCapacity();
         double r2 = (0 == t2.getCapacity())? 1.0f:
-          (double)t2.numSlotsOccupied/(double) t2.getCapacity();
+          (double) t2.getNumSlotsOccupied() /(double) t2.getCapacity();
         if (r1<r2) return -1;
         else if (r1>r2) return 1;
         else return 0;
@@ -419,67 +190,78 @@
     }
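
For illustration only, a minimal, self-contained sketch of the ordering idea in the comparator above: queues are sorted by the ratio of occupied slots to capacity, and a zero-capacity queue is treated as if it were already running at capacity. The QueueLoad class and its fields are hypothetical stand-ins, not the scheduler's own types.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;

    public class QueueLoadOrderingSketch {

      // Hypothetical stand-in for a queue's per-task-type numbers.
      static class QueueLoad {
        final String name;
        final int slotsOccupied;
        final int capacity;

        QueueLoad(String name, int slotsOccupied, int capacity) {
          this.name = name;
          this.slotsOccupied = slotsOccupied;
          this.capacity = capacity;
        }

        // A zero-capacity queue is treated as running at capacity (ratio 1.0).
        double occupancyRatio() {
          return capacity == 0 ? 1.0 : (double) slotsOccupied / capacity;
        }
      }

      public static void main(String[] args) {
        List<QueueLoad> queues = new ArrayList<QueueLoad>();
        queues.add(new QueueLoad("a", 8, 10)); // 80% occupied
        queues.add(new QueueLoad("b", 1, 10)); // 10% occupied, gets tasks first
        queues.add(new QueueLoad("c", 0, 0));  // no capacity, sorted last

        Collections.sort(queues, new Comparator<QueueLoad>() {
          public int compare(QueueLoad q1, QueueLoad q2) {
            return Double.compare(q1.occupancyRatio(), q2.occupancyRatio());
          }
        });

        for (QueueLoad q : queues) {
          System.out.println(q.name + " ratio=" + q.occupancyRatio());
        }
      }
    }
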
     // subclass for map and reduce comparators
     private static final class MapQueueComparator extends QueueComparator {
-      TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
-        return qsi.mapTSI;
+      TaskSchedulingContext getTSC(QueueSchedulingContext qsi) {
+        return qsi.getMapTSC();
       }
     }
     private static final class ReduceQueueComparator extends QueueComparator {
-      TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
-        return qsi.reduceTSI;
+      TaskSchedulingContext getTSC(QueueSchedulingContext qsi) {
+        return qsi.getReduceTSC();
       }
     }
+
     // these are our comparator instances
-    protected final static MapQueueComparator mapComparator = new MapQueueComparator();
-    protected final static ReduceQueueComparator reduceComparator = new ReduceQueueComparator();
+    protected final static MapQueueComparator mapComparator =
+      new MapQueueComparator();
+    protected final static ReduceQueueComparator reduceComparator =
+      new ReduceQueueComparator();
     // and this is the comparator to use
     protected QueueComparator queueComparator;
 
     // Returns queues sorted according to the QueueComparator.
     // Mainly for testing purposes.
     String[] getOrderedQueues() {
-      List<String> queues = new ArrayList<String>(qsiForAssigningTasks.size());
-      for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
-        queues.add(qsi.queueName);
+      List<AbstractQueue> queueList = getOrderedJobQueues();
+      List<String> queues = new ArrayList<String>(queueList.size());
+      for (AbstractQueue q : queueList) {
+        queues.add(q.getName());
       }
       return queues.toArray(new String[queues.size()]);
     }
 
+    /**
+     * Return an ordered list of {@link JobQueue}s wrapped as
+     * {@link AbstractQueue}s. Ordering is according to {@link QueueComparator}.
+     * To reflect the true ordering of the JobQueues, the complete hierarchy is
+     * sorted such that {@link AbstractQueue}s are ordered according to their
+     * needs at each level in the hierarchy, after which only the leaf level
+     * {@link JobQueue}s are returned.
+     * 
+     * @return a list of {@link JobQueue}s wrapped as {@link AbstractQueue}s
+     *         sorted by their needs.
+     */
+    List<AbstractQueue> getOrderedJobQueues() {
+      scheduler.root.sort(queueComparator);
+      return scheduler.root.getDescendentJobQueues();
+    }
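
A rough, hypothetical sketch of the "sort the whole hierarchy, then return only the leaves" idea described in the javadoc above. The Node type and its fields are invented for illustration and are not the scheduler's AbstractQueue API.

    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.List;

    public class HierarchySortSketch {

      // Invented tree node; leaf nodes stand in for JobQueues.
      static class Node {
        final String name;
        final double need;            // lower value = more in need of tasks
        final List<Node> children = new ArrayList<Node>();

        Node(String name, double need) {
          this.name = name;
          this.need = need;
        }

        // Sort every level of the hierarchy with the same comparator.
        void sort(Comparator<Node> cmp) {
          Collections.sort(children, cmp);
          for (Node child : children) {
            child.sort(cmp);
          }
        }

        // Collect only the leaf nodes, in the order the sort left them.
        void collectLeaves(List<Node> out) {
          if (children.isEmpty()) {
            out.add(this);
            return;
          }
          for (Node child : children) {
            child.collectLeaves(out);
          }
        }
      }

      public static void main(String[] args) {
        Node root = new Node("root", 0);
        Node a = new Node("a", 0.7);
        Node b = new Node("b", 0.2);
        a.children.add(new Node("a1", 0.9));
        a.children.add(new Node("a2", 0.1));
        b.children.add(new Node("b1", 0.5));
        root.children.add(a);
        root.children.add(b);

        Comparator<Node> byNeed = new Comparator<Node>() {
          public int compare(Node n1, Node n2) {
            return Double.compare(n1.need, n2.need);
          }
        };
        root.sort(byNeed);

        List<Node> leaves = new ArrayList<Node>();
        root.collectLeaves(leaves);
        for (Node leaf : leaves) {
          System.out.println(leaf.name); // prints b1, a2, a1
        }
      }
    }
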
+
     TaskSchedulingMgr(CapacityTaskScheduler sched) {
       scheduler = sched;
     }
-    
-    // let the scheduling mgr know which queues are in the system
-    void initialize(Map<String, QueueSchedulingInfo> qsiMap) { 
-      // add all the qsi objects to our list and sort
-      qsiForAssigningTasks.addAll(qsiMap.values());
-      Collections.sort(qsiForAssigningTasks, queueComparator);
-    }
-    
-    private synchronized void updateCollectionOfQSIs() {
-      Collections.sort(qsiForAssigningTasks, queueComparator);
-    }
-
 
-    private boolean isUserOverLimit(JobInProgress j, QueueSchedulingInfo qsi) {
+    private boolean isUserOverLimit(JobInProgress j,
+                                    QueueSchedulingContext qsc) {
       // what is our current capacity? It is equal to the queue-capacity if
       // we're running below capacity. If we're running over capacity, then its
       // #running plus slotPerTask of the job (which is the number of extra
       // slots we're getting).
       int currentCapacity;
-      TaskSchedulingInfo tsi = getTSI(qsi);
-      if (tsi.numSlotsOccupied < tsi.getCapacity()) {
+      TaskSchedulingContext tsi = getTSC(qsc);
+      if (tsi.getNumSlotsOccupied() < tsi.getCapacity()) {
         currentCapacity = tsi.getCapacity();
       }
       else {
-        currentCapacity = tsi.numSlotsOccupied + getSlotsPerTask(j);
+        currentCapacity =
+          tsi.getNumSlotsOccupied() +
+            TaskDataView.getTaskDataView(type).getSlotsPerTask(j);
       }
       int limit = Math.max((int)(Math.ceil((double)currentCapacity/
-          (double)qsi.numJobsByUser.size())), 
-          (int)(Math.ceil((double)(qsi.ulMin*currentCapacity)/100.0)));
+          (double) qsc.getNumJobsByUser().size())),
+          (int)(Math.ceil((double)(qsc.getUlMin() *currentCapacity)/100.0)));
       String user = j.getProfile().getUser();
-      if (tsi.numSlotsOccupiedByUser.get(user) >= limit) {
-        LOG.debug("User " + user + " is over limit, num slots occupied = " + 
-            tsi.numSlotsOccupiedByUser.get(user) + ", limit = " + limit);
+      if (tsi.getNumSlotsOccupiedByUser().get(user) >= limit) {
+        LOG.debug("User " + user + " is over limit, num slots occupied = " +
+            tsi.getNumSlotsOccupiedByUser().get(user) + ", limit = " + limit);
         return true;
       }
       else {
@@ -488,29 +270,36 @@
     }
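
As a hedged illustration of the per-user limit computed in isUserOverLimit above (not the scheduler's own code): the limit is the larger of an even split of the current capacity among the users with jobs in the queue and the configured minimum user-limit percentage of that capacity.

    public class UserLimitSketch {

      // limit = max(ceil(currentCapacity / numUsers),
      //             ceil(ulMin * currentCapacity / 100))
      static int userLimit(int currentCapacity, int numUsers, int ulMinPercent) {
        int evenShare = (int) Math.ceil((double) currentCapacity / numUsers);
        int minShare =
          (int) Math.ceil((double) (ulMinPercent * currentCapacity) / 100.0);
        return Math.max(evenShare, minShare);
      }

      public static void main(String[] args) {
        // 100 slots of current capacity, 5 users, 25% minimum user limit:
        // even share = 20, minimum share = 25, so each user may hold up to 25.
        System.out.println(userLimit(100, 5, 25)); // 25
        // With only 2 users, the even share (50) dominates.
        System.out.println(userLimit(100, 2, 25)); // 50
      }
    }
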
 
     /*
-     * This is the central scheduling method. 
-     * It tries to get a task from jobs in a single queue. 
-     * Always return a TaskLookupResult object. Don't return null. 
+     * This is the central scheduling method.
+     * It tries to get a task from jobs in a single queue.
+     * Always return a TaskLookupResult object. Don't return null.
      */
     private TaskLookupResult getTaskFromQueue(TaskTracker taskTracker,
-                                              QueueSchedulingInfo qsi)
+                                              QueueSchedulingContext qsi)
     throws IOException {
       TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
       // we only look at jobs in the running queues, as these are the ones
       // who have been potentially initialized
 
-      for (JobInProgress j : 
-        scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
-        // only look at jobs that can be run. We ignore jobs that haven't 
-        // initialized, or have completed but haven't been removed from the 
-        // running queue. 
+      for (JobInProgress j :
+        scheduler.jobQueuesManager.getJobQueue(qsi.getQueueName())
+          .getRunningJobs()) {
+        // only look at jobs that can be run. We ignore jobs that haven't
+        // initialized, or have completed but haven't been removed from the
+        // running queue.
+
+        // Check the queue for maximum capacity.
+        if(areTasksInQueueOverMaxCapacity(qsi,j.getNumSlotsPerTask(type))) {
+          continue;
+        }
+        
         if (j.getStatus().getRunState() != JobStatus.RUNNING) {
           continue;
         }
         // check if the job's user is over limit
         if (isUserOverLimit(j, qsi)) {
           continue;
-        } 
+        }
         //If this job meets memory requirements. Ask the JobInProgress for
         //a task to be scheduled on the task tracker.
         //if we find a job then we pass it on.
@@ -526,7 +315,6 @@
             //skip to the next job in the queue.
             LOG.debug("Job " + j.getJobID().toString()
                 + " returned no tasks of type " + type);
-            continue;
           }
         } else {
           // if memory requirements don't match then we check if the job has
@@ -534,7 +322,9 @@
           // tasktrackers to cover all pending tasks. If so we reserve the
           // current tasktracker for this job so that high memory jobs are not
           // starved
-          if ((getPendingTasks(j) != 0 && !hasSufficientReservedTaskTrackers(j))) {
+          TaskDataView view = TaskDataView.getTaskDataView(type);
+          if ((view.getPendingTasks(j) != 0 && 
+                !view.hasSufficientReservedTaskTrackers(j))) {
             // Reserve all available slots on this tasktracker
             LOG.info(j.getJobID() + ": Reserving "
                 + taskTracker.getTrackerName()
@@ -549,28 +339,36 @@
         // if we're here, this job has no task to run. Look at the next job.
       }//end of for loop
 
-      // if we're here, we haven't found any task to run among all jobs in 
-      // the queue. This could be because there is nothing to run, or that 
-      // the user limit for some user is too strict, i.e., there's at least 
-      // one user who doesn't have enough tasks to satisfy his limit. If 
-      // it's the latter case, re-look at jobs without considering user 
+      // if we're here, we haven't found any task to run among all jobs in
+      // the queue. This could be because there is nothing to run, or that
+      // the user limit for some user is too strict, i.e., there's at least
+      // one user who doesn't have enough tasks to satisfy his limit. If
+      // it's the latter case, re-look at jobs without considering user
       // limits, and get a task from the first eligible job; however
-      // we do not 'reserve' slots on tasktrackers anymore since the user is 
+      // we do not 'reserve' slots on tasktrackers anymore since the user is
       // already over the limit
-      // Note: some of the code from above is repeated here. This is on 
-      // purpose as it improves overall readability.  
+      // Note: some of the code from above is repeated here. This is on
+      // purpose as it improves overall readability.
       // Note: we walk through jobs again. Some of these jobs, which weren't
-      // considered in the first pass, shouldn't be considered here again, 
+      // considered in the first pass, shouldn't be considered here again,
       // but we still check for their viability to keep the code simple. In
-      // some cases, for high mem jobs that have nothing to run, we call 
-      // obtainNewTask() unnecessarily. Should this be a problem, we can 
-      // create a list of jobs to look at (those whose users were over 
-      // limit) in the first pass and walk through that list only. 
-      for (JobInProgress j : 
-        scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
+      // some cases, for high mem jobs that have nothing to run, we call
+      // obtainNewTask() unnecessarily. Should this be a problem, we can
+      // create a list of jobs to look at (those whose users were over
+      // limit) in the first pass and walk through that list only.
+      for (JobInProgress j :
+        scheduler.jobQueuesManager.getJobQueue(qsi.getQueueName())
+          .getRunningJobs()) {
         if (j.getStatus().getRunState() != JobStatus.RUNNING) {
           continue;
         }
+
+        //Check for the maximum-capacity.
+        if(areTasksInQueueOverMaxCapacity(qsi,j.getNumSlotsPerTask(type))) {
+          continue;
+        }
+
+
         if (scheduler.memoryMatcher.matchesMemoryRequirements(j, type,
             taskTrackerStatus)) {
           // We found a suitable job. Get task from it.
@@ -580,87 +378,89 @@
             // we're successful in getting a task
             return TaskLookupResult.getTaskFoundResult(t);
           } else {
-            //skip to the next job in the queue.
-            continue;
           }
         } else {
-          //if memory requirements don't match then we check if the 
+          //if memory requirements don't match then we check if the
           //job has either pending or speculative task. If the job
           //has pending or speculative task we block till this job
-          //tasks get scheduled, so that high memory jobs are not 
+          //tasks get scheduled, so that high memory jobs are not
           //starved
-          if (getPendingTasks(j) != 0 || hasSpeculativeTask(j, taskTrackerStatus)) {
+          if (TaskDataView.getTaskDataView(type).getPendingTasks(j) != 0 ||
+            hasSpeculativeTask(j, taskTrackerStatus)) {
             return TaskLookupResult.getMemFailedResult();
-          } 
+          }
         }//end of memory check block
       }//end of for loop
 
       // found nothing for this queue, look at the next one.
-      String msg = "Found no task from the queue " + qsi.queueName;
+      String msg = "Found no task from the queue " + qsi.getQueueName();
       LOG.debug(msg);
       return TaskLookupResult.getNoTaskFoundResult();
     }
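
A simplified, hypothetical sketch of the two-pass walk over a queue's running jobs described in the comments above: the first pass skips jobs whose user is over the per-user limit, and only if that yields nothing does a second pass ignore user limits (without reserving trackers). The Job type and its fields are invented for illustration.

    import java.util.Arrays;
    import java.util.List;

    public class TwoPassLookupSketch {

      // Invented stand-ins for illustration only.
      static class Job {
        final String id;
        final boolean userOverLimit;
        final boolean hasRunnableTask;

        Job(String id, boolean userOverLimit, boolean hasRunnableTask) {
          this.id = id;
          this.userOverLimit = userOverLimit;
          this.hasRunnableTask = hasRunnableTask;
        }
      }

      // First pass: respect user limits. Second pass: ignore them, so the
      // queue's capacity is still used when every eligible user is over limit.
      static String findTask(List<Job> runningJobs) {
        for (Job j : runningJobs) {
          if (j.userOverLimit) {
            continue;                      // user limit honored in pass one
          }
          if (j.hasRunnableTask) {
            return j.id;
          }
        }
        for (Job j : runningJobs) {
          if (j.hasRunnableTask) {
            return j.id;                   // user limit ignored in pass two
          }
        }
        return null;                       // nothing to run in this queue
      }

      public static void main(String[] args) {
        List<Job> jobs = Arrays.asList(
            new Job("job_1", true, true),   // skipped in pass one, picked later
            new Job("job_2", false, false));
        System.out.println(findTask(jobs)); // job_1
      }
    }
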
 
-    // Always return a TaskLookupResult object. Don't return null. 
-    // The caller is responsible for ensuring that the QSI objects and the 
+    // Always return a TaskLookupResult object. Don't return null.
+    // The caller is responsible for ensuring that the QSC objects and the
     // collections are up-to-date.
-    private TaskLookupResult assignTasks(TaskTracker taskTracker) 
+    private TaskLookupResult assignTasks(TaskTracker taskTracker)
     throws IOException {
       TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
 
-      printQSIs();
+      printQSCs();
 
       // Check if this tasktracker has been reserved for a job...
       JobInProgress job = taskTracker.getJobForFallowSlot(type);
       if (job != null) {
         int availableSlots = taskTracker.getAvailableSlots(type);
         if (LOG.isDebugEnabled()) {
-          LOG.debug(job.getJobID() + ": Checking 'reserved' tasktracker " + 
-                    taskTracker.getTrackerName() + " with " + availableSlots + 
+          LOG.debug(job.getJobID() + ": Checking 'reserved' tasktracker " +
+                    taskTracker.getTrackerName() + " with " + availableSlots +
                     " '" + type + "' slots");
         }
 
         if (availableSlots >= job.getNumSlotsPerTask(type)) {
-          // Unreserve 
+          // Unreserve
           taskTracker.unreserveSlots(type, job);
-          
+
           // We found a suitable job. Get task from it.
           Task t = obtainNewTask(taskTrackerStatus, job);
           //if there is a task return it immediately.
           if (t != null) {
             if (LOG.isDebugEnabled()) {
-              LOG.info(job.getJobID() + ": Got " + t.getTaskID() + 
-                       " for reserved tasktracker " + 
+              LOG.info(job.getJobID() + ": Got " + t.getTaskID() +
+                       " for reserved tasktracker " +
                        taskTracker.getTrackerName());
             }
             // we're successful in getting a task
             return TaskLookupResult.getTaskFoundResult(t);
-          } 
+          }
         } else {
           // Re-reserve the current tasktracker
           taskTracker.reserveSlots(type, job, availableSlots);
-          
+
           if (LOG.isDebugEnabled()) {
-            LOG.debug(job.getJobID() + ": Re-reserving " + 
+            LOG.debug(job.getJobID() + ": Re-reserving " +
                       taskTracker.getTrackerName());
           }
 
-          return TaskLookupResult.getMemFailedResult(); 
+          return TaskLookupResult.getMemFailedResult();
         }
       }
-      
-      
-      for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
-        // we may have queues with capacity=0. We shouldn't look at jobs from 
+
+      for (AbstractQueue q : getOrderedJobQueues()) {
+        QueueSchedulingContext qsc = q.getQueueSchedulingContext();
+        // we may have queues with capacity=0. We shouldn't look at jobs from
         // these queues
-        if (0 == getTSI(qsi).getCapacity()) {
+        if (0 == getTSC(qsc).getCapacity()) {
           continue;
         }
-        
-        if(this.areTasksInQueueOverLimit(qsi)) {
+
+        // This check is an optimization: if the queue has already reached
+        // its limit, there is no need to traverse it at all.
+        if(this.areTasksInQueueOverMaxCapacity(qsc,1)) {
           continue;
         }
-        TaskLookupResult tlr = getTaskFromQueue(taskTracker, qsi);
+        
+        TaskLookupResult tlr = getTaskFromQueue(taskTracker, qsc);
         TaskLookupResult.LookUpStatus lookUpStatus = tlr.getLookUpStatus();
 
         if (lookUpStatus == TaskLookupResult.LookUpStatus.NO_TASK_FOUND) {
@@ -672,7 +472,7 @@
           return tlr;
         }
         // if there was a memory mismatch, return
-        else if (lookUpStatus == 
+        else if (lookUpStatus ==
           TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT) {
             return tlr;
         }
@@ -684,22 +484,32 @@
 
 
     /**
-     * Check if the max task limit is set for this queue
-     * if set , ignore this qsi if current num of occupied
-     * slots  of a TYPE in the queue is >= getMaxTaskCap().
-     * @param qsi
-     * @return
+     * Check if a maximum-capacity is set for this queue.
+     * If it is set and greater than 0, check whether
+     * numSlotsOccupied + noOfSlotsPerTask exceeds the
+     * maximum-capacity; if it does, this queue is over its limit.
+     *
+     * Even when numSlotsOccupied is below the maximum-capacity, if
+     * numSlotsOccupied + noOfSlotsPerTask exceeds it we still don't
+     * assign the task. This may leave a very small set of slots
+     * under-utilized, but that is acceptable since we strictly respect the
+     * maximum-capacity.
+     * @param qsc
+     * @param noOfSlotsPerTask
+     * @return true if queue is over maximum-capacity
      */
-
-    private boolean areTasksInQueueOverLimit(QueueSchedulingInfo qsi) {
-      TaskSchedulingInfo tsi = getTSI(qsi);
-      if (tsi.getMaxTaskLimit() >= 0) {
-        if (tsi.numSlotsOccupied >= tsi.getCapacity()) {
+    private boolean areTasksInQueueOverMaxCapacity(
+      QueueSchedulingContext qsc,int noOfSlotsPerTask) {
+      TaskSchedulingContext tsi = getTSC(qsc);
+      //check for maximum-capacity
+      if(tsi.getMaxCapacity() >= 0) {
+        if ((tsi.getNumSlotsOccupied() + noOfSlotsPerTask) >
+          tsi.getMaxCapacity()) {
           if (LOG.isDebugEnabled()) {
             LOG.debug(
-              "Queue " + qsi.queueName + " has reached its  max " + type +
-                " limit ");
+              "Queue " + qsc.getQueueName() + " has reached its max " +
+                type + " capacity");
             LOG.debug("Current running tasks " + tsi.getCapacity());
+
           }
           return true;
         }
@@ -707,43 +517,45 @@
       return false;
     }
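
A small illustrative check, with hypothetical names, of the maximum-capacity rule above: a task needing noOfSlotsPerTask slots is refused when accepting it would push the queue's occupied slots past its maximum capacity, even if some slots are still free below that ceiling.

    public class MaxCapacitySketch {

      // Returns true when assigning a task of the given width would exceed
      // the queue's maximum capacity; a negative maxCapacity means "no limit".
      static boolean overMaxCapacity(int numSlotsOccupied, int slotsPerTask,
                                     int maxCapacity) {
        if (maxCapacity < 0) {
          return false;
        }
        return (numSlotsOccupied + slotsPerTask) > maxCapacity;
      }

      public static void main(String[] args) {
        // 9 of a maximum of 10 slots are occupied; a 1-slot task still fits...
        System.out.println(overMaxCapacity(9, 1, 10)); // false
        // ...but a 2-slot high-memory task would cross the ceiling, so it waits.
        System.out.println(overMaxCapacity(9, 2, 10)); // true
      }
    }
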
 
+
     // for debugging.
-    private void printQSIs() {
+    private void printQSCs() {
       if (LOG.isDebugEnabled()) {
         StringBuffer s = new StringBuffer();
-        for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
-          TaskSchedulingInfo tsi = getTSI(qsi);
+        for (AbstractQueue aq: getOrderedJobQueues()) {
+          QueueSchedulingContext qsi = aq.getQueueSchedulingContext();
+          TaskSchedulingContext tsi = getTSC(qsi);
           Collection<JobInProgress> runJobs =
-            scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName);
+            scheduler.jobQueuesManager.getJobQueue(qsi.getQueueName())
+              .getRunningJobs();
           s.append(
             String.format(
               " Queue '%s'(%s): runningTasks=%d, "
-                + "occupiedSlots=%d, capacity=%d, runJobs=%d  maxTaskLimit=%d ",
-              qsi.queueName,
-              this.type, Integer.valueOf(tsi.numRunningTasks), Integer
-                .valueOf(tsi.numSlotsOccupied), Integer
-                .valueOf(tsi.getCapacity()), Integer.valueOf(runJobs.size()),
-              Integer.valueOf(tsi.getMaxTaskLimit())));
+                + "occupiedSlots=%d, capacity=%d, runJobs=%d  maximumCapacity=%d ",
+              qsi.getQueueName(),
+              this.type, tsi.getNumRunningTasks(),
+              tsi.getNumSlotsOccupied(), tsi.getCapacity(), (runJobs.size()),
+              tsi.getMaxCapacity()));
         }
         LOG.debug(s);
       }
     }
-    
+
     /**
-     * Check if one of the tasks have a speculative task to execute on the 
+     * Check if one of the tasks have a speculative task to execute on the
      * particular task tracker.
-     * 
+     *
      * @param tips tasks of a job
-     * @param progress percentage progress of the job
      * @param tts task tracker status for which we are asking speculative tip
      * @return true if job has a speculative task to run on particular TT.
      */
-    boolean hasSpeculativeTask(TaskInProgress[] tips, float progress, 
-        TaskTrackerStatus tts) {
+    boolean hasSpeculativeTask(
+      TaskInProgress[] tips,
+      TaskTrackerStatus tts) {
       long currentTime = System.currentTimeMillis();
       for(TaskInProgress tip : tips)  {
-        if(tip.isRunning() 
-            && !(tip.hasRunOnMachine(tts.getHost(), tts.getTrackerName())) 
+        if(tip.isRunning()
+            && !(tip.hasRunOnMachine(tts.getHost(), tts.getTrackerName()))
             && tip.canBeSpeculated(currentTime)) {
           return true;
         }
@@ -753,7 +565,7 @@
   }
 
   /**
-   * The scheduling algorithms for map tasks. 
+   * The scheduling algorithms for map tasks.
    */
   private static class MapSchedulingMgr extends TaskSchedulingMgr {
 
@@ -764,12 +576,12 @@
     }
 
     @Override
-    Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) 
+    Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job)
     throws IOException {
-      ClusterStatus clusterStatus = 
+      ClusterStatus clusterStatus =
         scheduler.taskTrackerManager.getClusterStatus();
       int numTaskTrackers = clusterStatus.getTaskTrackers();
-      return job.obtainNewMapTask(taskTracker, numTaskTrackers, 
+      return job.obtainNewMapTask(taskTracker, numTaskTrackers,
           scheduler.taskTrackerManager.getNumberOfUniqueHosts());
     }
 
@@ -779,43 +591,24 @@
     }
 
     @Override
-    int getRunningTasks(JobInProgress job) {
-      return job.runningMaps();
-    }
-
-    @Override
-    int getPendingTasks(JobInProgress job) {
-      return job.pendingMaps();
+    TaskSchedulingContext getTSC(QueueSchedulingContext qsi) {
+      return qsi.getMapTSC();
     }
 
-    @Override
-    int getSlotsPerTask(JobInProgress job) {
-      return 
-        job.getJobConf().computeNumSlotsPerMap(scheduler.getMemSizeForMapSlot());    
-    }
-
-    @Override
-    TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
-      return qsi.mapTSI;
-    }
-
-    int getNumReservedTaskTrackers(JobInProgress job) {
-      return job.getNumReservedTaskTrackersForMaps();
-    }
 
     @Override
     boolean hasSpeculativeTask(JobInProgress job, TaskTrackerStatus tts) {
-      //Check if job supports speculative map execution first then 
+      //Check if job supports speculative map execution first then
       //check if job has speculative maps.
       return (job.getJobConf().getMapSpeculativeExecution())&& (
-          hasSpeculativeTask(job.getMapTasks(), 
-              job.getStatus().mapProgress(), tts));
+          hasSpeculativeTask(job.getMapTasks(),
+                             tts));
     }
 
   }
 
   /**
-   * The scheduling algorithms for reduce tasks. 
+   * The scheduling algorithms for reduce tasks.
    */
   private static class ReduceSchedulingMgr extends TaskSchedulingMgr {
 
@@ -826,12 +619,12 @@
     }
 
     @Override
-    Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job) 
+    Task obtainNewTask(TaskTrackerStatus taskTracker, JobInProgress job)
     throws IOException {
-      ClusterStatus clusterStatus = 
+      ClusterStatus clusterStatus =
         scheduler.taskTrackerManager.getClusterStatus();
       int numTaskTrackers = clusterStatus.getTaskTrackers();
-      return job.obtainNewReduceTask(taskTracker, numTaskTrackers, 
+      return job.obtainNewReduceTask(taskTracker, numTaskTrackers,
           scheduler.taskTrackerManager.getNumberOfUniqueHosts());
     }
 
@@ -842,28 +635,8 @@
     }
 
     @Override
-    int getRunningTasks(JobInProgress job) {
-      return job.runningReduces();
-    }
-
-    @Override
-    int getPendingTasks(JobInProgress job) {
-      return job.pendingReduces();
-    }
-
-    @Override
-    int getSlotsPerTask(JobInProgress job) {
-      return
-        job.getJobConf().computeNumSlotsPerReduce(scheduler.getMemSizeForReduceSlot());    
-    }
-
-    @Override
-    TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
-      return qsi.reduceTSI;
-    }
-
-    int getNumReservedTaskTrackers(JobInProgress job) {
-      return job.getNumReservedTaskTrackersForReduces();
+    TaskSchedulingContext getTSC(QueueSchedulingContext qsi) {
+      return qsi.getReduceTSC();
     }
 
     @Override
@@ -871,33 +644,24 @@
       //check if the job supports reduce speculative execution first then
       //check if the job has speculative tasks.
       return (job.getJobConf().getReduceSpeculativeExecution()) && (
-          hasSpeculativeTask(job.getReduceTasks(), 
-              job.getStatus().reduceProgress(), tts));
+          hasSpeculativeTask(job.getReduceTasks(),
+                             tts));
     }
 
   }
-  
-  /** the scheduling mgrs for Map and Reduce tasks */ 
+
+  /** the scheduling mgrs for Map and Reduce tasks */
   protected TaskSchedulingMgr mapScheduler = new MapSchedulingMgr(this);
   protected TaskSchedulingMgr reduceScheduler = new ReduceSchedulingMgr(this);
 
-  MemoryMatcher memoryMatcher = new MemoryMatcher(this);
+  MemoryMatcher memoryMatcher = new MemoryMatcher();
 
-  /** we keep track of the number of map/reduce slots we saw last */
-  private int prevMapClusterCapacity = 0;
-  private int prevReduceClusterCapacity = 0;
-  
-    
   static final Log LOG = LogFactory.getLog(CapacityTaskScheduler.class);
   protected JobQueuesManager jobQueuesManager;
-  protected CapacitySchedulerConf schedConf;
+
   /** whether scheduler has started or not */
   private boolean started = false;
 
-  final static String JOB_SCHEDULING_INFO_FORMAT_STRING =
-    "%s running map tasks using %d map slots. %d additional slots reserved." +
-    " %s running reduce tasks using %d reduce slots." +
-    " %d additional slots reserved.";
   /**
    * A clock class - can be mocked out for testing.
    */
@@ -910,10 +674,22 @@
   private Clock clock;
   private JobInitializationPoller initializationPoller;
 
-  private long memSizeForMapSlotOnJT;
-  private long memSizeForReduceSlotOnJT;
-  private long limitMaxMemForMapTasks;
-  private long limitMaxMemForReduceTasks;
+  class CapacitySchedulerQueueRefresher extends QueueRefresher {
+    @Override
+    void refreshQueues(List<JobQueueInfo> newRootQueues)
+        throws Throwable {
+      if (!started) {
+        String msg =
+            "Capacity Scheduler is not in the 'started' state."
+                + " Cannot refresh queues.";
+        LOG.error(msg);
+        throw new IOException(msg);
+      }
+      CapacitySchedulerConf schedConf = new CapacitySchedulerConf();
+      initializeQueues(newRootQueues, schedConf, true);
+      initializationPoller.refreshQueueInfo(schedConf);
+    }
+  }
 
   public CapacityTaskScheduler() {
     this(new Clock());
@@ -921,105 +697,20 @@
   
   // for testing
   public CapacityTaskScheduler(Clock clock) {
-    this.jobQueuesManager = new JobQueuesManager(this);
+    this.jobQueuesManager = new JobQueuesManager();
     this.clock = clock;
   }
-  
-  /** mostly for testing purposes */
-  public void setResourceManagerConf(CapacitySchedulerConf conf) {
-    this.schedConf = conf;
-  }
-
-  private void initializeMemoryRelatedConf() {
-    //handling @deprecated
-    if (conf.get(
-      CapacitySchedulerConf.DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY) !=
-      null) {
-      LOG.warn(
-        JobConf.deprecatedString(
-          CapacitySchedulerConf.DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY));
-    }
-
-    //handling @deprecated
-    if (conf.get(CapacitySchedulerConf.UPPER_LIMIT_ON_TASK_PMEM_PROPERTY) !=
-      null) {
-      LOG.warn(
-        JobConf.deprecatedString(
-          CapacitySchedulerConf.UPPER_LIMIT_ON_TASK_PMEM_PROPERTY));
-    }
-
-    if (conf.get(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY) != null) {
-      LOG.warn(
-        JobConf.deprecatedString(
-          JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY));
-    }
-
-    memSizeForMapSlotOnJT =
-        JobConf.normalizeMemoryConfigValue(conf.getLong(
-            JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY,
-            JobConf.DISABLED_MEMORY_LIMIT));
-    memSizeForReduceSlotOnJT =
-        JobConf.normalizeMemoryConfigValue(conf.getLong(
-            JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
-            JobConf.DISABLED_MEMORY_LIMIT));
-
-    //handling @deprecated values
-    if (conf.get(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY) != null) {
-      LOG.warn(
-        JobConf.deprecatedString(
-          JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY)+
-          " instead use " +JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY+
-          " and " + JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY
-      );
-      
-      limitMaxMemForMapTasks = limitMaxMemForReduceTasks =
-        JobConf.normalizeMemoryConfigValue(
-          conf.getLong(
-            JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
-            JobConf.DISABLED_MEMORY_LIMIT));
-      if (limitMaxMemForMapTasks != JobConf.DISABLED_MEMORY_LIMIT &&
-        limitMaxMemForMapTasks >= 0) {
-        limitMaxMemForMapTasks = limitMaxMemForReduceTasks =
-          limitMaxMemForMapTasks /
-            (1024 * 1024); //Converting old values in bytes to MB
-      }
-    } else {
-      limitMaxMemForMapTasks =
-        JobConf.normalizeMemoryConfigValue(
-          conf.getLong(
-            JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
-            JobConf.DISABLED_MEMORY_LIMIT));
-      limitMaxMemForReduceTasks =
-        JobConf.normalizeMemoryConfigValue(
-          conf.getLong(
-            JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
-            JobConf.DISABLED_MEMORY_LIMIT));
-    }
-    LOG.info(String.format("Scheduler configured with "
-        + "(memSizeForMapSlotOnJT, memSizeForReduceSlotOnJT, "
-        + "limitMaxMemForMapTasks, limitMaxMemForReduceTasks)"
-        + " (%d,%d,%d,%d)", Long.valueOf(memSizeForMapSlotOnJT), Long
-        .valueOf(memSizeForReduceSlotOnJT), Long
-        .valueOf(limitMaxMemForMapTasks), Long
-        .valueOf(limitMaxMemForReduceTasks)));
-  }
 
-  long getMemSizeForMapSlot() {
-    return memSizeForMapSlotOnJT;
-  }
-
-  long getMemSizeForReduceSlot() {
-    return memSizeForReduceSlotOnJT;
-  }
-
-  long getLimitMaxMemForMapSlot() {
-    return limitMaxMemForMapTasks;
-  }
-
-  long getLimitMaxMemForReduceSlot() {
-    return limitMaxMemForReduceTasks;
+  @Override
+  QueueRefresher getQueueRefresher() {
+    return new CapacitySchedulerQueueRefresher();
   }
 
+  /**
+   * Only for testing.
+   * @param type
+   * @return
+   */
   String[] getOrderedQueues(TaskType type) {
     if (type == TaskType.MAP) {
       return mapScheduler.getOrderedQueues();
@@ -1033,80 +724,144 @@
   public synchronized void start() throws IOException {
     if (started) return;
     super.start();
-    // initialize our queues from the config settings
-    if (null == schedConf) {
-      schedConf = new CapacitySchedulerConf();
-    }
 
-    initializeMemoryRelatedConf();
-    
+    // Initialize MemoryMatcher
+    MemoryMatcher.initializeMemoryRelatedConf(conf);
+
     // read queue info from config file
     QueueManager queueManager = taskTrackerManager.getQueueManager();
-    Set<String> queues = queueManager.getQueues();
-    // Sanity check: there should be at least one queue. 
-    if (0 == queues.size()) {
-      throw new IllegalStateException("System has no queue configured");
-    }
 
-    Set<String> queuesWithoutConfiguredCapacity = new HashSet<String>();
-    float totalCapacity = 0.0f;
-    for (String queueName: queues) {
-      float capacity = schedConf.getCapacity(queueName);
-      if(capacity == -1.0) {
-        queuesWithoutConfiguredCapacity.add(queueName);
-      }else {
-        totalCapacity += capacity;
-      }
-      int ulMin = schedConf.getMinimumUserLimitPercent(queueName);
-      // create our QSI and add to our hashmap
-      QueueSchedulingInfo qsi = new QueueSchedulingInfo(
-        queueName, capacity, ulMin, jobQueuesManager, schedConf.getMaxMapCap(
-          queueName), schedConf.getMaxReduceCap(queueName));
-      queueInfoMap.put(queueName, qsi);
-
-      // create the queues of job objects
-      boolean supportsPrio = schedConf.isPrioritySupported(queueName);
-      jobQueuesManager.createQueue(queueName, supportsPrio);
-      
-      SchedulingDisplayInfo schedulingInfo = 
-        new SchedulingDisplayInfo(queueName, this);
-      queueManager.setSchedulerInfo(queueName, schedulingInfo);
-      
+    // initialize our queues from the config settings
+    CapacitySchedulerConf schedConf = new CapacitySchedulerConf();
+    try {
+      initializeQueues(queueManager.getRoot().getJobQueueInfo().getChildren(),
+          schedConf, false);
+    } catch (Throwable e) {
+      LOG.error("Couldn't initialize queues because of the exception: "
+          + StringUtils.stringifyException(e));
+      throw new IOException(e);
     }
-    float remainingQuantityToAllocate = 100 - totalCapacity;
-    float quantityToAllocate = 
-      remainingQuantityToAllocate/queuesWithoutConfiguredCapacity.size();
-    for(String queue: queuesWithoutConfiguredCapacity) {
-      QueueSchedulingInfo qsi = queueInfoMap.get(queue); 
-      qsi.capacityPercent = quantityToAllocate;
-      schedConf.setCapacity(queue, quantityToAllocate);
-    }    
-    
-    if (totalCapacity > 100.0) {
-      throw new IllegalArgumentException("Sum of queue capacities over 100% at "
-                                         + totalCapacity);
-    }    
-    
-    // let our mgr objects know about the queues
-    mapScheduler.initialize(queueInfoMap);
-    reduceScheduler.initialize(queueInfoMap);
-    
-    // listen to job changes
+
+    // Queues are ready. Now register jobQueuesManager with the JobTracker so as
+    // to listen to job changes
     taskTrackerManager.addJobInProgressListener(jobQueuesManager);
 
     //Start thread for initialization
     if (initializationPoller == null) {
       this.initializationPoller = new JobInitializationPoller(
-          jobQueuesManager,schedConf,queues, taskTrackerManager);
+          jobQueuesManager, taskTrackerManager);
     }
-    initializationPoller.init(queueManager.getQueues(), schedConf);
+    initializationPoller.init(jobQueuesManager.getJobQueueNames(), schedConf);
     initializationPoller.setDaemon(true);
     initializationPoller.start();
 
     started = true;
-    LOG.info("Capacity scheduler initialized " + queues.size() + " queues");  
+
+    LOG.info("Capacity scheduler started successfully");  
   }
-  
+
+  /**
+   * Read the configuration and initialize the queues. This operation should be
+   * done only when either the scheduler is starting or a request is received
+   * from {@link QueueManager} to refresh the queue configuration.
+   * 
+   * <p>
+   * 
+   * Even in case of refresh, we do not explicitly destroy AbstractQueue items,
+   * or the info maps, they will be automatically garbage-collected.
+   * 
+   * <p>
+   * 
+   * We don't explicitly lock the scheduler completely. This method is called at
+   * two times. 1) When the scheduler is starting. During this time, the lock
+   * sequence is JT->scheduler and so we don't need any more locking here. 2)
+   * When refresh is issued to {@link QueueManager}. When this happens, parallel
+   * refreshes are guarded by {@link QueueManager} itself by taking its lock.
+   * 
+   * @param newRootQueues
+   * @param schedConf
+   * @param refreshingQueues
+   * @throws Throwable
+   */
+  private void initializeQueues(List<JobQueueInfo> newRootQueues,
+      CapacitySchedulerConf schedConf, boolean refreshingQueues)
+      throws Throwable {
+
+    if (newRootQueues == null) {
+      throw new IOException(
+          "Cannot initialize the queues with null root-queues!");
+    }
+
+    // Sanity check: there should be at least one queue.
+    if (0 == newRootQueues.size()) {
+      throw new IllegalStateException("System has no queue configured!");
+    }
+
+    // Create a new queue-hierarchy builder and try loading the complete
+    // hierarchy of queues.
+    AbstractQueue newRootAbstractQueue;
+    try {
+      newRootAbstractQueue =
+          new QueueHierarchyBuilder().createHierarchy(newRootQueues, schedConf);
+
+    } catch (Throwable e) {
+      LOG.error("Exception while trying to (re)initialize queues: "
+          + StringUtils.stringifyException(e));
+      LOG.info("(Re)initializing the queues with the new configuration "
+          + "failed, so keeping the old configuration.");
+      throw e;
+    }
+
+    // New configuration is successfully validated and applied, set the new
+    // configuration to the current queue-hierarchy.
+
+    if (refreshingQueues) {
+      // Scheduler is being refreshed.
+
+      // Going to commit the changes to the hierarchy. Lock the scheduler.
+      synchronized (this) {
+        AbstractQueueComparator comparator = new AbstractQueueComparator();
+        this.root.sort(comparator);
+        newRootAbstractQueue.sort(comparator);
+        root.validateAndCopyQueueContexts(newRootAbstractQueue);
+      }
+    } else {
+      // Scheduler is just starting.
+
+      this.root = newRootAbstractQueue;
+
+      // JobQueue objects are created. Inform the JobQueuesManager so that it
+      // can track the running/waiting jobs. JobQueuesManager is still not added
+      // as a listener to JobTracker, so no locking needed.
+      addJobQueuesToJobQueuesManager();
+    }
+
+    List<AbstractQueue> allQueues = new ArrayList<AbstractQueue>();
+    allQueues.addAll(getRoot().getDescendantContainerQueues());
+    allQueues.addAll(getRoot().getDescendentJobQueues());
+    for (AbstractQueue queue : allQueues) {
+      if (!refreshingQueues) {
+        // Scheduler is just starting, create the display info also
+        createDisplayInfo(taskTrackerManager.getQueueManager(), queue.getName());
+      }
+
+      // QueueSchedulingContext objects are created/have changed. Put them
+      // (back) in the queue-info so as to be consumed by the UI.
+      addToQueueInfoMap(queue.getQueueSchedulingContext());
+    }
+  }
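
A rough sketch, under assumed names, of the refresh pattern initializeQueues follows: build a complete new hierarchy from the new configuration first, and only if that succeeds either adopt it as the root (at startup) or copy its contexts into the live hierarchy under the scheduler lock (on refresh). Hierarchy and buildHierarchy are placeholders, not the scheduler's API.

    public class RefreshPatternSketch {

      // Placeholder for a fully-built queue hierarchy.
      static class Hierarchy {
        void copyContextsFrom(Hierarchy other) {
          // on refresh, only the scheduling contexts of matching queues change
        }
      }

      private Hierarchy root;            // live hierarchy used for scheduling
      private final Object lock = new Object();

      // Build from the new config outside the lock; an exception here leaves
      // the old configuration untouched.
      Hierarchy buildHierarchy(String configPath) throws Exception {
        return new Hierarchy();
      }

      void initializeQueues(String configPath, boolean refreshing)
          throws Exception {
        Hierarchy fresh = buildHierarchy(configPath); // may throw; old root survives
        if (refreshing) {
          synchronized (lock) {                       // commit under the lock
            root.copyContextsFrom(fresh);
          }
        } else {
          root = fresh;                               // scheduler is just starting
        }
      }

      public static void main(String[] args) throws Exception {
        RefreshPatternSketch s = new RefreshPatternSketch();
        s.initializeQueues("capacity-scheduler.xml", false); // startup
        s.initializeQueues("capacity-scheduler.xml", true);  // later refresh
      }
    }
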
+
+  /**
+   * Inform the {@link JobQueuesManager} about the newly constructed
+   * {@link JobQueue}s.
+   */
+  private void addJobQueuesToJobQueuesManager() {
+    List<AbstractQueue> allJobQueues = getRoot().getDescendentJobQueues();
+    for (AbstractQueue jobQ : allJobQueues) {
+      jobQueuesManager.addQueue((JobQueue)jobQ);
+    }
+  }
+
   /** mostly for testing purposes */
   void setInitializationPoller(JobInitializationPoller p) {
     this.initializationPoller = p;
@@ -1133,117 +888,36 @@
    * provided for the test classes
    * lets you update the QSI objects and sorted collections
    */ 
-  void updateQSIInfoForTests() {
+  void updateContextInfoForTests() {
     ClusterStatus c = taskTrackerManager.getClusterStatus();
     int mapClusterCapacity = c.getMaxMapTasks();
     int reduceClusterCapacity = c.getMaxReduceTasks();
     // update the QSI objects
-    updateQSIObjects(mapClusterCapacity, reduceClusterCapacity);
-    mapScheduler.updateCollectionOfQSIs();
-    reduceScheduler.updateCollectionOfQSIs();
+    updateContextObjects(mapClusterCapacity, reduceClusterCapacity);
+    mapScheduler.scheduler.root.sort(mapScheduler.queueComparator);
+    reduceScheduler.scheduler.root.sort(reduceScheduler.queueComparator);
   }
 
   /**
-   * Update individual QSI objects.
+   * Update individual QSC objects.
    * We don't need exact information for all variables, just enough for us
    * to make scheduling decisions. For example, we don't need an exact count
    * of numRunningTasks. Once we count upto the grid capacity, any
    * number beyond that will make no difference.
    *
    **/
-  private synchronized void updateQSIObjects(int mapClusterCapacity,
+  private synchronized void updateContextObjects(int mapClusterCapacity,
       int reduceClusterCapacity) {
-    // if # of slots have changed since last time, update.
-    // First, compute whether the total number of TT slots have changed
-    for (QueueSchedulingInfo qsi: queueInfoMap.values()) {
-      // compute new capacities, if TT slots have changed
-      if (mapClusterCapacity != prevMapClusterCapacity) {
-        qsi.mapTSI.setCapacity((int)
-          (qsi.capacityPercent*mapClusterCapacity/100));
-      }
-      if (reduceClusterCapacity != prevReduceClusterCapacity) {
-        qsi.reduceTSI.setCapacity((int)
-          (qsi.capacityPercent*reduceClusterCapacity/100));
-      }
-      // reset running/pending tasks, tasks per user
-      qsi.mapTSI.resetTaskVars();
-      qsi.reduceTSI.resetTaskVars();
-      // update stats on running jobs
-      for (JobInProgress j:
-        jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
-        if (j.getStatus().getRunState() != JobStatus.RUNNING) {
-          continue;
-        }
+    root.update(mapClusterCapacity,reduceClusterCapacity);
 
-        int numMapsRunningForThisJob = mapScheduler.getRunningTasks(j);
-        int numReducesRunningForThisJob = reduceScheduler.getRunningTasks(j);
-        int numRunningMapSlots = 
-          numMapsRunningForThisJob * mapScheduler.getSlotsPerTask(j);
-        int numRunningReduceSlots =
-          numReducesRunningForThisJob * reduceScheduler.getSlotsPerTask(j);
-        int numMapSlotsForThisJob = mapScheduler.getSlotsOccupied(j);
-        int numReduceSlotsForThisJob = reduceScheduler.getSlotsOccupied(j);
-        int numReservedMapSlotsForThisJob = 
-          (mapScheduler.getNumReservedTaskTrackers(j) * 
-           mapScheduler.getSlotsPerTask(j)); 
-        int numReservedReduceSlotsForThisJob = 
-          (reduceScheduler.getNumReservedTaskTrackers(j) * 
-           reduceScheduler.getSlotsPerTask(j)); 
-        j.setSchedulingInfo(
-            String.format(JOB_SCHEDULING_INFO_FORMAT_STRING,
-                          Integer.valueOf(numMapsRunningForThisJob), 
-                          Integer.valueOf(numRunningMapSlots),
-                          Integer.valueOf(numReservedMapSlotsForThisJob),
-                          Integer.valueOf(numReducesRunningForThisJob), 
-                          Integer.valueOf(numRunningReduceSlots),
-                          Integer.valueOf(numReservedReduceSlotsForThisJob)));
-        qsi.mapTSI.numRunningTasks += numMapsRunningForThisJob;
-        qsi.reduceTSI.numRunningTasks += numReducesRunningForThisJob;
-        qsi.mapTSI.numSlotsOccupied += numMapSlotsForThisJob;
-        qsi.reduceTSI.numSlotsOccupied += numReduceSlotsForThisJob;
-        Integer i =
-            qsi.mapTSI.numSlotsOccupiedByUser.get(j.getProfile().getUser());
-        qsi.mapTSI.numSlotsOccupiedByUser.put(j.getProfile().getUser(),
-            Integer.valueOf(i.intValue() + numMapSlotsForThisJob));
-        i = qsi.reduceTSI.numSlotsOccupiedByUser.get(j.getProfile().getUser());
-        qsi.reduceTSI.numSlotsOccupiedByUser.put(j.getProfile().getUser(),
-            Integer.valueOf(i.intValue() + numReduceSlotsForThisJob));
-        if (LOG.isDebugEnabled()) {
-          LOG.debug(String.format("updateQSI: job %s: run(m)=%d, "
-              + "occupied(m)=%d, run(r)=%d, occupied(r)=%d, finished(m)=%d,"
-              + " finished(r)=%d, failed(m)=%d, failed(r)=%d, "
-              + "spec(m)=%d, spec(r)=%d, total(m)=%d, total(r)=%d", j
-              .getJobID().toString(), Integer
-              .valueOf(numMapsRunningForThisJob), Integer
-              .valueOf(numMapSlotsForThisJob), Integer
-              .valueOf(numReducesRunningForThisJob), Integer
-              .valueOf(numReduceSlotsForThisJob), Integer.valueOf(j
-              .finishedMaps()), Integer.valueOf(j.finishedReduces()), Integer
-              .valueOf(j.failedMapTasks),
-              Integer.valueOf(j.failedReduceTasks), Integer
-                  .valueOf(j.speculativeMapTasks), Integer
-                  .valueOf(j.speculativeReduceTasks), Integer
-                  .valueOf(j.numMapTasks), Integer.valueOf(j.numReduceTasks)));
-        }
-
-        /*
-         * it's fine walking down the entire list of running jobs - there
-         * probably will not be many, plus, we may need to go through the
-         * list to compute numSlotsOccupiedByUser. If this is expensive, we
-         * can keep a list of running jobs per user. Then we only need to
-         * consider the first few jobs per user.
-         */
-      }
-    }
-
-    prevMapClusterCapacity = mapClusterCapacity;
-    prevReduceClusterCapacity = reduceClusterCapacity;
   }
 
   /*
    * The grand plan for assigning a task. 
-   * First, decide whether a Map or Reduce task should be given to a TT 
-   * (if the TT can accept either). 
+   * Always assigns one reduce and one map task, if sufficient slots are
+   * available for each type.
+   * Otherwise, a task of whichever type has a free slot is assigned.
    * Next, pick a queue. We only look at queues that need a slot. Among these,
    * we first look at queues whose (# of running tasks)/capacity is the least.
    * Next, pick a job in a queue. we pick the job at the front of the queue
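The policy described in this comment (hand the TaskTracker one reduce and one map in the same heartbeat whenever the corresponding slot type is free) can be summarized in a small sketch. This is a hedged illustration only: TypeScheduler, Task and assign are stand-in names, not the scheduler's real types, and the real code goes through TaskLookupResult rather than returning tasks directly.

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative sketch of the per-heartbeat assignment policy: try one
    // reduce and one map independently, return null only if neither matched.
    class AssignmentSketch {
      // Stand-in for the scheduler's per-type scheduling objects.
      interface TypeScheduler {
        Task assignTask();                 // null when no suitable task exists
      }
      static class Task { }

      static List<Task> assign(int maxReduceSlots, int usedReduceSlots,
                               int maxMapSlots, int usedMapSlots,
                               TypeScheduler reduceScheduler,
                               TypeScheduler mapScheduler) {
        List<Task> result = new ArrayList<Task>();
        if (maxReduceSlots > usedReduceSlots) {      // a reduce slot is free
          Task t = reduceScheduler.assignTask();
          if (t != null) {
            result.add(t);
          }
        }
        if (maxMapSlots > usedMapSlots) {            // a map slot is free
          Task t = mapScheduler.assignTask();
          if (t != null) {
            result.add(t);
          }
        }
        return result.isEmpty() ? null : result;     // mirrors the patched return
      }
    }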
@@ -1257,12 +931,12 @@
     
     TaskLookupResult tlr;
     TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
+    List<Task> result = new ArrayList<Task>();
     
     /* 
-     * If TT has Map and Reduce slot free, we need to figure out whether to
-     * give it a Map or Reduce task.
-     * Number of ways to do this. For now, base decision on how much is needed
-     * versus how much is used (default to Map, if equal).
+     * If the TT has both a Map and a Reduce slot free, we assign one map
+     * and one reduce task. We base the decision on how much is needed
+     * versus how much is used.
      */
     ClusterStatus c = taskTrackerManager.getClusterStatus();
     int mapClusterCapacity = c.getMaxMapTasks();
@@ -1271,7 +945,8 @@
     int currentMapSlots = taskTrackerStatus.countOccupiedMapSlots();
     int maxReduceSlots = taskTrackerStatus.getMaxReduceSlots();
     int currentReduceSlots = taskTrackerStatus.countOccupiedReduceSlots();
-    LOG.debug("TT asking for task, max maps=" + taskTrackerStatus.getMaxMapSlots() + 
+    LOG.debug("TT asking for task, max maps="
+      + taskTrackerStatus.getMaxMapSlots() + 
         ", run maps=" + taskTrackerStatus.countMapTasks() + ", max reds=" + 
         taskTrackerStatus.getMaxReduceSlots() + ", run reds=" + 
         taskTrackerStatus.countReduceTasks() + ", map cap=" + 
@@ -1279,145 +954,52 @@
         reduceClusterCapacity);
 
     /* 
-     * update all our QSI objects.
-     * This involves updating each qsi structure. This operation depends
+     * Update all our QSC objects.
+     * This involves updating each QSC structure. This operation depends
      * on the number of running jobs in a queue, and some waiting jobs. If it
      * becomes expensive, do it once every few heartbeats only.
      */ 
-    updateQSIObjects(mapClusterCapacity, reduceClusterCapacity);
+    updateContextObjects(mapClusterCapacity, reduceClusterCapacity);
     // make sure we get our map or reduce scheduling object to update its 
-    // collection of QSI objects too. 
+    // collection of QSC objects too.
 
-    if ((maxReduceSlots - currentReduceSlots) > 
-    (maxMapSlots - currentMapSlots)) {
-      // get a reduce task first
-      reduceScheduler.updateCollectionOfQSIs();
+    if (maxReduceSlots > currentReduceSlots) {
+      // a reduce slot is available,
+      // try to get a reduce task
       tlr = reduceScheduler.assignTasks(taskTracker);
       if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
         tlr.getLookUpStatus()) {
-        // found a task; return
-        return Collections.singletonList(tlr.getTask());
-      }
-      // if we didn't get any, look at map tasks, if TT has space
-      else if ((TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT
-                                  == tlr.getLookUpStatus() ||
-                TaskLookupResult.LookUpStatus.NO_TASK_FOUND
-                                  == tlr.getLookUpStatus())
-          && (maxMapSlots > currentMapSlots)) {
-        mapScheduler.updateCollectionOfQSIs();
-        tlr = mapScheduler.assignTasks(taskTracker);
-        if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
-          tlr.getLookUpStatus()) {
-          return Collections.singletonList(tlr.getTask());
-        }
+        result.add(tlr.getTask());
       }
     }
-    else {
-      // get a map task first
-      mapScheduler.updateCollectionOfQSIs();
+
+    if (maxMapSlots > currentMapSlots) {
+      // a map slot is available, try to get a map task
       tlr = mapScheduler.assignTasks(taskTracker);
       if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
         tlr.getLookUpStatus()) {
-        // found a task; return
-        return Collections.singletonList(tlr.getTask());
+        result.add(tlr.getTask());
       }
-      // if we didn't get any, look at reduce tasks, if TT has space
-      else if ((TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT
-                                    == tlr.getLookUpStatus()
-                || TaskLookupResult.LookUpStatus.NO_TASK_FOUND
-                                    == tlr.getLookUpStatus())
-          && (maxReduceSlots > currentReduceSlots)) {
-        reduceScheduler.updateCollectionOfQSIs();
-        tlr = reduceScheduler.assignTasks(taskTracker);
-        if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
-          tlr.getLookUpStatus()) {
-          return Collections.singletonList(tlr.getTask());
-        }
-      }
-    }
-
-    return null;
-  }
-
-  // called when a job is added
-  synchronized void jobAdded(JobInProgress job) throws IOException {
-    QueueSchedulingInfo qsi = 
-      queueInfoMap.get(job.getProfile().getQueueName());
-    // qsi shouldn't be null
-    // update user-specific info
-    Integer i = qsi.numJobsByUser.get(job.getProfile().getUser());
-    if (null == i) {
-      i = 1;
-      // set the count for running tasks to 0
-      qsi.mapTSI.numSlotsOccupiedByUser.put(job.getProfile().getUser(),
-          Integer.valueOf(0));
-      qsi.reduceTSI.numSlotsOccupiedByUser.put(job.getProfile().getUser(),
-          Integer.valueOf(0));
-    }
-    else {
-      i++;
     }
-    qsi.numJobsByUser.put(job.getProfile().getUser(), i);
     
-    // setup scheduler specific job information
-    preInitializeJob(job);
-    
-    LOG.debug("Job " + job.getJobID().toString() + " is added under user " 
-              + job.getProfile().getUser() + ", user now has " + i + " jobs");
+    return (result.isEmpty()) ? null : result;
   }
 
-  /**
-   * Setup {@link CapacityTaskScheduler} specific information prior to
-   * job initialization.
-   */
-  void preInitializeJob(JobInProgress job) {
-    JobConf jobConf = job.getJobConf();
-    
-    // Compute number of slots required to run a single map/reduce task
-    int slotsPerMap = 1;
-    int slotsPerReduce = 1;
-    if (memoryMatcher.isSchedulingBasedOnMemEnabled()) {
-      slotsPerMap = jobConf.computeNumSlotsPerMap(getMemSizeForMapSlot());
-     slotsPerReduce = 
-       jobConf.computeNumSlotsPerReduce(getMemSizeForReduceSlot());
-    }
-    job.setNumSlotsPerMap(slotsPerMap);
-    job.setNumSlotsPerReduce(slotsPerReduce);
-  }
-  
-  // called when a job completes
-  synchronized void jobCompleted(JobInProgress job) {
-    QueueSchedulingInfo qsi = 
-      queueInfoMap.get(job.getProfile().getQueueName());
-    // qsi shouldn't be null
-    // update numJobsByUser
-    LOG.debug("JOb to be removed for user " + job.getProfile().getUser());
-    Integer i = qsi.numJobsByUser.get(job.getProfile().getUser());
-    i--;
-    if (0 == i.intValue()) {
-      qsi.numJobsByUser.remove(job.getProfile().getUser());
-      // remove job footprint from our TSIs
-      qsi.mapTSI.numSlotsOccupiedByUser.remove(job.getProfile().getUser());
-      qsi.reduceTSI.numSlotsOccupiedByUser.remove(job.getProfile().getUser());
-      LOG.debug("No more jobs for user, number of users = " + qsi.numJobsByUser.size());
-    }
-    else {
-      qsi.numJobsByUser.put(job.getProfile().getUser(), i);
-      LOG.debug("User still has " + i + " jobs, number of users = "
-                + qsi.numJobsByUser.size());
-    }
-  }
   
   @Override
   public synchronized Collection<JobInProgress> getJobs(String queueName) {
     Collection<JobInProgress> jobCollection = new ArrayList<JobInProgress>();
-    Collection<JobInProgress> runningJobs = 
-        jobQueuesManager.getRunningJobQueue(queueName);
+    JobQueue jobQueue = jobQueuesManager.getJobQueue(queueName);
+    if (jobQueue == null) {
+      return jobCollection;
+    }
+    Collection<JobInProgress> runningJobs =
+      jobQueue.getRunningJobs();
     if (runningJobs != null) {
       jobCollection.addAll(runningJobs);
     }
     Collection<JobInProgress> waitingJobs = 
-      jobQueuesManager.getWaitingJobs(queueName);
+      jobQueue.getWaitingJobs();
     Collection<JobInProgress> tempCollection = new ArrayList<JobInProgress>();
     if(waitingJobs != null) {
       tempCollection.addAll(waitingJobs);
@@ -1433,13 +1015,51 @@
     return initializationPoller;
   }
 
-  synchronized String getDisplayInfo(String queueName) {
-    QueueSchedulingInfo qsi = queueInfoMap.get(queueName);
-    if (null == qsi) { 
+  private synchronized String getDisplayInfo(String queueName) {
+    QueueSchedulingContext qsi = queueInfoMap.get(queueName);
+    if (null == qsi) {
       return null;
     }
     return qsi.toString();
   }
 
-}
+  private synchronized void addToQueueInfoMap(QueueSchedulingContext qsc) {
+    queueInfoMap.put(qsc.getQueueName(), qsc);
+  }
 
+  /**
+   * Create the scheduler information and set it in the {@link QueueManager}.
+   * This should only be called when the scheduler is starting.
+   * 
+   * @param queueManager
+   * @param queueName
+   */
+  private void createDisplayInfo(QueueManager queueManager, String queueName) {
+    if (queueManager != null) {
+      SchedulingDisplayInfo schedulingInfo =
+        new SchedulingDisplayInfo(queueName, this);
+      queueManager.setSchedulerInfo(queueName, schedulingInfo);
+    }
+  }
+
+
+  /**
+   * For testing purposes only.
+   *
+   * @return the root queue
+   */
+  AbstractQueue getRoot() {
+    return this.root;
+  }
+
+
+  /**
+   * For testing purposes only; do not use this method in production code.
+   *
+   * @param rt the new root queue
+   */
+  void setRoot(AbstractQueue rt) {
+    this.root = rt;
+  }
+
+}
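The createDisplayInfo/getDisplayInfo pair added above publishes per-queue scheduling information through QueueManager.setSchedulerInfo. The SchedulingDisplayInfo class itself is not part of this hunk, so the following is only an assumed sketch of the delegation pattern: a small wrapper that holds the queue name and asks its source for the current display string when rendered.

    // Hypothetical sketch only: the real SchedulingDisplayInfo is not shown in
    // this patch. It is assumed to render lazily by asking the scheduler for
    // the current display string of its queue (backed by queueInfoMap).
    class DisplayInfoSketch {
      interface InfoSource {                        // stand-in for the scheduler
        String displayInfoFor(String queueName);
      }

      private final String queueName;
      private final InfoSource source;

      DisplayInfoSketch(String queueName, InfoSource source) {
        this.queueName = queueName;
        this.source = source;
      }

      @Override
      public String toString() {
        String info = source.displayInfoFor(queueName);
        return info == null ? "" : info;            // unknown queues yield nothing
      }
    }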

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java Sat Nov 28 20:26:01 2009
@@ -134,7 +134,7 @@
           if (job == null) {
             continue;
           }
-          LOG.info("Initializing job : " + job.getJobID() + " in Queue "
+          LOG.info("Initializing job : " + job.getJobID() + " in AbstractQueue "
               + job.getProfile().getQueueName() + " For user : "
               + job.getProfile().getUser());
           if (startIniting) {
@@ -246,9 +246,9 @@
    */
   private HashMap<String, JobInitializationThread> threadsToQueueMap;
 
-  public JobInitializationPoller(JobQueuesManager mgr,
-      CapacitySchedulerConf rmConf, Set<String> queue, 
-      TaskTrackerManager ttm) {
+  public JobInitializationPoller(
+    JobQueuesManager mgr,
+    TaskTrackerManager ttm) {
     initializedJobs = new HashMap<JobID,JobInProgress>();
     jobQueues = new HashMap<String, QueueInfo>();
     this.jobQueueManager = mgr;
@@ -265,20 +265,7 @@
 
   void init(Set<String> queues, 
             CapacitySchedulerConf capacityConf) {
-    for (String queue : queues) {
-      int userlimit = capacityConf.getMinimumUserLimitPercent(queue);
-      int maxUsersToInitialize = ((100 / userlimit) + MAX_ADDITIONAL_USERS_TO_INIT);
-      int maxJobsPerUserToInitialize = capacityConf
-          .getMaxJobsPerUserToInitialize(queue);
-      QueueInfo qi = new QueueInfo(queue, maxUsersToInitialize,
-          maxJobsPerUserToInitialize);
-      jobQueues.put(queue, qi);
-    }
-    sleepInterval = capacityConf.getSleepInterval();
-    poolSize = capacityConf.getMaxWorkerThreads();
-    if (poolSize > queues.size()) {
-      poolSize = queues.size();
-    }
+    setupJobInitializerConfiguration(queues, capacityConf);
     assignThreadsToQueues();
     Collection<JobInitializationThread> threads = threadsToQueueMap.values();
     for (JobInitializationThread t : threads) {
@@ -290,6 +277,63 @@
   }
 
   /**
+   * Initialize the configuration of the JobInitializer as well as of the specific
+   * queues.
+   * 
+   * @param queues
+   * @param schedulerConf
+   */
+  private void setupJobInitializerConfiguration(Set<String> queues,
+      CapacitySchedulerConf schedulerConf) {
+    for (String queue : queues) {
+      int maxUsersToInitialize = getMaxUsersToInit(schedulerConf, queue);
+      int maxJobsPerUserToInitialize =
+          schedulerConf.getMaxJobsPerUserToInitialize(queue);
+      QueueInfo qi =
+          new QueueInfo(queue, maxUsersToInitialize,
+              maxJobsPerUserToInitialize);
+      jobQueues.put(queue, qi);
+    }
+    sleepInterval = schedulerConf.getSleepInterval();
+    poolSize = schedulerConf.getMaxWorkerThreads();
+    if (poolSize > queues.size()) {
+      poolSize = queues.size();
+    }
+  }
+
+  /**
+   * Compute the maximum number of users whose jobs may be initialized for
+   * the given queue, based on its minimum user-limit percent.
+   *
+   * @param schedulerConf the scheduler configuration
+   * @param queue the queue name
+   * @return the maximum number of users to initialize for the queue
+   */
+  private int getMaxUsersToInit(CapacitySchedulerConf schedulerConf,
+      String queue) {
+    int userlimit = schedulerConf.getMinimumUserLimitPercent(queue);
+    return (100 / userlimit) + MAX_ADDITIONAL_USERS_TO_INIT;
+  }
+
+  /**
+   * Refresh the Scheduler configuration cached with the initializer. This
+   * should be called only by
+   * {@link CapacityTaskScheduler.CapacitySchedulerQueueRefresher#refreshQueues()}
+   * . The cached configuration currently is only used by the main thread in the
+   * initializer. So, any updates are picked up automatically by subsequent
+   * iterations of the main thread.
+   */
+  void refreshQueueInfo(CapacitySchedulerConf schedulerConf) {
+    for (String queue : jobQueues.keySet()) {
+      QueueInfo queueInfo = jobQueues.get(queue);
+      synchronized (queueInfo) {
+        queueInfo.maxUsersAllowedToInitialize =
+            getMaxUsersToInit(schedulerConf, queue);
+        queueInfo.maxJobsPerUserToInitialize =
+            schedulerConf.getMaxJobsPerUserToInitialize(queue);
+      }
+    }
+  }
+
+  /**
   * This is the main thread of the initialization poller. We essentially do
   * the following in the main thread:
    * 
@@ -343,7 +387,7 @@
   private void printJobs(ArrayList<JobInProgress> jobsToInitialize) {
     for (JobInProgress job : jobsToInitialize) {
       LOG.info("Passing to Initializer Job Id :" + job.getJobID()
-          + " User: " + job.getProfile().getUser() + " Queue : "
+          + " User: " + job.getProfile().getUser() + " AbstractQueue : "
           + job.getProfile().getQueueName());
     }
   }
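The getMaxUsersToInit helper added earlier in this file derives, from a queue's minimum user-limit percent, how many users' jobs the poller will pre-initialize. A small worked example, assuming MAX_ADDITIONAL_USERS_TO_INIT is 2 (its actual value is not visible in this hunk):

    // Worked example of the arithmetic in getMaxUsersToInit.
    // MAX_ADDITIONAL_USERS_TO_INIT == 2 is an assumption for illustration.
    public class MaxUsersExample {
      private static final int MAX_ADDITIONAL_USERS_TO_INIT = 2;

      static int maxUsersToInit(int minimumUserLimitPercent) {
        return (100 / minimumUserLimitPercent) + MAX_ADDITIONAL_USERS_TO_INIT;
      }

      public static void main(String[] args) {
        System.out.println(maxUsersToInit(25));   // 100/25 + 2 = 6 users
        System.out.println(maxUsersToInit(100));  // 100/100 + 2 = 3 users
      }
    }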
@@ -434,13 +478,17 @@
     ArrayList<JobInProgress> jobsToInitialize = new ArrayList<JobInProgress>();
     // use the configuration parameter which is configured for the particular
     // queue.
-    int maximumUsersAllowedToInitialize = qi.maxUsersAllowedToInitialize;
-    int maxJobsPerUserAllowedToInitialize = qi.maxJobsPerUserToInitialize;
+    int maximumUsersAllowedToInitialize;
+    int maxJobsPerUserAllowedToInitialize;
+    synchronized (qi) {
+      maximumUsersAllowedToInitialize = qi.maxUsersAllowedToInitialize;
+      maxJobsPerUserAllowedToInitialize = qi.maxJobsPerUserToInitialize;
+    }
     int maxJobsPerQueueToInitialize = maximumUsersAllowedToInitialize
         * maxJobsPerUserAllowedToInitialize;
     int countOfJobsInitialized = 0;
     HashMap<String, Integer> userJobsInitialized = new HashMap<String, Integer>();
-    Collection<JobInProgress> jobs = jobQueueManager.getWaitingJobs(queue);
+    Collection<JobInProgress> jobs = jobQueueManager.getJobQueue(queue).getWaitingJobs();
     /*
      * Walk through the collection of waiting jobs.
      *  We maintain a map of jobs that have already been initialized. If a 
@@ -536,7 +584,7 @@
           LOG.info("Removing scheduled jobs from waiting queue"
               + job.getJobID());
           jobsIterator.remove();
-          jobQueueManager.removeJobFromWaitingQueue(job);
+          jobQueueManager.getJobQueue(job).removeWaitingJob(new JobSchedulingInfo(job));
           continue;
         }
       }
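The refresh path updates each QueueInfo's limits under the QueueInfo lock, and the job-selection code above now snapshots both limits inside the same lock, so a concurrent refresh can never be observed half-applied. A minimal sketch of that read/write-under-lock pattern, with illustrative names rather than the real QueueInfo fields:

    // Minimal sketch of the pattern: writer and reader both synchronize on the
    // same per-queue object, and the reader copies both limits in one step.
    class QueueLimitsSketch {
      private int maxUsers;
      private int maxJobsPerUser;

      synchronized void refresh(int users, int jobsPerUser) {   // writer side
        this.maxUsers = users;
        this.maxJobsPerUser = jobsPerUser;
      }

      synchronized int[] snapshot() {                           // reader side
        return new int[] { maxUsers, maxJobsPerUser };
      }
    }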

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobQueuesManager.java Sat Nov 28 20:26:01 2009
@@ -18,270 +18,85 @@
 package org.apache.hadoop.mapred;
 
 import java.io.IOException;
-import java.util.Collection;
-import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.Map;
-import java.util.TreeMap;
+import java.util.Set;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.mapred.JobQueueJobInProgressListener.JobSchedulingInfo;
-import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 
 /**
  * A {@link JobInProgressListener} that maintains the jobs being managed in
- * one or more queues. 
+ * one or more queues.
  */
 class JobQueuesManager extends JobInProgressListener {
 
-  /* 
-   * If a queue supports priorities, jobs must be 
-   * sorted on priorities, and then on their start times (technically, 
-   * their insertion time.  
-   * If a queue doesn't support priorities, jobs are
-   * sorted based on their start time.  
-   */
-  
-  // comparator for jobs in queues that don't support priorities
-  private static final Comparator<JobSchedulingInfo> STARTTIME_JOB_COMPARATOR
-    = new Comparator<JobSchedulingInfo>() {
-    public int compare(JobSchedulingInfo o1, JobSchedulingInfo o2) {
-      // the job that started earlier wins
-      if (o1.getStartTime() < o2.getStartTime()) {
-        return -1;
-      } else {
-        return (o1.getStartTime() == o2.getStartTime() 
-                ? o1.getJobID().compareTo(o2.getJobID()) 
-                : 1);
-      }
-    }
-  };
-  
-  // class to store queue info
-  private static class QueueInfo {
-
-    // whether the queue supports priorities
-    boolean supportsPriorities;
-    Map<JobSchedulingInfo, JobInProgress> waitingJobs; // for waiting jobs
-    Map<JobSchedulingInfo, JobInProgress> runningJobs; // for running jobs
-    
-    public Comparator<JobSchedulingInfo> comparator;
-    
-    QueueInfo(boolean prio) {
-      this.supportsPriorities = prio;
-      if (supportsPriorities) {
-        // use the default priority-aware comparator
-        comparator = JobQueueJobInProgressListener.FIFO_JOB_QUEUE_COMPARATOR;
-      }
-      else {
-        comparator = STARTTIME_JOB_COMPARATOR;
-      }
-      waitingJobs = new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
-      runningJobs = new TreeMap<JobSchedulingInfo, JobInProgress>(comparator);
-    }
-    
-    Collection<JobInProgress> getWaitingJobs() {
-      synchronized (waitingJobs) {
-        return Collections.unmodifiableCollection(
-            new LinkedList<JobInProgress>(waitingJobs.values()));
-      }
-    }
-    
-    Collection<JobInProgress> getRunningJobs() {
-      synchronized (runningJobs) {
-       return Collections.unmodifiableCollection(
-           new LinkedList<JobInProgress>(runningJobs.values())); 
-      }
-    }
-    
-    void addRunningJob(JobInProgress job) {
-      synchronized (runningJobs) {
-       runningJobs.put(new JobSchedulingInfo(job),job); 
-      }
-    }
-    
-    JobInProgress removeRunningJob(JobSchedulingInfo jobInfo) {
-      synchronized (runningJobs) {
-        return runningJobs.remove(jobInfo); 
-      }
-    }
-    
-    JobInProgress removeWaitingJob(JobSchedulingInfo schedInfo) {
-      synchronized (waitingJobs) {
-        return waitingJobs.remove(schedInfo);
-      }
-    }
-    
-    void addWaitingJob(JobInProgress job) {
-      synchronized (waitingJobs) {
-        waitingJobs.put(new JobSchedulingInfo(job), job);
-      }
-    }
-    
-    int getWaitingJobCount() {
-      synchronized (waitingJobs) {
-       return waitingJobs.size(); 
-      }
-    }
-    
-  }
-  
   // we maintain a hashmap of queue-names to queue info
-  private Map<String, QueueInfo> jobQueues = 
-    new HashMap<String, QueueInfo>();
+  private Map<String, JobQueue> jobQueues =
+    new HashMap<String, JobQueue>();
   private static final Log LOG = LogFactory.getLog(JobQueuesManager.class);
-  private CapacityTaskScheduler scheduler;
 
-  
-  JobQueuesManager(CapacityTaskScheduler s) {
-    this.scheduler = s;
-  }
-  
-  /**
-   * create an empty queue with the default comparator
-   * @param queueName The name of the queue
-   * @param supportsPriotities whether the queue supports priorities
-   */
-  public void createQueue(String queueName, boolean supportsPriotities) {
-    jobQueues.put(queueName, new QueueInfo(supportsPriotities));
-  }
-  
-  /**
-   * Returns the queue of running jobs associated with the name
-   */
-  public Collection<JobInProgress> getRunningJobQueue(String queueName) {
-    return jobQueues.get(queueName).getRunningJobs();
+
+  JobQueuesManager() {
   }
-  
+
   /**
-   * Returns the queue of waiting jobs associated with queue name.
+   * Add the given queue to the map of queue name to job-queues.
    * 
+   * @param queue The job-queue
    */
-  Collection<JobInProgress> getWaitingJobs(String queueName) {
-    return jobQueues.get(queueName).getWaitingJobs();
+  public void addQueue(JobQueue queue) {
+    jobQueues.put(queue.getName(),queue);
   }
-  
+
   @Override
   public void jobAdded(JobInProgress job) throws IOException {
-    LOG.info("Job submitted to queue " + job.getProfile().getQueueName());
+    LOG.info("Job " + job.getJobID() + " submitted to queue "
+        + job.getProfile().getQueueName());
     // add job to the right queue
-    QueueInfo qi = jobQueues.get(job.getProfile().getQueueName());
+    JobQueue qi = getJobQueue(job.getProfile().getQueueName());
     if (null == qi) {
       // job was submitted to a queue we're not aware of
-      LOG.warn("Invalid queue " + job.getProfile().getQueueName() + 
-          " specified for job" + job.getProfile().getJobID() + 
+      LOG.warn(
+        "Invalid queue " + job.getProfile().getQueueName() +
+          " specified for job " + job.getProfile().getJobID() +
           ". Ignoring job.");
       return;
     }
-    // add job to waiting queue. It will end up in the right place, 
-    // based on priority. 
-    qi.addWaitingJob(job);
     // let scheduler know. 
-    scheduler.jobAdded(job);
+    qi.jobAdded(job);
   }
 
-  /*
-   * Method removes the jobs from both running and waiting job queue in 
-   * job queue manager.
-   */
-  private void jobCompleted(JobInProgress job, JobSchedulingInfo oldInfo, 
-                            QueueInfo qi) {
-    LOG.info("Job " + job.getJobID().toString() + " submitted to queue " 
-        + job.getProfile().getQueueName() + " has completed");
-    //remove jobs from both queue's a job can be in
-    //running and waiting queue at the same time.
-    qi.removeRunningJob(oldInfo);
-    qi.removeWaitingJob(oldInfo);
-    // let scheduler know
-    scheduler.jobCompleted(job);
-  }
-  
   // Note that job is removed when the job completes i.e in jobUpated()
   @Override
-  public void jobRemoved(JobInProgress job) {}
-  
-  // This is used to reposition a job in the queue. A job can get repositioned 
-  // because of the change in the job priority or job start-time.
-  private void reorderJobs(JobInProgress job, JobSchedulingInfo oldInfo, 
-                           QueueInfo qi) {
-    
-    if(qi.removeWaitingJob(oldInfo) != null) {
-      qi.addWaitingJob(job);
-    }
-    if(qi.removeRunningJob(oldInfo) != null) {
-      qi.addRunningJob(job);
-    }
-  }
-  
-  // This is used to move a job from the waiting queue to the running queue.
-  private void makeJobRunning(JobInProgress job, JobSchedulingInfo oldInfo, 
-                              QueueInfo qi) {
-    // Removing of the job from job list is responsibility of the
-    //initialization poller.
-    // Add the job to the running queue
-    qi.addRunningJob(job);
-  }
-  
-  // Update the scheduler as job's state has changed
-  private void jobStateChanged(JobStatusChangeEvent event, QueueInfo qi) {
-    JobInProgress job = event.getJobInProgress();
-    JobSchedulingInfo oldJobStateInfo = 
-      new JobSchedulingInfo(event.getOldStatus());
-    // Check if the ordering of the job has changed
-    // For now priority and start-time can change the job ordering
-    if (event.getEventType() == EventType.PRIORITY_CHANGED 
-        || event.getEventType() == EventType.START_TIME_CHANGED) {
-      // Make a priority change
-      reorderJobs(job, oldJobStateInfo, qi);
-    } else if (event.getEventType() == EventType.RUN_STATE_CHANGED) {
-      // Check if the job is complete
-      int runState = job.getStatus().getRunState();
-      if (runState == JobStatus.SUCCEEDED
-          || runState == JobStatus.FAILED
-          || runState == JobStatus.KILLED) {
-        jobCompleted(job, oldJobStateInfo, qi);
-      } else if (runState == JobStatus.RUNNING) {
-        makeJobRunning(job, oldJobStateInfo, qi);
-      }
-    }
+  public void jobRemoved(JobInProgress job) {
   }
-  
+
+
   @Override
   public void jobUpdated(JobChangeEvent event) {
     JobInProgress job = event.getJobInProgress();
-    QueueInfo qi = jobQueues.get(job.getProfile().getQueueName());
-    if (null == qi) {
-      // can't find queue for job. Shouldn't happen. 
-      LOG.warn("Could not find queue " + job.getProfile().getQueueName() + 
-          " when updating job " + job.getProfile().getJobID());
-      return;
-    }
-    
-    // Check if this is the status change
-    if (event instanceof JobStatusChangeEvent) {
-      jobStateChanged((JobStatusChangeEvent)event, qi);
-    }
-  }
-  
-  void removeJobFromWaitingQueue(JobInProgress job) {
-    String queue = job.getProfile().getQueueName();
-    QueueInfo qi = jobQueues.get(queue);
-    qi.removeWaitingJob(new JobSchedulingInfo(job));
+    JobQueue qi = getJobQueue(job.getProfile().getQueueName());
+    qi.jobUpdated(event);
+
   }
-  
+
   Comparator<JobSchedulingInfo> getComparator(String queue) {
-    return jobQueues.get(queue).comparator;
+    return getJobQueue(queue).comparator;
   }
-  
-  int getWaitingJobCount(String queue) {
-    QueueInfo qi = jobQueues.get(queue);
-    return qi.getWaitingJobCount();
+
+
+  public JobQueue getJobQueue(JobInProgress jip) {
+    return getJobQueue(jip.getProfile().getQueueName());
+  }
+
+  JobQueue getJobQueue(String name) {
+    return jobQueues.get(name);
   }
 
-  boolean doesQueueSupportPriorities(String queueName) {
-    return jobQueues.get(queueName).supportsPriorities;
+  public Set<String> getJobQueueNames() {
+    return jobQueues.keySet();
   }
 }
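After this rewrite, JobQueuesManager is essentially a name-to-JobQueue registry that forwards listener events to the owning queue. A hedged sketch of that shape, with a generic stand-in for JobQueue since only the manager is visible in this patch:

    import java.util.HashMap;
    import java.util.Map;
    import java.util.Set;

    // Illustrative registry sketch; Q is a stand-in for JobQueue.
    class QueueRegistrySketch<Q> {
      private final Map<String, Q> queues = new HashMap<String, Q>();

      void addQueue(String name, Q q) {        // mirrors addQueue(JobQueue)
        queues.put(name, q);
      }

      Q getQueue(String name) {                // mirrors getJobQueue(String)
        return queues.get(name);
      }

      Set<String> queueNames() {               // mirrors getJobQueueNames()
        return queues.keySet();
      }
    }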