You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:26:22 UTC

svn commit: r885145 [6/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./ .eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/ src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/ src/benchmarks/gridmix2/ src/benchmarks/gridmix2/src...

Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Sat Nov 28 20:26:01 2009
@@ -18,759 +18,38 @@
 
 package org.apache.hadoop.mapred;
 
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.TreeMap;
-
 import junit.framework.TestCase;
-
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-
-import org.apache.hadoop.io.BytesWritable;
-import org.apache.hadoop.mapred.JobInProgress;
-import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
+import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.TaskType;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.SecurityUtil.AccessControlList;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import static org.apache.hadoop.mapred.CapacityTestUtils.*;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.*;
 
 public class TestCapacityScheduler extends TestCase {
 
   static final Log LOG =
-      LogFactory.getLog(org.apache.hadoop.mapred.TestCapacityScheduler.class);
-
-  private static int jobCounter;
+    LogFactory.getLog(org.apache.hadoop.mapred.TestCapacityScheduler.class);
 
-  /**
-   * Test class that removes the asynchronous nature of job initialization.
-   * 
-   * The run method is a dummy which just waits for completion. It is
-   * expected that test code calls the main method, initializeJobs, directly
-   * to trigger initialization.
-   */
-  class ControlledJobInitializer extends 
-                              JobInitializationPoller.JobInitializationThread {
-    
-    boolean stopRunning;
-    
-    public ControlledJobInitializer(JobInitializationPoller p) {
-      p.super();
-    }
-    
-    @Override
-    public void run() {
-      while (!stopRunning) {
-        try {
-          synchronized(this) {
-            this.wait();  
-          }
-        } catch (InterruptedException ie) {
-          break;
-        }
-      }
-    }
-    
-    void stopRunning() {
-      stopRunning = true;
-    }
-  }
-  
-  /**
-   * Test class that removes the asynchronous nature of job initialization.
-   * 
-   * The run method is a dummy which just waits for completion. It is
-   * expected that test code calls the main method, selectJobsToInitialize,
-   * directly to trigger initialization.
-   * 
-   * The class also creates the test worker thread objects of type 
-   * ControlledJobInitializer instead of the objects of the actual class
-   */
-  class ControlledInitializationPoller extends JobInitializationPoller {
-    
-    private boolean stopRunning;
-    private ArrayList<ControlledJobInitializer> workers;
-    
-    public ControlledInitializationPoller(JobQueuesManager mgr,
-                                          CapacitySchedulerConf rmConf,
-                                          Set<String> queues,
-                                          TaskTrackerManager ttm) {
-      super(mgr, rmConf, queues, ttm);
-    }
-    
-    @Override
-    public void run() {
-      // don't do anything here.
-      while (!stopRunning) {
-        try {
-          synchronized (this) {
-            this.wait();
-          }
-        } catch (InterruptedException ie) {
-          break;
-        }
-      }
-    }
-    
-    @Override
-    JobInitializationThread createJobInitializationThread() {
-      ControlledJobInitializer t = new ControlledJobInitializer(this);
-      if (workers == null) {
-        workers = new ArrayList<ControlledJobInitializer>();
-      }
-      workers.add(t);
-      return t;
-    }
+  String queueConfigPath =
+    System.getProperty("test.build.extraconf", "build/test/extraconf");
+  File queueConfigFile =
+    new File(queueConfigPath, QueueManager.QUEUE_CONF_FILE_NAME);
 
-    @Override
-    void selectJobsToInitialize() {
-      super.cleanUpInitializedJobsList();
-      super.selectJobsToInitialize();
-      for (ControlledJobInitializer t : workers) {
-        t.initializeJobs();
-      }
-    }
-    
-    void stopRunning() {
-      stopRunning = true;
-      for (ControlledJobInitializer t : workers) {
-        t.stopRunning();
-        t.interrupt();
-      }
-    }
-  }
+  private static int jobCounter;
 
   private ControlledInitializationPoller controlledInitializationPoller;
-  /*
-   * Fake job in progress object used for testing the schedulers scheduling
-   * decisions. The JobInProgress objects returns out FakeTaskInProgress
-   * objects when assignTasks is called. If speculative maps and reduces
-   * are configured then JobInProgress returns exactly one Speculative
-   * map and reduce task.
-   */
-  static class FakeJobInProgress extends JobInProgress {
-    
-    protected FakeTaskTrackerManager taskTrackerManager;
-    private int mapTaskCtr;
-    private int redTaskCtr;
-    private Set<TaskInProgress> mapTips = 
-      new HashSet<TaskInProgress>();
-    private Set<TaskInProgress> reduceTips = 
-      new HashSet<TaskInProgress>();
-    private int speculativeMapTaskCounter = 0;
-    private int speculativeReduceTaskCounter = 0;
-    public FakeJobInProgress(JobID jId, JobConf jobConf,
-        FakeTaskTrackerManager taskTrackerManager, String user) {
-      super(jId, jobConf, null);
-      this.taskTrackerManager = taskTrackerManager;
-      this.startTime = System.currentTimeMillis();
-      this.status = new JobStatus(jId, 0f, 0f, JobStatus.PREP, 
-          jobConf.getUser(), 
-          jobConf.getJobName(), "", "");
-      this.status.setJobPriority(JobPriority.NORMAL);
-      this.status.setStartTime(startTime);
-      if (null == jobConf.getQueueName()) {
-        this.profile = new JobProfile(user, jId, 
-            null, null, null);
-      }
-      else {
-        this.profile = new JobProfile(user, jId, 
-            null, null, null, jobConf.getQueueName());
-      }
-      mapTaskCtr = 0;
-      redTaskCtr = 0;
-    }
-    
-    @Override
-    public synchronized void initTasks() throws IOException {
-      getStatus().setRunState(JobStatus.RUNNING);
-    }
-
-    @Override
-    public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
-        int ignored) throws IOException {
-      boolean areAllMapsRunning = (mapTaskCtr == numMapTasks);
-      if (areAllMapsRunning){
-        if(!getJobConf().getMapSpeculativeExecution() || 
-            speculativeMapTasks > 0) {
-          return null;
-        }
-      }
-      TaskAttemptID attemptId = getTaskAttemptID(true, areAllMapsRunning);
-      Task task = new MapTask("", attemptId, 0, "", new BytesWritable(), 
-                                super.numSlotsPerMap) {
-        @Override
-        public String toString() {
-          return String.format("%s on %s", getTaskID(), tts.getTrackerName());
-        }
-      };
-      taskTrackerManager.startTask(tts.getTrackerName(), task);
-      runningMapTasks++;
-      // create a fake TIP and keep track of it
-      FakeTaskInProgress mapTip = new FakeTaskInProgress(getJobID(), 
-          getJobConf(), task, true, this);
-      mapTip.taskStatus.setRunState(TaskStatus.State.RUNNING);
-      if(areAllMapsRunning) {
-        speculativeMapTasks++;
-        //you have scheduled a speculative map. Now set all tips in the
-        //map tips not to have speculative task.
-        for(TaskInProgress t : mapTips) {
-          if (t instanceof FakeTaskInProgress) {
-            FakeTaskInProgress mt = (FakeTaskInProgress) t;
-            mt.hasSpeculativeMap = false;
-          }
-        }
-      } else {
-        //add only non-speculative tips.
-        mapTips.add(mapTip);
-        //add the tips to the JobInProgress TIPS
-        maps = mapTips.toArray(new TaskInProgress[mapTips.size()]);
-      }
-      return task;
-    }
-
-    @Override
-    public Task obtainNewReduceTask(final TaskTrackerStatus tts,
-        int clusterSize, int ignored) throws IOException {
-      boolean areAllReducesRunning = (redTaskCtr == numReduceTasks);
-      if (areAllReducesRunning){
-        if(!getJobConf().getReduceSpeculativeExecution() || 
-            speculativeReduceTasks > 0) {
-          return null;
-        }
-      }
-      TaskAttemptID attemptId = getTaskAttemptID(false, areAllReducesRunning);
-      Task task = new ReduceTask("", attemptId, 0, 10, 
-                        super.numSlotsPerReduce) {
-        @Override
-        public String toString() {
-          return String.format("%s on %s", getTaskID(), tts.getTrackerName());
-        }
-      };
-      taskTrackerManager.startTask(tts.getTrackerName(), task);
-      runningReduceTasks++;
-      // create a fake TIP and keep track of it
-      FakeTaskInProgress reduceTip = new FakeTaskInProgress(getJobID(), 
-          getJobConf(), task, false, this);
-      reduceTip.taskStatus.setRunState(TaskStatus.State.RUNNING);
-      if(areAllReducesRunning) {
-        speculativeReduceTasks++;
-        //you have scheduled a speculative map. Now set all tips in the
-        //map tips not to have speculative task.
-        for(TaskInProgress t : reduceTips) {
-          if (t instanceof FakeTaskInProgress) {
-            FakeTaskInProgress rt = (FakeTaskInProgress) t;
-            rt.hasSpeculativeReduce = false;
-          }
-        }
-      } else {
-        //add only non-speculative tips.
-        reduceTips.add(reduceTip);
-        //add the tips to the JobInProgress TIPS
-        reduces = reduceTips.toArray(new TaskInProgress[reduceTips.size()]);
-      }
-      return task;
-    }
-    
-    public void mapTaskFinished() {
-      runningMapTasks--;
-      finishedMapTasks++;
-    }
-    
-    public void reduceTaskFinished() {
-      runningReduceTasks--;
-      finishedReduceTasks++;
-    }
-    
-    private TaskAttemptID getTaskAttemptID(boolean isMap, boolean isSpeculative) {
-      JobID jobId = getJobID();
-      TaskType t = TaskType.REDUCE;
-      if (isMap) {
-        t = TaskType.MAP;
-      }
-      if (!isSpeculative) {
-        return new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(), t,
-            (isMap) ? ++mapTaskCtr : ++redTaskCtr, 0);
-      } else  {
-        return new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(), t,
-            (isMap) ? mapTaskCtr : redTaskCtr, 1);
-      }
-    }
-    
-    @Override
-    Set<TaskInProgress> getNonLocalRunningMaps() {
-      return (Set<TaskInProgress>)mapTips;
-    }
-    @Override
-    Set<TaskInProgress> getRunningReduces() {
-      return (Set<TaskInProgress>)reduceTips;
-    }
-    
-  }
-  
-  static class FakeFailingJobInProgress extends FakeJobInProgress {
-
-    public FakeFailingJobInProgress(JobID id, JobConf jobConf,
-        FakeTaskTrackerManager taskTrackerManager, String user) {
-      super(id, jobConf, taskTrackerManager, user);
-    }
-    
-    @Override
-    public synchronized void initTasks() throws IOException {
-      throw new IOException("Failed Initalization");
-    }
-    
-    @Override
-    synchronized void fail() {
-      this.status.setRunState(JobStatus.FAILED);
-    }
-  }
- 
-  static class FakeTaskInProgress extends TaskInProgress {
-    private boolean isMap;
-    private FakeJobInProgress fakeJob;
-    private TreeMap<TaskAttemptID, String> activeTasks;
-    private TaskStatus taskStatus;
-    boolean hasSpeculativeMap;
-    boolean hasSpeculativeReduce;
-    
-    FakeTaskInProgress(JobID jId, JobConf jobConf, Task t, 
-        boolean isMap, FakeJobInProgress job) {
-      super(jId, "", new JobClient.RawSplit(), null, jobConf, job, 0, 1);
-      this.isMap = isMap;
-      this.fakeJob = job;
-      activeTasks = new TreeMap<TaskAttemptID, String>();
-      activeTasks.put(t.getTaskID(), "tt");
-      // create a fake status for a task that is running for a bit
-      this.taskStatus = TaskStatus.createTaskStatus(isMap);
-      taskStatus.setProgress(0.5f);
-      taskStatus.setRunState(TaskStatus.State.RUNNING);
-      if (jobConf.getMapSpeculativeExecution()) {
-        //resetting of the hasSpeculativeMap is done
-        //when speculative map is scheduled by the job.
-        hasSpeculativeMap = true;
-      } 
-      if (jobConf.getReduceSpeculativeExecution()) {
-        //resetting of the hasSpeculativeReduce is done
-        //when speculative reduce is scheduled by the job.
-        hasSpeculativeReduce = true;
-      }
-    }
-    
-    @Override
-    TreeMap<TaskAttemptID, String> getActiveTasks() {
-      return activeTasks;
-    }
-    @Override
-    public TaskStatus getTaskStatus(TaskAttemptID taskid) {
-      // return a status for a task that has run a bit
-      return taskStatus;
-    }
-    @Override
-    boolean killTask(TaskAttemptID taskId, boolean shouldFail) {
-      if (isMap) {
-        fakeJob.mapTaskFinished();
-      }
-      else {
-        fakeJob.reduceTaskFinished();
-      }
-      return true;
-    }
-    
-    @Override
-    /*
-     *hasSpeculativeMap and hasSpeculativeReduce is reset by FakeJobInProgress
-     *after the speculative tip has been scheduled.
-     */
-    boolean canBeSpeculated(long currentTime) {
-      if(isMap && hasSpeculativeMap) {
-        return fakeJob.getJobConf().getMapSpeculativeExecution();
-      } 
-      if (!isMap && hasSpeculativeReduce) {
-        return fakeJob.getJobConf().getReduceSpeculativeExecution();
-      }
-      return false;
-    }
-    
-    @Override
-    public boolean isRunning() {
-      return !activeTasks.isEmpty();
-    }
-    
-  }
-  
-  static class FakeQueueManager extends QueueManager {
-    private Set<String> queueNames = null;
-    private static final AccessControlList allEnabledAcl = new AccessControlList("*");
-    
-    FakeQueueManager() {
-      super(new Configuration());
-    }
-    
-    void setQueues(Set<String> queueNames) {
-      this.queueNames = queueNames;
-
-      // sync up queues with the parent class.
-      Queue[] queues = new Queue[queueNames.size()];
-      int i = 0;
-      for (String queueName : queueNames) {
-        HashMap<String, AccessControlList> aclsMap
-          = new HashMap<String, AccessControlList>();
-        for (Queue.QueueOperation oper : Queue.QueueOperation.values()) {
-          String key = QueueManager.toFullPropertyName(queueName,
-                                                        oper.getAclName());
-          aclsMap.put(key, allEnabledAcl);
-        }
-        queues[i++] = new Queue(queueName, aclsMap, Queue.QueueState.RUNNING);
-      }
-      super.setQueues(queues);
-    }
-    
-    public synchronized Set<String> getQueues() {
-      return queueNames;
-    }
-  }
-  
-  static class FakeTaskTrackerManager implements TaskTrackerManager {
-    int maps = 0;
-    int reduces = 0;
-    int maxMapTasksPerTracker = 2;
-    int maxReduceTasksPerTracker = 1;
-    long ttExpiryInterval = 10 * 60 * 1000L; // default interval
-    List<JobInProgressListener> listeners =
-      new ArrayList<JobInProgressListener>();
-    FakeQueueManager qm = new FakeQueueManager();
-    
-    private Map<String, TaskTracker> trackers =
-      new HashMap<String, TaskTracker>();
-    private Map<String, TaskStatus> taskStatuses = 
-      new HashMap<String, TaskStatus>();
-    private Map<JobID, JobInProgress> jobs =
-        new HashMap<JobID, JobInProgress>();
-
-    public FakeTaskTrackerManager() {
-      this(2, 2, 1);
-    }
-
-    public FakeTaskTrackerManager(int numTaskTrackers,
-        int maxMapTasksPerTracker, int maxReduceTasksPerTracker) {
-      this.maxMapTasksPerTracker = maxMapTasksPerTracker;
-      this.maxReduceTasksPerTracker = maxReduceTasksPerTracker;
-      for (int i = 1; i < numTaskTrackers + 1; i++) {
-        String ttName = "tt" + i;
-        TaskTracker tt = new TaskTracker(ttName);
-        tt.setStatus(new TaskTrackerStatus(ttName, ttName + ".host", i,
-                                           new ArrayList<TaskStatus>(), 0, 
-                                           maxMapTasksPerTracker,
-                                           maxReduceTasksPerTracker));
-        trackers.put(ttName, tt);
-      }
-    }
-    
-    public void addTaskTracker(String ttName) {
-      TaskTracker tt = new TaskTracker(ttName);
-      tt.setStatus(new TaskTrackerStatus(ttName, ttName + ".host", 1,
-                                         new ArrayList<TaskStatus>(), 0,
-                                         maxMapTasksPerTracker, 
-                                         maxReduceTasksPerTracker));
-      trackers.put(ttName, tt);
-    }
-    
-    public ClusterStatus getClusterStatus() {
-      int numTrackers = trackers.size();
-      return new ClusterStatus(numTrackers, 0,
-          ttExpiryInterval, maps, reduces,
-          numTrackers * maxMapTasksPerTracker,
-          numTrackers * maxReduceTasksPerTracker,
-          JobTracker.State.RUNNING);
-    }
-
-    public int getNumberOfUniqueHosts() {
-      return 0;
-    }
-
-    public int getNextHeartbeatInterval() {
-      return MRConstants.HEARTBEAT_INTERVAL_MIN;
-    }
-
-    @Override
-    public void killJob(JobID jobid) throws IOException {
-      JobInProgress job = jobs.get(jobid);
-      finalizeJob(job, JobStatus.KILLED);
-      job.kill();
-    }
-
-    public void initJob(JobInProgress jip) {
-      try {
-        JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
-        jip.initTasks();
-        if (jip.isJobEmpty()) {
-          completeEmptyJob(jip);
-        } else if (!jip.isSetupCleanupRequired()) {
-          jip.completeSetup();
-        }
-        JobStatus newStatus = (JobStatus)jip.getStatus().clone();
-        JobStatusChangeEvent event = new JobStatusChangeEvent(jip, 
-          EventType.RUN_STATE_CHANGED, oldStatus, newStatus);
-         for (JobInProgressListener listener : listeners) {
-           listener.jobUpdated(event);
-         }
-      } catch (Exception ioe) {
-        failJob(jip);
-      }
-    }
-
-    private synchronized void completeEmptyJob(JobInProgress jip) {
-      jip.completeEmptyJob();
-    }
-
-    public synchronized void failJob(JobInProgress jip) {
-      JobStatus oldStatus = (JobStatus)jip.getStatus().clone();
-      jip.fail();
-      JobStatus newStatus = (JobStatus)jip.getStatus().clone();
-      JobStatusChangeEvent event = new JobStatusChangeEvent(jip, 
-          EventType.RUN_STATE_CHANGED, oldStatus, newStatus);
-      for (JobInProgressListener listener : listeners) {
-        listener.jobUpdated(event);
-      }
-    }
-
-    public void removeJob(JobID jobid) {
-      jobs.remove(jobid);
-    }
-    
-    @Override
-    public JobInProgress getJob(JobID jobid) {
-      return jobs.get(jobid);
-    }
-
-    Collection<JobInProgress> getJobs() {
-      return jobs.values();
-    }
-
-    public Collection<TaskTrackerStatus> taskTrackers() {
-      List<TaskTrackerStatus> statuses = new ArrayList<TaskTrackerStatus>();
-      for (TaskTracker tt : trackers.values()) {
-        statuses.add(tt.getStatus());
-      }
-      return statuses;
-    }
-
-
-    public void addJobInProgressListener(JobInProgressListener listener) {
-      listeners.add(listener);
-    }
-
-    public void removeJobInProgressListener(JobInProgressListener listener) {
-      listeners.remove(listener);
-    }
-    
-    public void submitJob(JobInProgress job) throws IOException {
-      jobs.put(job.getJobID(), job);
-      for (JobInProgressListener listener : listeners) {
-        listener.jobAdded(job);
-      }
-    }
-    
-    public TaskTracker getTaskTracker(String trackerID) {
-      return trackers.get(trackerID);
-    }
-    
-    public void startTask(String taskTrackerName, final Task t) {
-      if (t.isMapTask()) {
-        maps++;
-      } else {
-        reduces++;
-      }
-      TaskStatus status = new TaskStatus() {
-        @Override
-        public TaskAttemptID getTaskID() {
-          return t.getTaskID();
-        }
-
-        @Override
-        public boolean getIsMap() {
-          return t.isMapTask();
-        }
-        
-        @Override
-        public int getNumSlots() {
-          return t.getNumSlotsRequired();
-        }
-      };
-      taskStatuses.put(t.getTaskID().toString(), status);
-      status.setRunState(TaskStatus.State.RUNNING);
-      trackers.get(taskTrackerName).getStatus().getTaskReports().add(status);
-    }
-    
-    public void finishTask(String taskTrackerName, String tipId, 
-        FakeJobInProgress j) {
-      TaskStatus status = taskStatuses.get(tipId);
-      if (status.getIsMap()) {
-        maps--;
-        j.mapTaskFinished();
-      } else {
-        reduces--;
-        j.reduceTaskFinished();
-      }
-      status.setRunState(TaskStatus.State.SUCCEEDED);
-    }
-    
-    void finalizeJob(FakeJobInProgress fjob) {
-      finalizeJob(fjob, JobStatus.SUCCEEDED);
-    }
-
-    void finalizeJob(JobInProgress fjob, int state) {
-      // take a snapshot of the status before changing it
-      JobStatus oldStatus = (JobStatus)fjob.getStatus().clone();
-      fjob.getStatus().setRunState(state);
-      JobStatus newStatus = (JobStatus)fjob.getStatus().clone();
-      JobStatusChangeEvent event = 
-        new JobStatusChangeEvent (fjob, EventType.RUN_STATE_CHANGED, oldStatus, 
-                                  newStatus);
-      for (JobInProgressListener listener : listeners) {
-        listener.jobUpdated(event);
-      }
-    }
-    
-    public void setPriority(FakeJobInProgress fjob, JobPriority priority) {
-      // take a snapshot of the status before changing it
-      JobStatus oldStatus = (JobStatus)fjob.getStatus().clone();
-      fjob.setPriority(priority);
-      JobStatus newStatus = (JobStatus)fjob.getStatus().clone();
-      JobStatusChangeEvent event = 
-        new JobStatusChangeEvent (fjob, EventType.PRIORITY_CHANGED, oldStatus, 
-                                  newStatus);
-      for (JobInProgressListener listener : listeners) {
-        listener.jobUpdated(event);
-      }
-    }
-    
-    public void setStartTime(FakeJobInProgress fjob, long start) {
-      // take a snapshot of the status before changing it
-      JobStatus oldStatus = (JobStatus)fjob.getStatus().clone();
-      
-      fjob.startTime = start; // change the start time of the job
-      fjob.status.setStartTime(start); // change the start time of the jobstatus
-      
-      JobStatus newStatus = (JobStatus)fjob.getStatus().clone();
-      
-      JobStatusChangeEvent event = 
-        new JobStatusChangeEvent (fjob, EventType.START_TIME_CHANGED, oldStatus,
-                                  newStatus);
-      for (JobInProgressListener listener : listeners) {
-        listener.jobUpdated(event);
-      }
-    }
-    
-    void addQueues(String[] arr) {
-      Set<String> queues = new HashSet<String>();
-      for (String s: arr) {
-        queues.add(s);
-      }
-      qm.setQueues(queues);
-    }
-    
-    public QueueManager getQueueManager() {
-      return qm;
-    }
-
-    @Override
-    public boolean killTask(TaskAttemptID taskid, boolean shouldFail) {
-      return true;
-    }
-  }
-  
-  // represents a fake queue configuration info
-  static class FakeQueueInfo {
-    String queueName;
-    float capacity;
-    boolean supportsPrio;
-    int ulMin;
-
-    public FakeQueueInfo(String queueName, float capacity, boolean supportsPrio, int ulMin) {
-      this.queueName = queueName;
-      this.capacity = capacity;
-      this.supportsPrio = supportsPrio;
-      this.ulMin = ulMin;
-    }
-  }
-  
-  static class FakeResourceManagerConf extends CapacitySchedulerConf {
-  
-    // map of queue names to queue info
-    private Map<String, FakeQueueInfo> queueMap = 
-      new LinkedHashMap<String, FakeQueueInfo>();
-    String firstQueue;
-    
-    
-    void setFakeQueues(List<FakeQueueInfo> queues) {
-      for (FakeQueueInfo q: queues) {
-        queueMap.put(q.queueName, q);
-      }
-      firstQueue = new String(queues.get(0).queueName);
-    }
-    
-    public synchronized Set<String> getQueues() {
-      return queueMap.keySet();
-    }
-    
-    /*public synchronized String getFirstQueue() {
-      return firstQueue;
-    }*/
-    
-    public float getCapacity(String queue) {
-      if(queueMap.get(queue).capacity == -1) {
-        return super.getCapacity(queue);
-      }
-      return queueMap.get(queue).capacity;
-    }
-    
-    public int getMinimumUserLimitPercent(String queue) {
-      return queueMap.get(queue).ulMin;
-    }
-    
-    public boolean isPrioritySupported(String queue) {
-      return queueMap.get(queue).supportsPrio;
-    }
-    
-    @Override
-    public long getSleepInterval() {
-      return 1;
-    }
-    
-    @Override
-    public int getMaxWorkerThreads() {
-      return 1;
-    }
-  }
-
-  protected class FakeClock extends CapacityTaskScheduler.Clock {
-    private long time = 0;
-    
-    public void advance(long millis) {
-      time += millis;
-    }
 
-    @Override
-    long getTime() {
-      return time;
-    }
-  }
 
-  
   protected JobConf conf;
   protected CapacityTaskScheduler scheduler;
   private FakeTaskTrackerManager taskTrackerManager;
-  private FakeResourceManagerConf resConf;
   private FakeClock clock;
 
   @Override
@@ -778,30 +57,31 @@
     setUp(2, 2, 1);
   }
 
-  private void setUp(int numTaskTrackers, int numMapTasksPerTracker,
-      int numReduceTasksPerTracker) {
+  private void setUp(
+    int numTaskTrackers, int numMapTasksPerTracker,
+    int numReduceTasksPerTracker) {
     jobCounter = 0;
     taskTrackerManager =
-        new FakeTaskTrackerManager(numTaskTrackers, numMapTasksPerTracker,
-            numReduceTasksPerTracker);
+      new FakeTaskTrackerManager(
+        numTaskTrackers, numMapTasksPerTracker,
+        numReduceTasksPerTracker);
     clock = new FakeClock();
     scheduler = new CapacityTaskScheduler(clock);
     scheduler.setTaskTrackerManager(taskTrackerManager);
 
     conf = new JobConf();
     // Don't let the JobInitializationPoller come in our way.
-    resConf = new FakeResourceManagerConf();
-    controlledInitializationPoller = new ControlledInitializationPoller(
-        scheduler.jobQueuesManager,
-        resConf,
-        resConf.getQueues(), taskTrackerManager);
+    conf.set("mapred.queue.names","default");
+    controlledInitializationPoller =
+        new ControlledInitializationPoller(scheduler.jobQueuesManager,
+            taskTrackerManager);
     scheduler.setInitializationPoller(controlledInitializationPoller);
     scheduler.setConf(conf);
     //by default disable speculative execution.
     conf.setMapSpeculativeExecution(false);
     conf.setReduceSpeculativeExecution(false);
   }
-  
+
   @Override
   protected void tearDown() throws Exception {
     if (scheduler != null) {
@@ -809,377 +89,366 @@
     }
   }
 
-  private FakeJobInProgress submitJob(int state, JobConf jobConf) throws IOException {
-    FakeJobInProgress job =
-        new FakeJobInProgress(new JobID("test", ++jobCounter),
-            (jobConf == null ? new JobConf(conf) : jobConf), taskTrackerManager,
-            jobConf.getUser());
-    job.getStatus().setRunState(state);
-    taskTrackerManager.submitJob(job);
-    return job;
-  }
-
-  private FakeJobInProgress submitJobAndInit(int state, JobConf jobConf)
-      throws IOException {
-    FakeJobInProgress j = submitJob(state, jobConf);
-    taskTrackerManager.initJob(j);
-    return j;
-  }
-
-  private FakeJobInProgress submitJob(int state, int maps, int reduces, 
-      String queue, String user) throws IOException {
-    JobConf jobConf = new JobConf(conf);
-    jobConf.setNumMapTasks(maps);
-    jobConf.setNumReduceTasks(reduces);
-    if (queue != null)
-      jobConf.setQueueName(queue);
-    jobConf.setUser(user);
-    return submitJob(state, jobConf);
-  }
-  
-  // Submit a job and update the listeners
-  private FakeJobInProgress submitJobAndInit(int state, int maps, int reduces,
-                                             String queue, String user) 
-  throws IOException {
-    FakeJobInProgress j = submitJob(state, maps, reduces, queue, user);
-    taskTrackerManager.initJob(j);
-    return j;
-  }
-
-  /**
-   * Test the max map limit.
-   * @throws IOException
-   */
-  public void testMaxMapCap() throws IOException {
-    this.setUp(4,1,1);
-    taskTrackerManager.addQueues(new String[] {"default"});
-    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 100.0f, false, 1));
-    resConf.setFakeQueues(queues);
-    resConf.setMaxMapCap("default",2);
-    resConf.setMaxReduceCap("default",-1);
-    scheduler.setResourceManagerConf(resConf);
-    scheduler.start();
-
-    //submit the Job
-    FakeJobInProgress fjob1 =
-      submitJob(JobStatus.PREP,3,1,"default","user");
-
-    taskTrackerManager.initJob(fjob1);
-
-    List<Task> task1 = scheduler.assignTasks(tracker("tt1"));
-    List<Task> task2 = scheduler.assignTasks(tracker("tt2"));
-
-    //Once the 2 tasks are running the third assigment should be reduce.
-    checkAssignment("tt3", "attempt_test_0001_r_000001_0 on tt3");
-    //This should fail.
-    List<Task> task4 = scheduler.assignTasks(tracker("tt4"));
-    assertNull(task4);
-    //Now complete the task 1.
-        // complete the job
-    taskTrackerManager.finishTask("tt1", task1.get(0).getTaskID().toString(),
-                                  fjob1);
-    //We have completed the tt1 task which was a map task so we expect one map
-    //task to be picked up
-    checkAssignment("tt4","attempt_test_0001_m_000003_0 on tt4");
-  }
-
   /**
-   * Test max reduce limit
+   * Test max capacity
    * @throws IOException
    */
-  public void testMaxReduceCap() throws IOException {
+  public void testMaxCapacity() throws IOException {
     this.setUp(4, 1, 1);
     taskTrackerManager.addQueues(new String[]{"default"});
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 100.0f, false, 1));
-    resConf.setFakeQueues(queues);
-    resConf.setMaxMapCap("default", -1);
-    resConf.setMaxReduceCap("default", 2);
-    scheduler.setResourceManagerConf(resConf);
+    queues.add(new FakeQueueInfo("default", 25.0f, false, 1));
+
+
+    taskTrackerManager.setFakeQueues(queues);
     scheduler.start();
+    scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext()
+      .setMaxCapacityPercent(50.0f);
 
     //submit the Job
-    FakeJobInProgress fjob1 =
-      submitJob(JobStatus.PREP, 1, 3, "default", "user");
+    FakeJobInProgress fjob1 = taskTrackerManager.submitJob(
+      JobStatus.PREP, 4, 4, "default", "user");
 
     taskTrackerManager.initJob(fjob1);
+    HashMap<String, String> expectedStrings = new HashMap<String, String>();
 
-    List<Task> task1 = scheduler.assignTasks(tracker("tt1"));
-    List<Task> task2 = scheduler.assignTasks(tracker("tt2"));
-    List<Task> task3 = scheduler.assignTasks(tracker("tt3"));
+    expectedStrings.put(MAP, "attempt_test_0001_m_000001_0 on tt1");
+    expectedStrings.put(REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+    List<Task> task1 = checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt1", expectedStrings);
 
-    //This should fail. 1 map, 2 reduces , we have reached the limit.
-    List<Task> task4 = scheduler.assignTasks(tracker("tt4"));
-    assertNull(task4);
-    //Now complete the task 1 i.e map task.
-    // complete the job
-    taskTrackerManager.finishTask(
-      "tt1", task1.get(0).getTaskID().toString(),
-      fjob1);
 
-    //This should still fail as only map task is done
-    task4 = scheduler.assignTasks(tracker("tt4"));
-    assertNull(task4);
+    expectedStrings.put(MAP, "attempt_test_0001_m_000002_0 on tt2");
+    expectedStrings.put(REDUCE, "attempt_test_0001_r_000002_0 on tt2");
+    List<Task> task2 = checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt2", expectedStrings);
 
-    //Complete the reduce task
-    taskTrackerManager.finishTask(
-      "tt2", task2.get(0).getTaskID().toString(), fjob1);
+    //we have already reached the limit
+    //this call would return null
+    List<Task> task3 = scheduler.assignTasks(tracker("tt3"));
+    assertNull(task3);
 
-    //One reduce is done hence assign the new reduce.
-    checkAssignment("tt4","attempt_test_0001_r_000003_0 on tt4");
+    //Now complete the task 1 i.e map task.
+    for (Task task : task1) {
+        taskTrackerManager.finishTask(
+          task.getTaskID().toString(), fjob1);
+    }
+    
+    expectedStrings.put(MAP, "attempt_test_0001_m_000003_0 on tt1");
+    expectedStrings.put(REDUCE, "attempt_test_0001_r_000003_0 on tt1");
+    task2 = checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt1", expectedStrings);
   }
-  
+
   // test job run-state change
   public void testJobRunStateChange() throws IOException {
     // start the scheduler
-    taskTrackerManager.addQueues(new String[] {"default"});
+    taskTrackerManager.addQueues(new String[]{"default"});
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 1));
-    resConf.setFakeQueues(queues);
-    scheduler.setResourceManagerConf(resConf);
+
+
+    taskTrackerManager.setFakeQueues(queues);
     scheduler.start();
-    
+
     // submit the job
-    FakeJobInProgress fjob1 = 
-      submitJob(JobStatus.PREP, 1, 0, "default", "user");
-    
-    FakeJobInProgress fjob2 = 
-      submitJob(JobStatus.PREP, 1, 0, "default", "user");
-    
+    FakeJobInProgress fjob1 =
+      taskTrackerManager.submitJob(JobStatus.PREP, 1, 0, "default", "user");
+
+    FakeJobInProgress fjob2 =
+      taskTrackerManager.submitJob(JobStatus.PREP, 1, 0, "default", "user");
+
     // test if changing the job priority/start-time works as expected in the 
     // waiting queue
     testJobOrderChange(fjob1, fjob2, true);
-    
+
     // Init the jobs
     // simulate the case where the job with a lower priority becomes running 
     // first (may be because of the setup tasks).
-    
+
     // init the lower ranked job first
     taskTrackerManager.initJob(fjob2);
-    
+
     // init the higher ordered job later
     taskTrackerManager.initJob(fjob1);
-    
+
     // check if the jobs are missing from the waiting queue
     // The jobs are not removed from waiting queue until they are scheduled 
-    assertEquals("Waiting queue is garbled on job init", 2, 
-                 scheduler.jobQueuesManager.getWaitingJobs("default")
-                          .size());
-    
+    assertEquals(
+      "Waiting queue is garbled on job init", 2,
+      scheduler.jobQueuesManager.getJobQueue("default").getWaitingJobs()
+        .size());
+
     // test if changing the job priority/start-time works as expected in the 
     // running queue
     testJobOrderChange(fjob1, fjob2, false);
-    
+
     // schedule a task
     List<Task> tasks = scheduler.assignTasks(tracker("tt1"));
-    
+
     // complete the job
-    taskTrackerManager.finishTask("tt1", tasks.get(0).getTaskID().toString(), 
-                                  fjob1);
-    
+    taskTrackerManager.finishTask(
+      tasks.get(0).getTaskID().toString(),
+      fjob1);
+
     // mark the job as complete
     taskTrackerManager.finalizeJob(fjob1);
-    
-    Collection<JobInProgress> rqueue = 
-      scheduler.jobQueuesManager.getRunningJobQueue("default");
-    
+
+    Collection<JobInProgress> rqueue =
+      scheduler.jobQueuesManager.getJobQueue("default").getRunningJobs();
+
     // check if the job is removed from the scheduler
-    assertFalse("Scheduler contains completed job", 
-                rqueue.contains(fjob1));
-    
+    assertFalse(
+      "Scheduler contains completed job",
+      rqueue.contains(fjob1));
+
     // check if the running queue size is correct
-    assertEquals("Job finish garbles the queue", 
-                 1, rqueue.size());
+    assertEquals(
+      "Job finish garbles the queue",
+      1, rqueue.size());
 
   }
-  
+
   // test if the queue reflects the changes
-  private void testJobOrderChange(FakeJobInProgress fjob1, 
-                                  FakeJobInProgress fjob2, 
-                                  boolean waiting) {
+  private void testJobOrderChange(
+    FakeJobInProgress fjob1,
+    FakeJobInProgress fjob2,
+    boolean waiting) {
     String queueName = waiting ? "waiting" : "running";
-    
+
     // check if the jobs in the queue are the right order
     JobInProgress[] jobs = getJobsInQueue(waiting);
-    assertTrue(queueName + " queue doesnt contain job #1 in right order", 
-                jobs[0].getJobID().equals(fjob1.getJobID()));
-    assertTrue(queueName + " queue doesnt contain job #2 in right order", 
-                jobs[1].getJobID().equals(fjob2.getJobID()));
-    
+    assertTrue(
+      queueName + " queue doesnt contain job #1 in right order",
+      jobs[0].getJobID().equals(fjob1.getJobID()));
+    assertTrue(
+      queueName + " queue doesnt contain job #2 in right order",
+      jobs[1].getJobID().equals(fjob2.getJobID()));
+
     // I. Check the start-time change
     // Change job2 start-time and check if job2 bumps up in the queue 
     taskTrackerManager.setStartTime(fjob2, fjob1.startTime - 1);
-    
+
     jobs = getJobsInQueue(waiting);
-    assertTrue("Start time change didnt not work as expected for job #2 in "
-               + queueName + " queue", 
-                jobs[0].getJobID().equals(fjob2.getJobID()));
-    assertTrue("Start time change didnt not work as expected for job #1 in"
-               + queueName + " queue", 
-                jobs[1].getJobID().equals(fjob1.getJobID()));
-    
+    assertTrue(
+      "Start time change didnt not work as expected for job #2 in "
+        + queueName + " queue",
+      jobs[0].getJobID().equals(fjob2.getJobID()));
+    assertTrue(
+      "Start time change didnt not work as expected for job #1 in"
+        + queueName + " queue",
+      jobs[1].getJobID().equals(fjob1.getJobID()));
+
     // check if the queue is fine
-    assertEquals("Start-time change garbled the " + queueName + " queue", 
-                 2, jobs.length);
-    
+    assertEquals(
+      "Start-time change garbled the " + queueName + " queue",
+      2, jobs.length);
+
     // II. Change job priority change
     // Bump up job1's priority and make sure job1 bumps up in the queue
     taskTrackerManager.setPriority(fjob1, JobPriority.HIGH);
-    
+
     // Check if the priority changes are reflected
     jobs = getJobsInQueue(waiting);
-    assertTrue("Priority change didnt not work as expected for job #1 in "
-               + queueName + " queue",  
-                jobs[0].getJobID().equals(fjob1.getJobID()));
-    assertTrue("Priority change didnt not work as expected for job #2 in "
-               + queueName + " queue",  
-                jobs[1].getJobID().equals(fjob2.getJobID()));
-    
+    assertTrue(
+      "Priority change didnt not work as expected for job #1 in "
+        + queueName + " queue",
+      jobs[0].getJobID().equals(fjob1.getJobID()));
+    assertTrue(
+      "Priority change didnt not work as expected for job #2 in "
+        + queueName + " queue",
+      jobs[1].getJobID().equals(fjob2.getJobID()));
+
     // check if the queue is fine
-    assertEquals("Priority change has garbled the " + queueName + " queue", 
-                 2, jobs.length);
-    
+    assertEquals(
+      "Priority change has garbled the " + queueName + " queue",
+      2, jobs.length);
+
     // reset the queue state back to normal
     taskTrackerManager.setStartTime(fjob1, fjob2.startTime - 1);
     taskTrackerManager.setPriority(fjob1, JobPriority.NORMAL);
   }
-  
+
   private JobInProgress[] getJobsInQueue(boolean waiting) {
-    Collection<JobInProgress> queue = 
-      waiting 
-      ? scheduler.jobQueuesManager.getWaitingJobs("default")
-      : scheduler.jobQueuesManager.getRunningJobQueue("default");
+    Collection<JobInProgress> queue =
+      waiting
+        ? scheduler.jobQueuesManager.getJobQueue("default").getWaitingJobs()
+        : scheduler.jobQueuesManager.getJobQueue("default").getRunningJobs();
     return queue.toArray(new JobInProgress[0]);
   }
-  
+
   // tests if tasks can be assinged when there are multiple jobs from a same
   // user
   public void testJobFinished() throws Exception {
-    taskTrackerManager.addQueues(new String[] {"default"});
-    
+    taskTrackerManager.addQueues(new String[]{"default"});
+
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
-    resConf.setFakeQueues(queues);
-    scheduler.setResourceManagerConf(resConf);
+
+
+    taskTrackerManager.setFakeQueues(queues);
     scheduler.start();
 
     // submit 2 jobs
-    FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 3, 0, "default", "u1");
-    FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 3, 0, "default", "u1");
-    
+    FakeJobInProgress j1 = taskTrackerManager.submitJobAndInit(
+      JobStatus.PREP, 3, 0, "default", "u1");
+    FakeJobInProgress j2 = taskTrackerManager.submitJobAndInit(
+      JobStatus.PREP, 3, 0, "default", "u1");
+
     // I. Check multiple assignments with running tasks within job
     // ask for a task from first job
-    Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    Task t = checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_m_000001_0 on tt1");
     //  ask for another task from the first job
-    t = checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
-    
+    t = checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_m_000002_0 on tt1");
+
     // complete tasks
-    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
-    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0", j1);
-    
+    taskTrackerManager.finishTask("attempt_test_0001_m_000001_0", j1);
+    taskTrackerManager.finishTask("attempt_test_0001_m_000002_0", j1);
+
     // II. Check multiple assignments with running tasks across jobs
     // ask for a task from first job
-    t = checkAssignment("tt1", "attempt_test_0001_m_000003_0 on tt1");
-    
+    t = checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_m_000003_0 on tt1");
+
     //  ask for a task from the second job
-    t = checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-    
+    t = checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0002_m_000001_0 on tt1");
+
     // complete tasks
-    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", j2);
-    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000003_0", j1);
-    
+    taskTrackerManager.finishTask("attempt_test_0002_m_000001_0", j2);
+    taskTrackerManager.finishTask("attempt_test_0001_m_000003_0", j1);
+
     // III. Check multiple assignments with completed tasks across jobs
     // ask for a task from the second job
-    t = checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
-    
+    t = checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0002_m_000002_0 on tt1");
+
     // complete task
-    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000002_0", j2);
-    
+    taskTrackerManager.finishTask("attempt_test_0002_m_000002_0", j2);
+
     // IV. Check assignment with completed job
     // finish first job
-    scheduler.jobCompleted(j1);
-    
+    scheduler.jobQueuesManager.getJobQueue(j1).jobCompleted(j1);
+
     // ask for another task from the second job
     // if tasks can be assigned then the structures are properly updated 
-    t = checkAssignment("tt1", "attempt_test_0002_m_000003_0 on tt1");
-    
+    t = checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0002_m_000003_0 on tt1");
+
     // complete task
-    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000003_0", j2);
+    taskTrackerManager.finishTask("attempt_test_0002_m_000003_0", j2);
   }
-  
-  // basic tests, should be able to submit to queues
-  public void testSubmitToQueues() throws Exception {
-    // set up some queues
-    String[] qs = {"default", "q2"};
-    taskTrackerManager.addQueues(qs);
-    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
-    queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
-    resConf.setFakeQueues(queues);
-    scheduler.setResourceManagerConf(resConf);
-    scheduler.start();
 
-    // submit a job with no queue specified. It should be accepted
-    // and given to the default queue. 
-    JobInProgress j = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
-    
-    // when we ask for a task, we should get one, from the job submitted
-    Task t;
-    t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    // submit another job, to a different queue
-    j = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
-    // now when we get a task, it should be from the second job
-    t = checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
-  }
-  
-  public void testGetJobs() throws Exception {
-    // need only one queue
-    String[] qs = { "default" };
-    taskTrackerManager.addQueues(qs);
-    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
-    resConf.setFakeQueues(queues);
-    scheduler.setResourceManagerConf(resConf);
-    scheduler.start();
-    HashMap<String, ArrayList<FakeJobInProgress>> subJobsList = 
-      submitJobs(1, 4, "default");
-   
+  /**
+   * tests the submission of jobs to container and job queues
+   * @throws Exception
+   */
+  public void testJobSubmission() throws Exception {
+    JobQueueInfo[] queues = TestQueueManagerRefresh.getSimpleQueueHierarchy();
+
+    queues[0].getProperties().setProperty(
+        CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(100));
+    queues[1].getProperties().setProperty(
+        CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
+    queues[2].getProperties().setProperty(
+        CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
+
+    // write the configuration file
+    QueueManagerTestUtils.writeQueueConfigurationFile(
+        queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+    setUp(1, 4, 4);
+    // use the queues from the config file.
+    taskTrackerManager.setQueueManager(new QueueManager());
+    scheduler.start();
+
+    // submit a job to the container queue
+    try {
+      taskTrackerManager.submitJobAndInit(JobStatus.PREP, 20, 0,
+          queues[0].getQueueName(), "user");
+      fail("Jobs are being able to be submitted to the container queue");
+    } catch (Exception e) {
+      assertTrue(scheduler.getJobs(queues[0].getQueueName()).isEmpty());
+    }
+
+    FakeJobInProgress job = taskTrackerManager.submitJobAndInit(JobStatus.PREP,
+        1, 0, queues[1].getQueueName(), "user");
+    assertEquals(1, scheduler.getJobs(queues[1].getQueueName()).size());
+    assertTrue(scheduler.getJobs(queues[1].getQueueName()).contains(job));
+
+    // check if the job is submitted
+    checkAssignment(taskTrackerManager, scheduler, "tt1", 
+    "attempt_test_0002_m_000001_0 on tt1");
+
+    // test for getJobs
+    HashMap<String, ArrayList<FakeJobInProgress>> subJobsList =
+      taskTrackerManager.submitJobs(1, 4, queues[2].getQueueName());
+
     JobQueuesManager mgr = scheduler.jobQueuesManager;
-    
-    while(mgr.getWaitingJobs("default").size() < 4){
-      Thread.sleep(1);
-    }
     //Raise status change events for jobs submitted.
-    raiseStatusChangeEvents(mgr);
-    Collection<JobInProgress> jobs = scheduler.getJobs("default");
-    
-    assertTrue("Number of jobs returned by scheduler is wrong" 
-        ,jobs.size() == 4);
-    
-    assertTrue("Submitted jobs and Returned jobs are not same",
-        subJobsList.get("u1").containsAll(jobs));
+    raiseStatusChangeEvents(mgr, queues[2].getQueueName());
+    Collection<JobInProgress> jobs =
+      scheduler.getJobs(queues[2].getQueueName());
+
+    assertTrue(
+      "Number of jobs returned by scheduler is wrong"
+      , jobs.size() == 4);
+
+    assertTrue(
+      "Submitted jobs and Returned jobs are not same",
+      subJobsList.get("u1").containsAll(jobs));
   }
-  
+
   //Basic test to test capacity allocation across the queues which have no
   //capacity configured.
-  
+
   public void testCapacityAllocationToQueues() throws Exception {
-    String[] qs = {"default","q1","q2","q3","q4"};
+    String[] qs = {"default", "qAZ1", "qAZ2", "qAZ3", "qAZ4"};
+    taskTrackerManager.addQueues(qs);
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 25.0f, true, 25));
+    queues.add(new FakeQueueInfo("qAZ1", -1.0f, true, 25));
+    queues.add(new FakeQueueInfo("qAZ2", -1.0f, true, 25));
+    queues.add(new FakeQueueInfo("qAZ3", -1.0f, true, 25));
+    queues.add(new FakeQueueInfo("qAZ4", -1.0f, true, 25));
+
+
+    taskTrackerManager.setFakeQueues(queues);
+    scheduler.start();
+    JobQueuesManager jqm = scheduler.jobQueuesManager;
+    assertEquals(18.75f, jqm.getJobQueue("qAZ1").qsc.getCapacityPercent());
+    assertEquals(18.75f, jqm.getJobQueue("qAZ2").qsc.getCapacityPercent());
+    assertEquals(18.75f, jqm.getJobQueue("qAZ3").qsc.getCapacityPercent());
+    assertEquals(18.75f, jqm.getJobQueue("qAZ4").qsc.getCapacityPercent());
+  }
+
+  public void testCapacityAllocFailureWithLowerMaxCapacity() throws Exception {
+    String[] qs = {"default", "qAZ1"};
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default",25.0f,true,25));
-    queues.add(new FakeQueueInfo("q1",-1.0f,true,25));
-    queues.add(new FakeQueueInfo("q2",-1.0f,true,25));
-    queues.add(new FakeQueueInfo("q3",-1.0f,true,25));
-    queues.add(new FakeQueueInfo("q4",-1.0f,true,25));
-    resConf.setFakeQueues(queues);
-    scheduler.setResourceManagerConf(resConf);
-    scheduler.start(); 
-    assertEquals(18.75f, resConf.getCapacity("q1"));
-    assertEquals(18.75f, resConf.getCapacity("q2"));
-    assertEquals(18.75f, resConf.getCapacity("q3"));
-    assertEquals(18.75f, resConf.getCapacity("q4"));
+    queues.add(new FakeQueueInfo("default", 25.0f, true, 25));
+    FakeQueueInfo qi = new FakeQueueInfo("qAZ1", -1.0f, true, 25);
+    qi.maxCapacity = 40.0f;
+    queues.add(qi);
+    taskTrackerManager.setFakeQueues(queues);
+    try {
+      scheduler.start();
+      fail("scheduler start should fail ");
+    }catch(IOException ise) {
+      Throwable e = ise.getCause();
+      assertTrue(e instanceof IllegalStateException);
+      assertEquals(
+        e.getMessage(),
+        " Capacity share (" + 75.0f + ")for unconfigured queue " + "qAZ1" +
+          " is greater than its maximum-capacity percentage " + 40.0f);
+    }
   }
 
   // Tests how capacity is computed and assignment of tasks done
@@ -1193,50 +462,51 @@
     // the cluster capacity increase slowly.
     queues.add(new FakeQueueInfo("default", 10.0f, true, 25));
     queues.add(new FakeQueueInfo("q2", 90.0f, true, 25));
-    resConf.setFakeQueues(queues);
-    scheduler.setResourceManagerConf(resConf);
+
+
+    taskTrackerManager.setFakeQueues(queues);
     scheduler.start();
-   
+
     // submit a job to the default queue
-    submitJobAndInit(JobStatus.PREP, 10, 0, "default", "u1");
-    
+    taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 0, "default", "u1");
+
     // submit a job to the second queue
-    submitJobAndInit(JobStatus.PREP, 10, 0, "q2", "u1");
-    
+    taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 0, "q2", "u1");
+
     // job from q2 runs first because it has some non-zero capacity.
-    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-    verifyCapacity("0", "default");
-    verifyCapacity("3", "q2");
-    
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0002_m_000001_0 on tt1");
+    verifyCapacity(taskTrackerManager, "0", "default");
+    verifyCapacity(taskTrackerManager, "3", "q2");
+
     // add another tt to increase tt slots
     taskTrackerManager.addTaskTracker("tt3");
-    checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
-    verifyCapacity("0", "default");
-    verifyCapacity("5", "q2");
-    
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      "attempt_test_0002_m_000002_0 on tt2");
+    verifyCapacity(taskTrackerManager, "0", "default");
+    verifyCapacity(taskTrackerManager, "5", "q2");
+
     // add another tt to increase tt slots
     taskTrackerManager.addTaskTracker("tt4");
-    checkAssignment("tt3", "attempt_test_0002_m_000003_0 on tt3");
-    verifyCapacity("0", "default");
-    verifyCapacity("7", "q2");
-    
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt3",
+      "attempt_test_0002_m_000003_0 on tt3");
+    verifyCapacity(taskTrackerManager, "0", "default");
+    verifyCapacity(taskTrackerManager, "7", "q2");
+
     // add another tt to increase tt slots
     taskTrackerManager.addTaskTracker("tt5");
     // now job from default should run, as it is furthest away
     // in terms of runningMaps / capacity.
-    checkAssignment("tt4", "attempt_test_0001_m_000001_0 on tt4");
-    verifyCapacity("1", "default");
-    verifyCapacity("9", "q2");
-  }
-  
-  private void verifyCapacity(String expectedCapacity,
-                                          String queue) throws IOException {
-    String schedInfo = taskTrackerManager.getQueueManager().
-                          getSchedulerInfo(queue).toString();    
-    assertTrue(schedInfo.contains("Map tasks\nCapacity: " 
-        + expectedCapacity + " slots"));
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt4",
+      "attempt_test_0001_m_000001_0 on tt4");
+    verifyCapacity(taskTrackerManager, "1", "default");
+    verifyCapacity(taskTrackerManager, "9", "q2");
   }
-  
+
   // test capacity transfer
   public void testCapacityTransfer() throws Exception {
     // set up some queues
@@ -1245,159 +515,202 @@
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
     queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
-    resConf.setFakeQueues(queues);
-    scheduler.setResourceManagerConf(resConf);
+
+
+    taskTrackerManager.setFakeQueues(queues);
     scheduler.start();
 
     // submit a job  
-    submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
+    taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
     // for queue 'q2', the capacity for maps is 2. Since we're the only user,
-    // we should get a task 
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    // I should get another map task. 
-    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    // we should get a task
+    Map<String,String> expectedStrings = new HashMap<String,String>();
+    expectedStrings.put(MAP,"attempt_test_0001_m_000001_0 on tt1");
+    expectedStrings.put(REDUCE,"attempt_test_0001_r_000001_0 on tt1");
+
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      expectedStrings);
+
+    // I should get another map task.
+    //No redduces as there is 1 slot only for reduce on TT
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_m_000002_0 on tt1");
+
     // Now we're at full capacity for maps. If I ask for another map task,
-    // I should get a map task from the default queue's capacity. 
-    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+    // I should get a map task from the default queue's capacity.
+    //same with reduce
+    expectedStrings.put(MAP,"attempt_test_0001_m_000003_0 on tt2");
+    expectedStrings.put(REDUCE,"attempt_test_0001_r_000002_0 on tt2");
+    checkMultipleTaskAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      expectedStrings);
+    
     // and another
-    checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      "attempt_test_0001_m_000004_0 on tt2");
   }
 
   /**
-   * Creates a queue with max task limit of 2
-   * submit 1 job in the queue which is high ram(2 slots) . As 2 slots are
-   * given to high ram job and are reserved , no other tasks are accepted .
-   *
+   * test the high memory blocking with max capacity.
    * @throws IOException
    */
-  public void testHighMemoryBlockingWithMaxLimit()
-      throws IOException {
-
-    // 2 map and 1 reduce slots
-    taskTrackerManager = new FakeTaskTrackerManager(2, 2, 1);
+  public void testHighMemoryBlockingWithMaxCapacity()
+    throws IOException {
+    taskTrackerManager = new FakeTaskTrackerManager(2, 2, 2);
 
-    taskTrackerManager.addQueues(new String[] { "defaultXYZ" });
+    taskTrackerManager.addQueues(new String[]{"defaultXYZM"});
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("defaultXYZ", 100.0f, true, 25));
-    resConf.setFakeQueues(queues);
-    resConf.setMaxMapCap("defaultXYZ",2);
+    queues.add(new FakeQueueInfo("defaultXYZM", 25.0f, true, 50));
+
+
     scheduler.setTaskTrackerManager(taskTrackerManager);
     // enabled memory-based scheduling
     // Normal job in the cluster would be 1GB maps/reduces
-    scheduler.getConf().setLong(
-        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
-        2 * 1024);
-    scheduler.getConf().setLong(
-        JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
-    scheduler.getConf().setLong(
-        JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
-        1 * 1024);
-    scheduler.getConf().setLong(
-        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
-    scheduler.setResourceManagerConf(resConf);
+    scheduler.getConf().setLong(JTConfig.JT_MAX_MAPMEMORY_MB, 2 * 1024);
+    scheduler.getConf().setLong(MRConfig.MAPMEMORY_MB, 1 * 1024);
+    scheduler.getConf().setLong(JTConfig.JT_MAX_REDUCEMEMORY_MB, 2 * 1024);
+    scheduler.getConf().setLong(MRConfig.REDUCEMEMORY_MB, 1 * 1024);
+    taskTrackerManager.setFakeQueues(queues);
     scheduler.start();
+    scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext()
+      .setMaxCapacityPercent(50);
 
-    // The situation :  Submit 2 jobs with high memory map task
-    //Set the max limit for queue to 2 ,
-    // try submitting more map tasks to the queue , it should not happen
-
-    LOG.debug("Submit one high memory(2GB maps, 0MB reduces) job of "
-        + "2 map tasks");
     JobConf jConf = new JobConf(conf);
     jConf.setMemoryForMapTask(2 * 1024);
-    jConf.setMemoryForReduceTask(0);
+    jConf.setMemoryForReduceTask(1 * 1024);
     jConf.setNumMapTasks(2);
-    jConf.setNumReduceTasks(0);
-    jConf.setQueueName("defaultXYZ");
+    jConf.setNumReduceTasks(1);
+    jConf.setQueueName("defaultXYZM");
     jConf.setUser("u1");
-    FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
+    FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
+      JobStatus.PREP, jConf);
 
-    LOG.debug("Submit another regular memory(1GB vmem maps/reduces) job of "
-        + "2 map/red tasks");
     jConf = new JobConf(conf);
     jConf.setMemoryForMapTask(1 * 1024);
-    jConf.setMemoryForReduceTask(1 * 1024);
-    jConf.setNumMapTasks(2);
+    jConf.setMemoryForReduceTask(2 * 1024);
+    jConf.setNumMapTasks(1);
     jConf.setNumReduceTasks(2);
-    jConf.setQueueName("defaultXYZ");
+    jConf.setQueueName("defaultXYZM");
     jConf.setUser("u1");
-    FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
+    FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
+      JobStatus.PREP, jConf);
 
-    // first, a map from j1 will run this is a high memory job so it would
-    // occupy the 2 slots
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-
-    checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1,2, 100.0f,3,1);
-    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
-
-    // at this point, the scheduler tries to schedule another map from j1.
-    // there isn't enough space. The second job's reduce should be scheduled.
-    checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
-    
-    checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1,2, 100.0f,3,1);
+  //high ram map from job 1 and normal reduce task from job 1
+    HashMap<String,String> expectedStrings = new HashMap<String,String>();
+    expectedStrings.put(MAP,"attempt_test_0001_m_000001_0 on tt1");
+    expectedStrings.put(REDUCE,"attempt_test_0001_r_000001_0 on tt1");
+
+    List<Task> tasks = checkMultipleTaskAssignment(taskTrackerManager,scheduler,
+      "tt1", expectedStrings);
+
+    checkOccupiedSlots("defaultXYZM", TaskType.MAP, 1, 2, 200.0f,1,0);
+    checkOccupiedSlots("defaultXYZM", TaskType.REDUCE, 1, 1, 100.0f,0,2);
     checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
 
-    //at this point , the scheduler tries to schedule another map from j2 for
-    //another task tracker.
-    // This should not happen as all the map slots are taken
-    //by the first task itself.hence reduce task from the second job is given
+    //we have reached the maximum limit for map, so no more map tasks.
+    //we have used 1 reduce already and 1 more reduce slot is left for the
+    //before we reach maxcapacity for reduces.
+    // But current 1 slot + 2 slots for high ram reduce would
+    //mean we are crossing the maxium capacity.hence nothing would be assigned
+    //in this call
+    assertNull(scheduler.assignTasks(tracker("tt2")));
+
+    //complete the high ram job on tt1.
+    for (Task task : tasks) {
+      taskTrackerManager.finishTask(
+        task.getTaskID().toString(),
+        job1);
+    }
+
+    expectedStrings.put(MAP,"attempt_test_0001_m_000002_0 on tt2");
+    expectedStrings.put(REDUCE,"attempt_test_0002_r_000001_0 on tt2");
+
+    tasks = checkMultipleTaskAssignment(taskTrackerManager,scheduler,
+      "tt2", expectedStrings);
+
+    checkOccupiedSlots("defaultXYZM", TaskType.MAP, 1, 2, 200.0f,1,0);
+    checkOccupiedSlots("defaultXYZM", TaskType.REDUCE, 1, 2, 200.0f,0,2);
+    checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 2 * 1024L);
+
+    //complete the high ram job on tt1.
+    for (Task task : tasks) {
+      taskTrackerManager.finishTask(
+        task.getTaskID().toString(),
+        job2);
+    }
 
-    checkAssignment("tt2","attempt_test_0002_r_000002_0 on tt2");
+
+    expectedStrings.put(MAP,"attempt_test_0002_m_000001_0 on tt2");
+    expectedStrings.put(REDUCE,"attempt_test_0002_r_000002_0 on tt2");
+
+    tasks = checkMultipleTaskAssignment(taskTrackerManager,scheduler,
+      "tt2", expectedStrings);
   }
 
   /**
-   *   test if user limits automatically adjust to max map or reduce limit
+   * test if user limits automatically adjust to max map or reduce limit
    */
-  public void testUserLimitsWithMaxLimits() throws Exception {
-    setUp(4, 4, 4);
+  public void testUserLimitsWithMaxCapacity() throws Exception {
+    setUp(2, 2, 2);
     // set up some queues
     String[] qs = {"default"};
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 100.0f, true, 50));
-    resConf.setFakeQueues(queues);
-    resConf.setMaxMapCap("default", 2);
-    resConf.setMaxReduceCap("default", 2);
-    scheduler.setResourceManagerConf(resConf);
+    queues.add(new FakeQueueInfo("default", 50.0f, true, 50));
+
+
+    taskTrackerManager.setFakeQueues(queues);
     scheduler.start();
+    scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext()
+      .setMaxCapacityPercent(75);
 
     // submit a job
     FakeJobInProgress fjob1 =
-      submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u1");
+      taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u1");
     FakeJobInProgress fjob2 =
-      submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u2");
+      taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u2");
 
-    // for queue 'default', the capacity for maps is 2.
-    // But the max map limit is 2
-    // hence user should be getting not more than 1 as it is the 50%.
-    Task t1 = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-
-    //Now we should get the task from the other job. As the
-    //first user has reached his max map limit.
-    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
-
-    //Now we are done with map limit , now if we ask for task we should
-    // get reduce from 1st job
-    checkAssignment("tt3", "attempt_test_0001_r_000001_0 on tt3");
-    // Now we're at full capacity for maps. 1 done with reduces for job 1 so
-    // now we should get 1 reduces for job 2
-    Task t4 = checkAssignment("tt4", "attempt_test_0002_r_000001_0 on tt4");
+    // for queue 'default', maxCapacity for map and reduce is 3.
+    // initial user limit for 50% assuming there are 2 users/queue is.
+    //  1 map and 1 reduce.
+    // after max capacity it is 1.5 each.
 
-    taskTrackerManager.finishTask(
-      "tt1", t1.getTaskID().toString(),
-      fjob1);
+    //first job would be given 1 job each.
+    HashMap<String,String> expectedStrings = new HashMap<String,String>();
+    expectedStrings.put(MAP,"attempt_test_0001_m_000001_0 on tt1");
+    expectedStrings.put(REDUCE,"attempt_test_0001_r_000001_0 on tt1");
 
-    //tt1 completed the task so we have 1 map slot for u1
-    // we are assigning the 2nd map task from fjob1
-    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    List<Task> tasks = checkMultipleTaskAssignment(taskTrackerManager,scheduler,
+      "tt1", expectedStrings);
 
-    taskTrackerManager.finishTask(
-      "tt4", t4.getTaskID().toString(),
-      fjob2);
-    //tt4 completed the task , so we have 1 reduce slot for u2
-    //we are assigning the 2nd reduce from fjob2
-    checkAssignment("tt4", "attempt_test_0002_r_000002_0 on tt4");
 
+    //for user u1 we have reached the limit. that is 1 job.
+    //1 more map and reduce tasks.
+    expectedStrings.put(MAP,"attempt_test_0002_m_000001_0 on tt1");
+    expectedStrings.put(REDUCE,"attempt_test_0002_r_000001_0 on tt1");
+
+    tasks = checkMultipleTaskAssignment(taskTrackerManager,scheduler,
+      "tt1", expectedStrings);
+
+    expectedStrings.put(MAP,"attempt_test_0001_m_000002_0 on tt2");
+    expectedStrings.put(REDUCE,"attempt_test_0001_r_000002_0 on tt2");
+
+    tasks = checkMultipleTaskAssignment(taskTrackerManager,scheduler,
+      "tt2", expectedStrings);
+
+    assertNull(scheduler.assignTasks(tracker("tt2")));
+  }
+
+  // Utility method to construct a map of expected strings
+  // with exactly one map task and one reduce task.
+  private void populateExpectedStrings(Map<String, String> expectedTaskStrings,
+                        String mapTask, String reduceTask) {
+    expectedTaskStrings.clear();
+    expectedTaskStrings.put(CapacityTestUtils.MAP, mapTask);
+    expectedTaskStrings.put(CapacityTestUtils.REDUCE, reduceTask);
   }
 
 
@@ -1409,24 +722,38 @@
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
     queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
-    resConf.setFakeQueues(queues);
-    scheduler.setResourceManagerConf(resConf);
+
+
+    taskTrackerManager.setFakeQueues(queues);
     scheduler.start();
 
     // submit a job  
-    submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
-    // for queue 'q2', the capacity for maps is 2. Since we're the only user,
-    // we should get a task 
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
+    // for queue 'q2', the capacity is 2 for maps and 1 for reduce. 
+    // Since we're the only user, we should get tasks
+    Map<String, String> expectedTaskStrings = new HashMap<String, String>();
+    populateExpectedStrings(expectedTaskStrings, 
+              "attempt_test_0001_m_000001_0 on tt1", 
+              "attempt_test_0001_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(taskTrackerManager, scheduler, 
+                                  "tt1", expectedTaskStrings);
+
     // Submit another job, from a different user
-    submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
-    // Now if I ask for a map task, it should come from the second job 
-    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-    // Now we're at full capacity for maps. If I ask for another map task,
-    // I should get a map task from the default queue's capacity. 
-    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+    taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
+    // Now if I ask for a task, it should come from the second job
+    checkAssignment(taskTrackerManager, scheduler, 
+        "tt1", "attempt_test_0002_m_000001_0 on tt1");
+
+    // Now we're at full capacity. If I ask for another task,
+    // I should get tasks from the default queue's capacity.
+    populateExpectedStrings(expectedTaskStrings, 
+        "attempt_test_0001_m_000002_0 on tt2", 
+        "attempt_test_0002_r_000001_0 on tt2");
+    checkMultipleTaskAssignment(taskTrackerManager, scheduler, 
+          "tt2", expectedTaskStrings);
     // and another
-    checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
+    checkAssignment(taskTrackerManager, scheduler, 
+            "tt2", "attempt_test_0002_m_000002_0 on tt2");
   }
 
   // test user limits when a 2nd job is submitted much after first job 
@@ -1437,23 +764,39 @@
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
     queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
-    resConf.setFakeQueues(queues);
-    scheduler.setResourceManagerConf(resConf);
+
+
+    taskTrackerManager.setFakeQueues(queues);
     scheduler.start();
 
     // submit a job  
-    submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
-    // for queue 'q2', the capacity for maps is 2. Since we're the only user,
-    // we should get a task 
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
+    // for queue 'q2', the capacity for maps is 2 and reduce is 1. 
+    // Since we're the only user, we should get tasks
+    Map<String, String> expectedTaskStrings = new HashMap<String, String>();
+    populateExpectedStrings(expectedTaskStrings, 
+        "attempt_test_0001_m_000001_0 on tt1",
+        "attempt_test_0001_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(taskTrackerManager, scheduler, 
+        "tt1", expectedTaskStrings);
+
     // since we're the only job, we get another map
-    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_m_000002_0 on tt1");
+
     // Submit another job, from a different user
-    submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
-    // Now if I ask for a map task, it should come from the second job 
-    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+    taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
+    // Now if I ask for a task, it should come from the second job
+    populateExpectedStrings(expectedTaskStrings, 
+        "attempt_test_0002_m_000001_0 on tt2",
+        "attempt_test_0002_r_000001_0 on tt2");
+    checkMultipleTaskAssignment(taskTrackerManager, scheduler, 
+        "tt2", expectedTaskStrings);
     // and another
-    checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      "attempt_test_0002_m_000002_0 on tt2");
   }
 
   // test user limits when a 2nd job is submitted much after first job 
@@ -1465,46 +808,79 @@
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
     queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
-    resConf.setFakeQueues(queues);
-    scheduler.setResourceManagerConf(resConf);
+
+
+    taskTrackerManager.setFakeQueues(queues);
     scheduler.start();
 
     // submit a job  
-    FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
-    // for queue 'q2', the capacity for maps is 2. Since we're the only user,
-    // we should get a task 
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    FakeJobInProgress j1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
+    // for queue 'q2', the capacity for maps is 2 and reduces is 1. 
+    // Since we're the only user, we should get a task
+    Map<String, String> expectedTaskStrings = new HashMap<String, String>();
+    populateExpectedStrings(expectedTaskStrings, 
+        "attempt_test_0001_m_000001_0 on tt1", 
+        "attempt_test_0001_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(taskTrackerManager, scheduler, 
+        "tt1", expectedTaskStrings);
     // since we're the only job, we get another map
-    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
-    // we get two more maps from 'default queue'
-    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0001_m_000002_0 on tt1");
+    // we get more tasks from 'default queue'
+    populateExpectedStrings(expectedTaskStrings, 
+        "attempt_test_0001_m_000003_0 on tt2",
+        "attempt_test_0001_r_000002_0 on tt2");
+    checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+        "tt2", expectedTaskStrings);
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      "attempt_test_0001_m_000004_0 on tt2");
+
     // Submit another job, from a different user
-    FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
-    // one of the task finishes
-    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
-    // Now if I ask for a map task, it should come from the second job 
-    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    FakeJobInProgress j2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
+    // one of the task finishes of each type
+    taskTrackerManager.finishTask("attempt_test_0001_m_000001_0", j1);
+    taskTrackerManager.finishTask("attempt_test_0001_r_000001_0", j1);
+    
+    // Now if I ask for a task, it should come from the second job
+    populateExpectedStrings(expectedTaskStrings, 
+        "attempt_test_0002_m_000001_0 on tt1",
+        "attempt_test_0002_r_000001_0 on tt1");
+    checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+        "tt1", expectedTaskStrings);
+
     // another task from job1 finishes, another new task to job2
-    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0", j1);
-    checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
+    taskTrackerManager.finishTask("attempt_test_0001_m_000002_0", j1);
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0002_m_000002_0 on tt1");
+
     // now we have equal number of tasks from each job. Whichever job's
     // task finishes, that job gets a new task
-    taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000003_0", j1);
-    checkAssignment("tt2", "attempt_test_0001_m_000005_0 on tt2");
-    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", j2);
-    checkAssignment("tt1", "attempt_test_0002_m_000003_0 on tt1");
+    taskTrackerManager.finishTask("attempt_test_0001_m_000003_0", j1);
+    taskTrackerManager.finishTask("attempt_test_0001_r_000002_0", j1);
+    populateExpectedStrings(expectedTaskStrings,
+        "attempt_test_0001_m_000005_0 on tt2",
+        "attempt_test_0001_r_000003_0 on tt2");
+    checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+        "tt2", expectedTaskStrings);
+    taskTrackerManager.finishTask("attempt_test_0002_m_000001_0", j2);
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt1",
+      "attempt_test_0002_m_000003_0 on tt1");
   }
 
   // test user limits with many users, more slots
   public void testUserLimits4() throws Exception {
-    // set up one queue, with 10 slots
+    // set up one queue, with 10 map slots and 5 reduce slots
     String[] qs = {"default"};
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
-    resConf.setFakeQueues(queues);
-    scheduler.setResourceManagerConf(resConf);
+
+
+    taskTrackerManager.setFakeQueues(queues);
     scheduler.start();
     // add some more TTs 
     taskTrackerManager.addTaskTracker("tt3");
@@ -1512,74 +888,93 @@
     taskTrackerManager.addTaskTracker("tt5");
 
     // u1 submits job
-    FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
+    FakeJobInProgress j1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
     // it gets the first 5 slots
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
-    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
-    checkAssignment("tt3", "attempt_test_0001_m_000005_0 on tt3");
+    Map<String, String> expectedTaskStrings = new HashMap<String, String>();
+    for (int i=0; i<5; i++) {
+      String ttName = "tt"+(i+1);
+      populateExpectedStrings(expectedTaskStrings,
+          "attempt_test_0001_m_00000"+(i+1)+"_0 on " + ttName, 
+          "attempt_test_0001_r_00000"+(i+1)+"_0 on " + ttName);
+      checkMultipleTaskAssignment(taskTrackerManager, scheduler, 
+          ttName, expectedTaskStrings);
+    }
+      
     // u2 submits job with 4 slots
-    FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 4, 4, null, "u2");
+    FakeJobInProgress j2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 4, 4, null, "u2");
     // u2 should get next 4 slots
-    checkAssignment("tt3", "attempt_test_0002_m_000001_0 on tt3");
-    checkAssignment("tt4", "attempt_test_0002_m_000002_0 on tt4");
-    checkAssignment("tt4", "attempt_test_0002_m_000003_0 on tt4");
-    checkAssignment("tt5", "attempt_test_0002_m_000004_0 on tt5");
+    for (int i=0; i<4; i++) {
+      String ttName = "tt"+(i+1);
+      checkAssignment(taskTrackerManager, scheduler, ttName,
+          "attempt_test_0002_m_00000"+(i+1)+"_0 on " + ttName);
+    }
     // last slot should go to u1, since u2 has no more tasks
-    checkAssignment("tt5", "attempt_test_0001_m_000006_0 on tt5");
-    // u1 finishes a task
-    taskTrackerManager.finishTask("tt5", "attempt_test_0001_m_000006_0", j1);
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt5",
+      "attempt_test_0001_m_000006_0 on tt5");
+    // u1 finishes tasks
+    taskTrackerManager.finishTask("attempt_test_0001_m_000006_0", j1);
+    taskTrackerManager.finishTask("attempt_test_0001_r_000005_0", j1);
     // u1 submits a few more jobs 
     // All the jobs are inited when submitted
     // because of addition of Eager Job Initializer all jobs in this
     //case would e initialised.
-    submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
-    submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
-    submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
+    taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
+    taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
+    taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
     // u2 also submits a job
-    submitJobAndInit(JobStatus.PREP, 10, 10, null, "u2");
+    taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, null, "u2");
     // now u3 submits a job
-    submitJobAndInit(JobStatus.PREP, 2, 2, null, "u3");
-    // next slot should go to u3, even though u2 has an earlier job, since
+    taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 2, null, "u3");
+    // next map slot should go to u3, even though u2 has an earlier job, since
     // user limits have changed and u1/u2 are over limits
-    checkAssignment("tt5", "attempt_test_0007_m_000001_0 on tt5");
+    // reduce slot will go to job 2, as it is still under limit.
+    populateExpectedStrings(expectedTaskStrings,
+        "attempt_test_0007_m_000001_0 on tt5",
+        "attempt_test_0002_r_000001_0 on tt5");
+    checkMultipleTaskAssignment(taskTrackerManager, scheduler, 
+        "tt5", expectedTaskStrings);
     // some other task finishes and u3 gets it
-    taskTrackerManager.finishTask("tt5", "attempt_test_0002_m_000004_0", j1);
-    checkAssignment("tt5", "attempt_test_0007_m_000002_0 on tt5");
+    taskTrackerManager.finishTask("attempt_test_0002_m_000004_0", j1);
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt4",
+      "attempt_test_0007_m_000002_0 on tt4");
     // now, u2 finishes a task
-    taskTrackerManager.finishTask("tt4", "attempt_test_0002_m_000002_0", j1);
+    taskTrackerManager.finishTask("attempt_test_0002_m_000002_0", j1);
     // next slot will go to u1, since u3 has nothing to run and u1's job is 
     // first in the queue
-    checkAssignment("tt4", "attempt_test_0001_m_000007_0 on tt4");
+    checkAssignment(
+      taskTrackerManager, scheduler, "tt2",
+      "attempt_test_0001_m_000007_0 on tt2");
   }
 
   /**
    * Test to verify that high memory jobs hit user limits faster than any normal
    * job.
-   * 
+   *
    * @throws IOException
    */
   public void testUserLimitsForHighMemoryJobs()
-      throws IOException {
+    throws IOException {
     taskTrackerManager = new FakeTaskTrackerManager(1, 10, 10);
     scheduler.setTaskTrackerManager(taskTrackerManager);
-    String[] qs = { "default" };
+    String[] qs = {"default"};
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 50));
-    resConf.setFakeQueues(queues);
+
+
     // enabled memory-based scheduling
     // Normal job in the cluster would be 1GB maps/reduces
     scheduler.getConf().setLong(
-        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY, 2 * 1024);
+      JTConfig.JT_MAX_MAPMEMORY_MB, 2 * 1024);
     scheduler.getConf().setLong(
-        JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+      MRConfig.MAPMEMORY_MB, 1 * 1024);
     scheduler.getConf().setLong(
-        JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY, 2 * 1024);
+      JTConfig.JT_MAX_REDUCEMEMORY_MB, 2 * 1024);
     scheduler.getConf().setLong(
-        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
-    scheduler.setResourceManagerConf(resConf);
+      MRConfig.REDUCEMEMORY_MB, 1 * 1024);
+    taskTrackerManager.setFakeQueues(queues);
     scheduler.start();
 
     // Submit one normal job to the other queue.
@@ -1590,9 +985,11 @@
     jConf.setNumReduceTasks(6);
     jConf.setUser("u1");
     jConf.setQueueName("default");
-    FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
+    FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
+      JobStatus.PREP, jConf);
 
-    LOG.debug("Submit one high memory(2GB maps, 2GB reduces) job of "
+    LOG.debug(
+      "Submit one high memory(2GB maps, 2GB reduces) job of "
         + "6 map and 6 reduce tasks");
     jConf = new JobConf(conf);
     jConf.setMemoryForMapTask(2 * 1024);
@@ -1601,32 +998,41 @@
     jConf.setNumReduceTasks(6);
     jConf.setQueueName("default");
     jConf.setUser("u2");
-    FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
+    FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
+      JobStatus.PREP, jConf);
 
-    // Verify that normal job takes 3 task assignments to hit user limits
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000002_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000003_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000004_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000005_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000005_0 on tt1");
+    // Verify that normal job takes 5 task assignments to hit user limits
+    Map<String, String> expectedStrings = new HashMap<String, String>();
+    for (int i = 0; i < 5; i++) {
+      expectedStrings.clear();
+      expectedStrings.put(
+        CapacityTestUtils.MAP,
+        "attempt_test_0001_m_00000" + (i + 1) + "_0 on tt1");
+      expectedStrings.put(
+        CapacityTestUtils.REDUCE,
+        "attempt_test_0001_r_00000" + (i + 1) + "_0 on tt1");
+      checkMultipleTaskAssignment(
+        taskTrackerManager, scheduler, "tt1",
+        expectedStrings);
+    }
     // u1 has 5 map slots and 5 reduce slots. u2 has none. So u1's user limits
     // are hit. So u2 should get slots
 
-    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
-
-    // u1 has 5 map slots and 5 reduce slots. u2 has 4 map slots and 4 reduce
+    for (int i = 0; i < 2; i++) {
+      expectedStrings.clear();
+      expectedStrings.put(
+        CapacityTestUtils.MAP,
+        "attempt_test_0002_m_00000" + (i + 1) + "_0 on tt1");
+      expectedStrings.put(
+        CapacityTestUtils.REDUCE,
+        "attempt_test_0002_r_00000" + (i + 1) + "_0 on tt1");
+      checkMultipleTaskAssignment(
+        taskTrackerManager, scheduler, "tt1",
+        expectedStrings);
+    }  // u1 has 5 map slots and 5 reduce slots. u2 has 4 map slots and 4 reduce
     // slots. Because of high memory tasks, giving u2 another task would
     // overflow limits. So, no more tasks should be given to anyone.
     assertNull(scheduler.assignTasks(tracker("tt1")));
-    assertNull(scheduler.assignTasks(tracker("tt1")));
   }
 
   /*
@@ -1639,9 +1045,7 @@
    * - Then run initializationPoller()
    * - Check once again the waiting queue, it should be 5 jobs again.
    * - Then raise status change events.
-   * - Assign one task to a task tracker. (Map)
-   * - Check waiting job count, it should be 4 now and used map (%) = 100
-   * - Assign another one task (Reduce)
+   * - Assign tasks to a task tracker.
    * - Check waiting job count, it should be 4 now and used map (%) = 100
    * and used reduce (%) = 100
    * - finish the job and then check the used percentage it should go
@@ -1654,9 +1058,9 @@
    * - Check the count, the waiting job count should be 2.
    * - Now raise status change events to move the initialized jobs which
    * should be two in count to running queue.
-   * - Then schedule a map of the job in running queue.
+   * - Then schedule a map and reduce of the job in running queue.
    * - Run the poller because the poller is responsible for waiting
-   * jobs count. Check the count, it should be using 100% map and one
+   * jobs count. Check the count, it should be using 100% map, reduce and one
    * waiting job
    * - fail the running job.
    * - Check the count, it should be now one waiting job and zero running
@@ -1671,20 +1075,22 @@
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
     queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
-    resConf.setFakeQueues(queues);
-    scheduler.setResourceManagerConf(resConf);
+
+
+    taskTrackerManager.setFakeQueues(queues);
     scheduler.start();
 
     scheduler.assignTasks(tracker("tt1")); // heartbeat
     scheduler.assignTasks(tracker("tt2")); // heartbeat

[... 2198 lines stripped ...]