You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:30:34 UTC

svn commit: r1077573 [3/3] - in /hadoop/common/branches/branch-0.20-security-patches/src: contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/mapred/

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1077573&r1=1077572&r2=1077573&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri Mar  4 04:30:34 2011
@@ -170,6 +170,9 @@ public class TestCapacityScheduler exten
         FakeTaskTrackerManager taskTrackerManager, String user, 
         JobTracker jt) {
       super(jId, jobConf, jt);
+      if (user == null) {
+        user = "drwho";
+      }
       this.taskTrackerManager = taskTrackerManager;
       this.startTime = System.currentTimeMillis();
       this.status.setJobPriority(JobPriority.NORMAL);
@@ -198,6 +201,12 @@ public class TestCapacityScheduler exten
     }
     
     @Override
+    public Task obtainNewNonLocalMapTask(final TaskTrackerStatus tts, 
+        int clusterSize, int ignored) throws IOException {
+      return obtainNewMapTask(tts, clusterSize, ignored);
+    }
+    
+    @Override
     public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
         int ignored) throws IOException {
       boolean areAllMapsRunning = (mapTaskCtr == numMapTasks);
@@ -436,7 +445,7 @@ public class TestCapacityScheduler exten
     int reduces = 0;
     int maxMapTasksPerTracker = 2;
     int maxReduceTasksPerTracker = 1;
-    List<JobInProgressListener> listeners =
+    List<JobInProgressListener> mylisteners =
       new ArrayList<JobInProgressListener>();
     FakeQueueManager qm = new FakeQueueManager();
     
@@ -511,7 +520,7 @@ public class TestCapacityScheduler exten
         JobStatus newStatus = (JobStatus)jip.getStatus().clone();
         JobStatusChangeEvent event = new JobStatusChangeEvent(jip, 
             EventType.RUN_STATE_CHANGED, oldStatus, newStatus);
-        for (JobInProgressListener listener : listeners) {
+        for (JobInProgressListener listener : mylisteners) {
           listener.jobUpdated(event);
         }
       } catch (Exception ioe) {
@@ -542,16 +551,16 @@ public class TestCapacityScheduler exten
 
 
     public void addJobInProgressListener(JobInProgressListener listener) {
-      listeners.add(listener);
+      mylisteners.add(listener);
     }
 
     public void removeJobInProgressListener(JobInProgressListener listener) {
-      listeners.remove(listener);
+      mylisteners.remove(listener);
     }
     
     public void submitJob(JobInProgress job) throws IOException {
       jobs.put(job.getJobID(), job);
-      for (JobInProgressListener listener : listeners) {
+      for (JobInProgressListener listener : mylisteners) {
         listener.jobAdded(job);
       }
     }
@@ -612,7 +621,7 @@ public class TestCapacityScheduler exten
       JobStatusChangeEvent event = 
         new JobStatusChangeEvent (fjob, EventType.RUN_STATE_CHANGED, oldStatus, 
                                   newStatus);
-      for (JobInProgressListener listener : listeners) {
+      for (JobInProgressListener listener : mylisteners) {
         listener.jobUpdated(event);
       }
     }
@@ -625,7 +634,7 @@ public class TestCapacityScheduler exten
       JobStatusChangeEvent event = 
         new JobStatusChangeEvent (fjob, EventType.PRIORITY_CHANGED, oldStatus, 
                                   newStatus);
-      for (JobInProgressListener listener : listeners) {
+      for (JobInProgressListener listener : mylisteners) {
         listener.jobUpdated(event);
       }
     }
@@ -642,7 +651,7 @@ public class TestCapacityScheduler exten
       JobStatusChangeEvent event = 
         new JobStatusChangeEvent (fjob, EventType.START_TIME_CHANGED, oldStatus,
                                   newStatus);
-      for (JobInProgressListener listener : listeners) {
+      for (JobInProgressListener listener : mylisteners) {
         listener.jobUpdated(event);
       }
     }
@@ -849,9 +858,9 @@ public class TestCapacityScheduler exten
     taskTrackerManager.initJob(fjob1);
     
     // check if the jobs are missing from the waiting queue
-    // The jobs are not removed from waiting queue until they are scheduled 
+    // The jobs are not removed from waiting queue until they are scheduled
     assertEquals("Waiting queue is garbled on job init", 2, 
-                 scheduler.jobQueuesManager.getWaitingJobs("default")
+                 scheduler.jobQueuesManager.getQueue("default").getWaitingJobs()
                           .size());
     
     // test if changing the job priority/start-time works as expected in the 
@@ -868,8 +877,9 @@ public class TestCapacityScheduler exten
     // mark the job as complete
     taskTrackerManager.finalizeJob(fjob1);
     
-    Collection<JobInProgress> rqueue = 
-      scheduler.jobQueuesManager.getRunningJobQueue("default");
+    CapacitySchedulerQueue queue = 
+      scheduler.jobQueuesManager.getQueue("default"); 
+    Collection<JobInProgress> rqueue = queue.getRunningJobs();
     
     // check if the job is removed from the scheduler
     assertFalse("Scheduler contains completed job", 
@@ -886,13 +896,17 @@ public class TestCapacityScheduler exten
    * @throws IOException
    */
   public void testMaxCapacities() throws IOException {
+    System.err.println("testMaxCapacities");
     this.setUp(4,1,1);
-    taskTrackerManager.addQueues(new String[] {"default"});
+    taskTrackerManager.addQueues(new String[] {"default", "q2"});
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 25.0f, false, 1));
-
+    queues.add(new FakeQueueInfo("q2", 75.0f, false, 1));
+    
     resConf.setFakeQueues(queues);
     resConf.setMaxCapacity("default", 50.0f);
+    resConf.setUserLimitFactor("default", 2);
+    
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
@@ -904,22 +918,22 @@ public class TestCapacityScheduler exten
 
     //first call of assign task should give task from default queue.
     //default uses 1 map and 1 reduce slots are used
-    checkMultipleAssignment(
-      "tt1", "attempt_test_0001_m_000001_0 on tt1",
-      "attempt_test_0001_r_000001_0 on tt1");
+    checkAssignments("tt1", 
+        new String[] {
+        "attempt_test_0001_m_000001_0 on tt1",
+        "attempt_test_0001_r_000001_0 on tt1"});
 
     //second call of assign task
     //default uses 2 map and 2 reduce slots
-    checkMultipleAssignment(
-      "tt2", "attempt_test_0001_m_000002_0 on tt2",
-      "attempt_test_0001_r_000002_0 on tt2");
+    checkAssignments("tt2", 
+        new String[] {
+        "attempt_test_0001_m_000002_0 on tt2",
+        "attempt_test_0001_r_000002_0 on tt2"});
 
 
     //Now we have reached the max capacity limit for default ,
     //no further tasks would be assigned to this queue.
-    checkMultipleAssignment(
-      "tt3", null,
-      null);
+    checkAssignments("tt3", new String[] {});
   }
   
   // test if the queue reflects the changes
@@ -976,18 +990,19 @@ public class TestCapacityScheduler exten
   private JobInProgress[] getJobsInQueue(boolean waiting) {
     Collection<JobInProgress> queue = 
       waiting 
-      ? scheduler.jobQueuesManager.getWaitingJobs("default")
-      : scheduler.jobQueuesManager.getRunningJobQueue("default");
+      ? scheduler.jobQueuesManager.getQueue("default").getWaitingJobs()
+      : scheduler.jobQueuesManager.getQueue("default").getRunningJobs();
     return queue.toArray(new JobInProgress[0]);
   }
   
   // tests if tasks can be assinged when there are multiple jobs from a same
   // user
   public void testJobFinished() throws Exception {
-    taskTrackerManager.addQueues(new String[] {"default"});
+    taskTrackerManager.addQueues(new String[] {"default", "q2"});
     
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
+    queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
@@ -1083,6 +1098,8 @@ public class TestCapacityScheduler exten
       queues.add(new FakeQueueInfo("q1", 50.0f, true, 25));
       queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
       resConf.setFakeQueues(queues);
+      resConf.setUserLimitFactor("q1", 4);
+      resConf.setUserLimitFactor("q2", 4);
       scheduler.setResourceManagerConf(resConf);
       scheduler.start();
 
@@ -1159,8 +1176,8 @@ public class TestCapacityScheduler exten
       submitJobs(1, 4, "default");
    
     JobQueuesManager mgr = scheduler.jobQueuesManager;
-    
-    while(mgr.getWaitingJobs("default").size() < 4){
+    CapacitySchedulerQueue queue = mgr.getQueue("default");
+    while(queue.getWaitingJobs().size() < 4){
       Thread.sleep(1);
     }
     //Raise status change events for jobs submitted.
@@ -1174,46 +1191,24 @@ public class TestCapacityScheduler exten
         subJobsList.get("u1").containsAll(jobs));
   }
   
-  //Basic test to test capacity allocation across the queues which have no
-  //capacity configured.
-  
-  public void testCapacityAllocationToQueues() throws Exception {
-    String[] qs = {"default","q1","q2","q3","q4"};
-    taskTrackerManager.addQueues(qs);
-    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default",25.0f,true,25));
-    queues.add(new FakeQueueInfo("q1",-1.0f,true,25));
-    queues.add(new FakeQueueInfo("q2",-1.0f,true,25));
-    queues.add(new FakeQueueInfo("q3",-1.0f,true,25));
-    queues.add(new FakeQueueInfo("q4",-1.0f,true,25));
-    resConf.setFakeQueues(queues);
-    scheduler.setResourceManagerConf(resConf);
-    scheduler.start(); 
-    assertEquals(18.75f, resConf.getCapacity("q1"));
-    assertEquals(18.75f, resConf.getCapacity("q2"));
-    assertEquals(18.75f, resConf.getCapacity("q3"));
-    assertEquals(18.75f, resConf.getCapacity("q4"));
-  }
-
   public void testCapacityAllocFailureWithLowerMaxCapacity()
     throws Exception {
     String[] qs = {"default", "q1"};
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 50.0f, true, 50));
-    queues.add(new FakeQueueInfo("q1", -1.0f, true, 50));
+    queues.add(new FakeQueueInfo("q1", 50.0f, true, 50));
     resConf.setFakeQueues(queues);
     resConf.setMaxCapacity("q1", 40.0f);
     scheduler.setResourceManagerConf(resConf);
+    boolean failed = false;
     try {
       scheduler.start();
       fail("Scheduler start should fail ");
-    } catch (IllegalStateException ise) {
-      assertEquals(
-        ise.getMessage(),
-        " Allocated capacity of " + 50.0f + " to unconfigured queue " +
-          "q1" + " is greater than maximum Capacity " + 40.0f);
+    } catch (IllegalArgumentException iae) {
+      failed = true;  
     }
+    assertTrue("Scheduler start didn't fail!", failed);
   }
 
   // Tests how capacity is computed and assignment of tasks done
@@ -1287,6 +1282,8 @@ public class TestCapacityScheduler exten
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
     queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
+    resConf.setUserLimitFactor("default", 4);
+    resConf.setUserLimitFactor("q2", 4);
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
@@ -1323,10 +1320,12 @@ public class TestCapacityScheduler exten
     taskTrackerManager = 
       new FakeTaskTrackerManager(2, NUM_MAP_SLOTS, NUM_REDUCE_SLOTS);
 
-    taskTrackerManager.addQueues(new String[] { "defaultXYZ" });
+    taskTrackerManager.addQueues(new String[] { "defaultXYZ", "q2" });
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("defaultXYZ", 25.0f, true, 50));
+    queues.add(new FakeQueueInfo("q2", 75.0f, true, 50));
     resConf.setFakeQueues(queues);
+    resConf.setUserLimitFactor("defaultXYZ", 2);
 
     //defaultXYZ can go up to 2 map and 2 reduce slots
     resConf.setMaxCapacity("defaultXYZ", 50.0f);
@@ -1418,10 +1417,11 @@ public class TestCapacityScheduler exten
   public void testUserLimitsWithMaxCapacities() throws Exception {
     setUp(2, 2, 2);
     // set up some queues
-    String[] qs = {"default"};
+    String[] qs = {"default", "q2"};
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 50.0f, true, 50));
+    queues.add(new FakeQueueInfo("q2", 50.0f, true, 50));
     resConf.setFakeQueues(queues);
     resConf.setMaxCapacity("default", 75.0f);
     scheduler.setResourceManagerConf(resConf);
@@ -1520,6 +1520,8 @@ public class TestCapacityScheduler exten
   // test user limits when a 2nd job is submitted much after first job 
   // and we need to wait for first job's task to complete
   public void testUserLimits3() throws Exception {
+    System.err.println("testUserLimits3");
+    
     // set up some queues
     String[] qs = {"default", "q2"};
     taskTrackerManager.addQueues(qs);
@@ -1535,39 +1537,37 @@ public class TestCapacityScheduler exten
     // for queue 'q2', the capacity for maps is 2. Since we're the only user,
     // we should get a task 
     checkAssignments("tt1", 
-        new String[] {"attempt_test_0001_m_000001_0 on tt1", 
+        new String[] {
+        "attempt_test_0001_m_000001_0 on tt1", 
         "attempt_test_0001_m_000002_0 on tt1", 
         "attempt_test_0001_r_000001_0 on tt1"});
 
-    // we get two more maps from 'default queue'
-    checkAssignments("tt2", 
-        new String[] {"attempt_test_0001_m_000003_0 on tt2", 
-        "attempt_test_0001_m_000004_0 on tt2", 
-        "attempt_test_0001_r_000002_0 on tt2"});
+    // No tasks assigned since u1 has hit user limits of 50% i.e. q2 capacity
+    checkAssignments("tt2", new String[] {});
 
     // Submit another job, from a different user
     FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
     
-    // one of the task finishes
-    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
-    
     // Now if I ask for a map task, it should come from the second job 
     // and reduce from job2
-    checkAssignments("tt1", 
-        new String[] {"attempt_test_0002_m_000001_0 on tt1"});
-    
-    // another task from job1 finishes, another new map and reduce task to job2
-    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0", j1);
+    checkAssignments("tt2", 
+        new String[] {
+        "attempt_test_0002_m_000001_0 on tt2",
+        "attempt_test_0002_m_000002_0 on tt2",
+        "attempt_test_0002_r_000001_0 on tt2",
+        });
     
-    // job2 shud get the map slot & reduce
-    checkAssignments("tt1", new String[] {"attempt_test_0002_m_000002_0 on tt1"});
+    // A task from job1 finishes
+    // job1 should get the map slot since u1 has only 1 task running
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
+    checkAssignments("tt1", new String[] {"attempt_test_0001_m_000003_0 on tt1"});
     
     // now we have equal number of tasks from each job. Whichever job's
     // task finishes, that job gets a new task
-    taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000003_0", j1);
-    checkAssignments("tt2", new String[] {"attempt_test_0001_m_000005_0 on tt2"});
-    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", j2);
-    checkAssignment("tt1", "attempt_test_0002_m_000003_0 on tt1");
+    taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000001_0", j2);
+    checkAssignments("tt2", new String[] {"attempt_test_0002_m_000003_0 on tt2"});
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0", j1);
+    checkAssignment("tt1", "attempt_test_0001_m_000004_0 on tt1");
   }
 
   // test user limits with many users, more slots
@@ -1764,6 +1764,8 @@ public class TestCapacityScheduler exten
     queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
     queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
     resConf.setFakeQueues(queues);
+    resConf.setMaxInitializedActiveTasksPerUser("default", 4);  // 4 tasks max
+    resConf.setMaxInitializedActiveTasksPerUser("q2", 4);  // 4 tasks max
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
@@ -1847,11 +1849,10 @@ public class TestCapacityScheduler exten
     //Get scheduling information, now the number of waiting job should have
     //changed to 4 as one is scheduled and has become running.
     // make sure we update our stats
-    scheduler.updateQSIInfoForTests();
+    scheduler.updateQueueUsageForTests();
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    System.err.println(schedulingInfo);
     assertEquals(schedulingInfo, 22, infoStrings.length);
     assertEquals(infoStrings[7], infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
     assertEquals(infoStrings[8], infoStrings[8], "Running tasks: 1");
@@ -1868,7 +1869,7 @@ public class TestCapacityScheduler exten
     taskTrackerManager.finalizeJob(u1j1);
 
     // make sure we update our stats
-    scheduler.updateQSIInfoForTests();
+    scheduler.updateQueueUsageForTests();
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
@@ -1887,7 +1888,7 @@ public class TestCapacityScheduler exten
     //Run initializer to clean up failed jobs
     controlledInitializationPoller.selectJobsToInitialize();
     // make sure we update our stats
-    scheduler.updateQSIInfoForTests();
+    scheduler.updateQueueUsageForTests();
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
@@ -1909,7 +1910,7 @@ public class TestCapacityScheduler exten
     //run initializer to clean up failed job
     controlledInitializationPoller.selectJobsToInitialize();
     // make sure we update our stats
-    scheduler.updateQSIInfoForTests();
+    scheduler.updateQueueUsageForTests();
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
@@ -1942,7 +1943,7 @@ public class TestCapacityScheduler exten
     //one. run the poller as it is responsible for waiting count
     controlledInitializationPoller.selectJobsToInitialize();
     // make sure we update our stats
-    scheduler.updateQSIInfoForTests();
+    scheduler.updateQueueUsageForTests();
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
@@ -1958,7 +1959,7 @@ public class TestCapacityScheduler exten
     //Fail the executing job
     taskTrackerManager.finalizeJob(u1j3, JobStatus.FAILED);
     // make sure we update our stats
-    scheduler.updateQSIInfoForTests();
+    scheduler.updateQueueUsageForTests();
     //Now running counts should become zero
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
@@ -2213,12 +2214,15 @@ public class TestCapacityScheduler exten
         CapacityTaskScheduler.getJobQueueSchedInfo(0, 0, 0, 0, 0, 0),
         (String) job3.getSchedulingInfo());
     
-    // Reservations are already done for job2. So job3 should go ahead.
+    // Reservations are already done for job2. 
+    // So job3 should go ahead. 
+    // However, it has hit the user limit of 6 for reduces (incl. the reserved
+    // slot), so we should only get a map.
     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", job1);
+    scheduler.updateQueueUsageForTests();
     checkAssignments("tt1", 
         new String[] {
-        "attempt_test_0003_m_000001_0 on tt1", 
-        "attempt_test_0003_r_000001_0 on tt1"});
+        "attempt_test_0003_m_000001_0 on tt1"});
   }
 
   /**
@@ -2332,23 +2336,25 @@ public class TestCapacityScheduler exten
 
   /*
    * Test cases for Job Initialization poller.
-   */
-  
-  /*
+   * 
    * This test verifies that the correct number of jobs for
    * correct number of users is initialized.
    * It also verifies that as jobs of users complete, new jobs
    * from the correct users are initialized.
    */
   public void testJobInitialization() throws Exception {
+    System.err.println("testJobInitialization");
     // set up the scheduler
     String[] qs = { "default" };
     taskTrackerManager = new FakeTaskTrackerManager(2, 1, 1);
     scheduler.setTaskTrackerManager(taskTrackerManager);
+    controlledInitializationPoller.setTaskTrackerManager(taskTrackerManager);
     taskTrackerManager.addQueues(qs);
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
     resConf.setFakeQueues(queues);
+    resConf.setMaxSystemJobs(6);  // 6 jobs max
+    resConf.setMaxInitializedActiveTasksPerUser("default", 4);  // 4 tasks max
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
   
@@ -2358,7 +2364,7 @@ public class TestCapacityScheduler exten
     // submit 4 jobs each for 3 users.
     HashMap<String, ArrayList<FakeJobInProgress>> userJobs = submitJobs(3,
         4, "default");
-
+        
     // get the jobs submitted.
     ArrayList<FakeJobInProgress> u1Jobs = userJobs.get("u1");
     ArrayList<FakeJobInProgress> u2Jobs = userJobs.get("u2");
@@ -2369,13 +2375,15 @@ public class TestCapacityScheduler exten
     Set<JobID> initializedJobs = initPoller.getInitializedJobList();
     
     // we should have 12 (3 x 4) jobs in the job queue
-    assertEquals(mgr.getWaitingJobs("default").size(), 12);
-
+    assertEquals(mgr.getQueue("default").getWaitingJobs().size(), 12);
+        
     // run one poller iteration.
     controlledInitializationPoller.selectJobsToInitialize();
     
+    System.err.println("3 TTM #listeners=" + taskTrackerManager.mylisteners.size());
+    
     // the poller should initialize 6 jobs
-    // 3 users and 2 jobs from each
+    // 3 users and 2 jobs (with 2 tasks) from each
     assertEquals(initializedJobs.size(), 6);
 
     assertTrue("Initialized jobs didnt contain the user1 job 1",
@@ -2401,7 +2409,7 @@ public class TestCapacityScheduler exten
     // since no jobs have started running, there should be no
     // change to the initialized jobs.
     assertEquals(initializedJobs.size(), 6);
-    assertFalse("Initialized jobs contains user 4 jobs",
+    assertFalse("Initialized jobs doesn't contain user 4 jobs",
         initializedJobs.contains(u4j1.getJobID()));
     
     // This event simulates raising the event on completion of setup task
@@ -2422,15 +2430,28 @@ public class TestCapacityScheduler exten
     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", u1Jobs.get(0));
     taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", u1Jobs.get(0));
     taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000001_0", u1Jobs.get(1));
-    taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000001_0", u1Jobs.get(1));
+    taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000001_0", u1Jobs.get(1));
+    
+    // as max running jobs is 6, still no more jobs can be inited
+    controlledInitializationPoller.selectJobsToInitialize();
 
-    // as some jobs have running tasks, the poller will now
+    // count should be 4 since 2 jobs are running
+    assertEquals(initializedJobs.size(), 4);
+    
+    // Job has completed
+    taskTrackerManager.finalizeJob(u1Jobs.get(0));
+    taskTrackerManager.finalizeJob(u1Jobs.get(1));
+    
+    // count should still be 4 since we haven't called the poller yet
+    assertEquals(initializedJobs.size(), 4);
+
+    // as some jobs have completed, the poller will now
     // pick up new jobs to initialize.
     controlledInitializationPoller.selectJobsToInitialize();
 
     // count should still be the same
     assertEquals(initializedJobs.size(), 6);
-    
+
     // new jobs that have got into the list
     assertTrue(initializedJobs.contains(u1Jobs.get(2).getJobID()));
     assertTrue(initializedJobs.contains(u1Jobs.get(3).getJobID()));
@@ -2451,8 +2472,8 @@ public class TestCapacityScheduler exten
     taskTrackerManager.finishTask("tt1", "attempt_test_0003_m_000001_0", u1Jobs.get(2));
     taskTrackerManager.finishTask("tt1", "attempt_test_0003_r_000001_0", u1Jobs.get(2));
 
-    // no new jobs should be picked up, because max user limit
-    // is still 3.
+    // no new jobs should be picked up, because max running jobs is 6, job3 
+    // hasn't been marked as 'complete' yet
     controlledInitializationPoller.selectJobsToInitialize();
     
     assertEquals(initializedJobs.size(), 5);
@@ -2464,16 +2485,109 @@ public class TestCapacityScheduler exten
         "attempt_test_0004_r_000001_0 on tt1"
     });
     
+    // Finish job_0003
+    taskTrackerManager.finalizeJob(u1Jobs.get(2));
+    
     // Now initialised jobs should contain user 4's job, as
-    // user 1's jobs are all done and the number of users is
-    // below the limit
+    // user 1's jobs are all done; u2 and u3 already have 6 active tasks
     controlledInitializationPoller.selectJobsToInitialize();
     assertEquals(initializedJobs.size(), 5);
     assertTrue(initializedJobs.contains(u4j1.getJobID()));
     
     controlledInitializationPoller.stopRunning();
   }
+  
+  /**
+   * This test case tests the per-user and per-queue limits on job submission.
+   */
+  public void testJobSubmissionLimits() throws Exception {
+    System.err.println("testJobSubmissionLimits");
+    
+    // set up the scheduler
+    String[] qs = {"default", "q2"};
+    taskTrackerManager = new FakeTaskTrackerManager(2, 1, 1);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    taskTrackerManager.addQueues(qs);
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 50.0f, true, 50));
+    queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
+    resConf.setFakeQueues(queues);
+    resConf.setMaxInitializedActiveTasksPerUser("default", 4);  // 4 tasks max
+    resConf.setMaxInitializedActiveTasksPerUser("q2", 4);  // 4 tasks max
+    resConf.setInitToAcceptJobsFactor("default", 1);
+    resConf.setMaxSystemJobs(12); // max 12 running jobs in the system, hence
+
+    // In queue 'default'
+    // max (pending+running) jobs -> 12 * 1 * .5 = 6 
+    // max jobs per user to init -> 12 * .5 * .5 = 3
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+  
+    JobQueuesManager mgr = scheduler.jobQueuesManager;
+    JobInitializationPoller initPoller = scheduler.getInitializationPoller();
+
+    // submit 2 jobs each for 3 users, the maximum possible to default
+    HashMap<String, ArrayList<FakeJobInProgress>> userJobs = 
+      submitJobs(3, 2, "default");
+        
+    // get the jobs submitted.
+    ArrayList<FakeJobInProgress> u1Jobs = userJobs.get("u1");
+    ArrayList<FakeJobInProgress> u2Jobs = userJobs.get("u2");
+    ArrayList<FakeJobInProgress> u3Jobs = userJobs.get("u3");
+    
+    // reference to the initializedJobs data structure
+    // changes are reflected in the set as they are made by the poller
+    Set<JobID> initializedJobs = initPoller.getInitializedJobList();
+    
+    // we should have 6 jobs in the job queue
+    assertEquals(6, mgr.getQueue("default").getWaitingJobs().size());
+        
+    // run one poller iteration.
+    controlledInitializationPoller.selectJobsToInitialize();
+        
+    // the poller should initialize 6 jobs
+    // 3 users and 2 jobs (with 2 tasks) from each
+    assertEquals(initializedJobs.size(), 6);
+    
+    // now submit one more job from another user, should fail since default's
+    // job submission capacity is full
+    boolean jobSubmissionFailed = false;
+    try {
+      FakeJobInProgress u4j1 = 
+        submitJob(JobStatus.PREP, 1, 1, "default", "u4");
+    } catch (IOException ioe) {
+      jobSubmissionFailed = true;
+    }
+    assertTrue("Job submission of 7th job to 'default' queue didn't fail!", 
+        jobSubmissionFailed);
+
+    // fail some jobs to clear up quota
+    taskTrackerManager.finalizeJob(u2Jobs.get(0), JobStatus.FAILED);
+    taskTrackerManager.finalizeJob(u3Jobs.get(0), JobStatus.FAILED);
+    
+    FakeJobInProgress u1j3 = 
+      submitJob(JobStatus.PREP, 1, 1, "default", "u1");
+
+    // run the poller again.
+    controlledInitializationPoller.selectJobsToInitialize();
 
+    // the poller should initialize 5 jobs
+    // 3 from u1 and one each from u2 and u3
+    assertEquals(initializedJobs.size(), 5);
+
+    // Should fail since u1 is already at limit of 3 jobs
+    jobSubmissionFailed = false;
+    try {
+      FakeJobInProgress u1j4 = 
+        submitJob(JobStatus.PREP, 1, 1, "default", "u1");
+    } catch (IOException ioe) {
+      jobSubmissionFailed = true;
+    }
+    
+    assertTrue("Job submission of 4th job of user 'u1' to queue 'default' " +
+    		"didn't fail!", jobSubmissionFailed);
+  }
+  
   /*
    * testHighPriorityJobInitialization() shows behaviour when high priority job
    * is submitted into a queue and how initialisation happens for the same.
@@ -2484,24 +2598,28 @@ public class TestCapacityScheduler exten
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
     resConf.setFakeQueues(queues);
+    resConf.setMaxSystemJobs(6); // 6 jobs max
+    resConf.setMaxInitializedActiveTasksPerUser("default", 4);  // 4 tasks max
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
     JobInitializationPoller initPoller = scheduler.getInitializationPoller();
     Set<JobID> initializedJobsList = initPoller.getInitializedJobList();
 
-    // submit 3 jobs for 3 users
-    submitJobs(3,3,"default");
+    // submit 3 jobs for 3 users, only 2 each should be inited since max active
+    // tasks per user is 4 and max jobs is 6
+    HashMap<String, ArrayList<FakeJobInProgress>> userJobs = 
+      submitJobs(3,3,"default");
     controlledInitializationPoller.selectJobsToInitialize();
     assertEquals(initializedJobsList.size(), 6);
     
     // submit 2 job for a different user. one of them will be made high priority
     FakeJobInProgress u4j1 = submitJob(JobStatus.PREP, 1, 1, "default", "u4");
-    FakeJobInProgress u4j2 = submitJob(JobStatus.PREP, 1, 1, "default", "u4");
+    FakeJobInProgress u4j2 = submitJob(JobStatus.PREP, 2, 2, "default", "u4");
     
     controlledInitializationPoller.selectJobsToInitialize();
     
-    // shouldn't change
+    // shouldn't change since max jobs is 6
     assertEquals(initializedJobsList.size(), 6);
     
     assertFalse("Contains U4J1 high priority job " , 
@@ -2510,18 +2628,31 @@ public class TestCapacityScheduler exten
         initializedJobsList.contains(u4j2.getJobID()));
 
     // change priority of one job
-    taskTrackerManager.setPriority(u4j1, JobPriority.VERY_HIGH);
+    System.err.println("changing prio");
+    taskTrackerManager.setPriority(u4j2, JobPriority.VERY_HIGH);
+    
+    // Assign a few tasks from the inited jobs on tt1; one of the
+    // inited jobs is then finished just below.
+    checkAssignments("tt1", 
+        new String[] {
+        "attempt_test_0001_m_000001_0 on tt1",
+        "attempt_test_0002_m_000001_0 on tt1",
+        "attempt_test_0001_r_000001_0 on tt1"
+    });
     
+    // Finish u1's first job (index 0 of its list returned by submitJobs)
+    taskTrackerManager.finalizeJob(userJobs.get("u1").get(0));
+
     controlledInitializationPoller.selectJobsToInitialize();
     
     // the high priority job should get initialized, but not the
     // low priority job from u4, as we have already exceeded the
     // limit.
-    assertEquals(initializedJobsList.size(), 7);
-    assertTrue("Does not contain U4J1 high priority job " , 
-        initializedJobsList.contains(u4j1.getJobID()));
-    assertFalse("Contains U4J2 Normal priority job " , 
+    assertEquals(initializedJobsList.size(), 5);
+    assertTrue("Does not contain U4J2 high priority job " , 
         initializedJobsList.contains(u4j2.getJobID()));
+    assertFalse("Contains U4J1 Normal priority job " , 
+        initializedJobsList.contains(u4j1.getJobID()));
     controlledInitializationPoller.stopRunning();
   }
   
@@ -2533,9 +2664,7 @@ public class TestCapacityScheduler exten
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
-    
-    JobQueuesManager mgr = scheduler.jobQueuesManager;
-    
+        
     // check proper running job movement and completion
     checkRunningJobMovementAndCompletion();
 
@@ -2588,19 +2717,20 @@ public class TestCapacityScheduler exten
     job.getStatus().setRunState(JobStatus.PREP);
     taskTrackerManager.submitJob(job);
     //check if job is present in waiting list.
+    CapacitySchedulerQueue queue = mgr.getQueue("default");
     assertEquals("Waiting job list does not contain submitted job",
-        1, mgr.getWaitingJobCount("default"));
+        1, queue.getNumWaitingJobs());
     assertTrue("Waiting job does not contain submitted job", 
-        mgr.getWaitingJobs("default").contains(job));
+        queue.getWaitingJobs().contains(job));
     //initialization should fail now.
     controlledInitializationPoller.selectJobsToInitialize();
     //Check if the job has been properly cleaned up.
     assertEquals("Waiting job list contains submitted job",
-        0, mgr.getWaitingJobCount("default"));
+        0, queue.getNumWaitingJobs());
     assertFalse("Waiting job contains submitted job", 
-        mgr.getWaitingJobs("default").contains(job));
+        queue.getWaitingJobs().contains(job));
     assertFalse("Waiting job contains submitted job", 
-        mgr.getRunningJobQueue("default").contains(job));
+        queue.getRunningJobs().contains(job));
   }
   
   private void checkRunningJobMovementAndCompletion() throws IOException {
@@ -2618,10 +2748,11 @@ public class TestCapacityScheduler exten
     raiseStatusChangeEvents(mgr);
     
     // it should be there in both the queues.
+    CapacitySchedulerQueue queue = mgr.getQueue("default");
     assertTrue("Job not present in Job Queue",
-        mgr.getWaitingJobs("default").contains(job));
+        queue.getWaitingJobs().contains(job));
     assertTrue("Job not present in Running Queue",
-        mgr.getRunningJobQueue("default").contains(job));
+        queue.getRunningJobs().contains(job));
     
     // assign a task
     checkAssignments("tt1", 
@@ -2637,7 +2768,7 @@ public class TestCapacityScheduler exten
     // the job should also be removed from the job queue as tasks
     // are scheduled
     assertFalse("Job present in Job Queue",
-        mgr.getWaitingJobs("default").contains(job));
+        queue.getWaitingJobs().contains(job));
     
     // complete tasks and job
     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", job);
@@ -2646,7 +2777,7 @@ public class TestCapacityScheduler exten
     
     // make sure it is removed from the run queue
     assertFalse("Job present in running queue",
-        mgr.getRunningJobQueue("default").contains(job));
+        queue.getRunningJobs().contains(job));
   }
   
   private void checkFailedRunningJobMovement() throws IOException {
@@ -2658,14 +2789,15 @@ public class TestCapacityScheduler exten
       submitJobAndInit(JobStatus.RUNNING, 1, 1, "default", "u1");
     
     //check if the job is present in running queue.
+    CapacitySchedulerQueue queue = mgr.getQueue("default");
     assertTrue("Running jobs list does not contain submitted job",
-        mgr.getRunningJobQueue("default").contains(job));
+        queue.getRunningJobs().contains(job));
     
     taskTrackerManager.finalizeJob(job, JobStatus.KILLED);
     
     //check if the job is properly removed from running queue.
     assertFalse("Running jobs list does not contain submitted job",
-        mgr.getRunningJobQueue("default").contains(job));
+        queue.getRunningJobs().contains(job));
     
   }
   
@@ -2753,7 +2885,7 @@ public class TestCapacityScheduler exten
 
     taskTrackerManager.addQueues(new String[] { "default" });
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
+    queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
     resConf.setFakeQueues(queues);
     scheduler.setTaskTrackerManager(taskTrackerManager);
     // enabled memory-based scheduling
@@ -2781,7 +2913,7 @@ public class TestCapacityScheduler exten
     jConf.setUser("u1");
     FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    scheduler.updateQSIInfoForTests();
+    scheduler.updateQueueUsageForTests();
     LOG.info(job1.getSchedulingInfo());
     assertEquals(
         CapacityTaskScheduler.getJobQueueSchedInfo(1, 1, 0, 0, 0, 0), 
@@ -2797,7 +2929,7 @@ public class TestCapacityScheduler exten
     jConf.setUser("u1");
     FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
     checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
-    scheduler.updateQSIInfoForTests();
+    scheduler.updateQueueUsageForTests();
     LOG.info(job2.getSchedulingInfo());
     assertEquals(
         CapacityTaskScheduler.getJobQueueSchedInfo(1, 1, 0, 0, 0, 0), 
@@ -2813,7 +2945,7 @@ public class TestCapacityScheduler exten
     jConf.setUser("u1");
     FakeJobInProgress job3 = submitJobAndInit(JobStatus.PREP, jConf);
     checkAssignment("tt3", "attempt_test_0003_m_000001_0 on tt3");
-    scheduler.updateQSIInfoForTests();
+    scheduler.updateQueueUsageForTests();
     LOG.info(job3.getSchedulingInfo());
     assertEquals(
         CapacityTaskScheduler.getJobQueueSchedInfo(1, 1, 0, 0, 0, 0), 
@@ -2837,20 +2969,20 @@ public class TestCapacityScheduler exten
     jConf.setNumMapTasks(2);
     jConf.setNumReduceTasks(2);
     jConf.setQueueName("default");
-    jConf.setUser("u1");
+    jConf.setUser("u2");
     FakeJobInProgress job5 = submitJobAndInit(JobStatus.PREP, jConf);
 
     // Job4, a high memory job cannot be accommodated on a any TT. But with each
     // trip to the scheduler, each of the TT should be reserved by job2.
     assertEquals(0, scheduler.assignTasks(tracker("tt1")).size());
-    scheduler.updateQSIInfoForTests();
+    scheduler.updateQueueUsageForTests();
     LOG.info(job4.getSchedulingInfo());
     assertEquals(
         CapacityTaskScheduler.getJobQueueSchedInfo(0, 0, 2, 0, 0, 0), 
         (String) job4.getSchedulingInfo());
 
     assertEquals(0, scheduler.assignTasks(tracker("tt2")).size());
-    scheduler.updateQSIInfoForTests();
+    scheduler.updateQueueUsageForTests();
     LOG.info(job4.getSchedulingInfo());
     assertEquals(
         CapacityTaskScheduler.getJobQueueSchedInfo(0, 0, 4, 0, 0, 0), 
@@ -2860,14 +2992,14 @@ public class TestCapacityScheduler exten
     // slots on tt3. tt1 and tt2 should not be assigned any slots with the
     // reservation stats intact.
     assertEquals(0, scheduler.assignTasks(tracker("tt1")).size());
-    scheduler.updateQSIInfoForTests();
+    scheduler.updateQueueUsageForTests();
     LOG.info(job4.getSchedulingInfo());
     assertEquals(
         CapacityTaskScheduler.getJobQueueSchedInfo(0, 0, 4, 0, 0, 0), 
         (String) job4.getSchedulingInfo());
 
     assertEquals(0, scheduler.assignTasks(tracker("tt2")).size());
-    scheduler.updateQSIInfoForTests();
+    scheduler.updateQueueUsageForTests();
     LOG.info(job4.getSchedulingInfo());
     assertEquals(
         CapacityTaskScheduler.getJobQueueSchedInfo(0, 0, 4, 0, 0, 0), 
@@ -2876,7 +3008,7 @@ public class TestCapacityScheduler exten
     checkAssignments("tt3", 
         new String[] {
         "attempt_test_0005_m_000001_0 on tt3"});
-    scheduler.updateQSIInfoForTests();
+    scheduler.updateQueueUsageForTests();
     LOG.info(job5.getSchedulingInfo());
     assertEquals(
         CapacityTaskScheduler.getJobQueueSchedInfo(1, 1, 0, 0, 0, 0), 
@@ -2938,7 +3070,7 @@ public class TestCapacityScheduler exten
     jConf.setMemoryForReduceTask(1 * 1024);
     jConf.setNumMapTasks(6);
     jConf.setNumReduceTasks(6);
-    jConf.setUser("u1");
+    jConf.setUser("u2");
     jConf.setQueueName("q1");
     FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
 
@@ -2959,7 +3091,7 @@ public class TestCapacityScheduler exten
         .getOrderedQueues(TaskType.MAP));
     checkQueuesOrder(qs, scheduler
         .getOrderedQueues(TaskType.REDUCE));
-    scheduler.updateQSIInfoForTests();
+    scheduler.updateQueueUsageForTests();
     assertEquals(
         CapacityTaskScheduler.getJobQueueSchedInfo(1, 2, 2, 1, 2, 0), 
         (String) job1.getSchedulingInfo());
@@ -2972,7 +3104,7 @@ public class TestCapacityScheduler exten
     checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
     checkQueuesOrder(reversedQs, scheduler
         .getOrderedQueues(TaskType.REDUCE));
-    scheduler.updateQSIInfoForTests();
+    scheduler.updateQueueUsageForTests();
     assertEquals(
         CapacityTaskScheduler.getJobQueueSchedInfo(1, 2, 2, 1, 2, 0), 
         (String) job1.getSchedulingInfo());
@@ -2984,7 +3116,7 @@ public class TestCapacityScheduler exten
     checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
     checkQueuesOrder(reversedQs, scheduler
         .getOrderedQueues(TaskType.REDUCE));
-    scheduler.updateQSIInfoForTests();
+    scheduler.updateQueueUsageForTests();
     assertEquals(
         CapacityTaskScheduler.getJobQueueSchedInfo(1, 2, 2, 1, 2, 0), 
         (String) job1.getSchedulingInfo());
@@ -2996,7 +3128,7 @@ public class TestCapacityScheduler exten
     checkAssignment("tt1", "attempt_test_0002_r_000003_0 on tt1");
     checkQueuesOrder(reversedQs, scheduler
         .getOrderedQueues(TaskType.REDUCE));
-    scheduler.updateQSIInfoForTests();
+    scheduler.updateQueueUsageForTests();
     assertEquals(
         CapacityTaskScheduler.getJobQueueSchedInfo(1, 2, 2, 1, 2, 0), 
         (String) job1.getSchedulingInfo());
@@ -3020,9 +3152,9 @@ public class TestCapacityScheduler exten
         .getOrderedQueues(TaskType.MAP));
     checkQueuesOrder(qs, scheduler
         .getOrderedQueues(TaskType.REDUCE));
-    scheduler.updateQSIInfoForTests();
-    assertEquals(
-        CapacityTaskScheduler.getJobQueueSchedInfo(2, 4, 4, 2, 4, 0), 
+    scheduler.updateQueueUsageForTests();
+    assertEquals( // user limit is 6
+        CapacityTaskScheduler.getJobQueueSchedInfo(2, 4, 2, 2, 4, 0), 
         (String) job1.getSchedulingInfo());
     assertEquals(
         CapacityTaskScheduler.getJobQueueSchedInfo(6, 6, 0, 3, 3, 0), 
@@ -3032,9 +3164,9 @@ public class TestCapacityScheduler exten
     checkAssignment("tt2", "attempt_test_0002_r_000004_0 on tt2");
     checkQueuesOrder(reversedQs, scheduler
         .getOrderedQueues(TaskType.REDUCE));
-    scheduler.updateQSIInfoForTests();
-    assertEquals(
-        CapacityTaskScheduler.getJobQueueSchedInfo(2, 4, 4, 2, 4, 0), 
+    scheduler.updateQueueUsageForTests();
+    assertEquals(// user limit is 6
+        CapacityTaskScheduler.getJobQueueSchedInfo(2, 4, 2, 2, 4, 0), 
         (String) job1.getSchedulingInfo());
     assertEquals(
         CapacityTaskScheduler.getJobQueueSchedInfo(6, 6, 0, 4, 4, 0), 
@@ -3052,9 +3184,11 @@ public class TestCapacityScheduler exten
     p.selectJobsToInitialize();
     //Don't raise the status change event.
     
+    CapacitySchedulerQueue queue = mgr.getQueue("default");
+    
     //check in waiting and initialized jobs list.
     assertTrue("Waiting jobs list does not contain the job",
-        mgr.getWaitingJobs("default").contains(job));
+        queue.getWaitingJobs().contains(job));
     
     assertTrue("Initialized job does not contain the job",
         p.getInitializedJobList().contains(job.getJobID()));
@@ -3064,7 +3198,7 @@ public class TestCapacityScheduler exten
     
     //Check if the job is present in waiting queue
     assertFalse("Waiting jobs list contains failed job",
-        mgr.getWaitingJobs("default").contains(job));
+        queue.getWaitingJobs().contains(job));
     
     //run the poller to do the cleanup
     p.selectJobsToInitialize();
@@ -3081,14 +3215,15 @@ public class TestCapacityScheduler exten
         "u1");
     
     // check in waiting and initialized jobs list.
-    assertTrue("Waiting jobs list does not contain the job", mgr
-        .getWaitingJobs("default").contains(job));
+    CapacitySchedulerQueue queue = mgr.getQueue("default");
+    assertTrue("Waiting jobs list does not contain the job", 
+        queue.getWaitingJobs().contains(job));
     // fail the waiting job
     taskTrackerManager.finalizeJob(job, JobStatus.KILLED);
 
     // Check if the job is present in waiting queue
-    assertFalse("Waiting jobs list contains failed job", mgr
-        .getWaitingJobs("default").contains(job));
+    assertFalse("Waiting jobs list contains failed job", 
+        queue.getWaitingJobs().contains(job));
   }
   
   private void raiseStatusChangeEvents(JobQueuesManager mgr) {
@@ -3096,7 +3231,7 @@ public class TestCapacityScheduler exten
   }
   
   private void raiseStatusChangeEvents(JobQueuesManager mgr, String queueName) {
-    Collection<JobInProgress> jips = mgr.getWaitingJobs(queueName);
+    Collection<JobInProgress> jips = mgr.getQueue(queueName).getWaitingJobs();
     for(JobInProgress jip : jips) {
       if(jip.getStatus().getRunState() == JobStatus.RUNNING) {
         JobStatusChangeEvent evt = new JobStatusChangeEvent(jip,
@@ -3212,7 +3347,7 @@ public class TestCapacityScheduler exten
     int expectedOccupiedSlots, float expectedOccupiedSlotsPercent,int incrMapIndex
     ,int incrReduceIndex
   ) {
-    scheduler.updateQSIInfoForTests();
+    scheduler.updateQueueUsageForTests();
     QueueManager queueManager = scheduler.taskTrackerManager.getQueueManager();
     String schedulingInfo =
         queueManager.getJobQueueInfo(queue).getSchedulingInfo();

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerWithJobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerWithJobTracker.java?rev=1077573&r1=1077572&r2=1077573&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerWithJobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerWithJobTracker.java Fri Mar  4 04:30:34 2011
@@ -60,7 +60,7 @@ public class TestCapacitySchedulerWithJo
           .getTaskScheduler();
       JobQueuesManager mgr = scheduler.jobQueuesManager;
       assertEquals("Failed job present in Waiting queue", 0, mgr
-          .getWaitingJobCount("default"));
+          .getQueue("default").getNumWaitingJobs());
     }
   }
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077573&r1=1077572&r2=1077573&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar  4 04:30:34 2011
@@ -803,6 +803,15 @@ public class JobInProgress {
     return numReduceTasks - runningReduceTasks - failedReduceTIPs - 
     finishedReduceTasks + speculativeReduceTasks;
   }
+  
+  /**
+   * Return the sum of {@link #desiredMaps()} and {@link #desiredReduces()}.
+   * @return total number of map and reduce tasks desired by the job
+   */
+  public int desiredTasks() {
+    return desiredMaps() + desiredReduces();
+  }
+  
   public int getNumSlotsPerTask(TaskType taskType) {
     if (taskType == TaskType.MAP) {
       return numSlotsPerMap;
@@ -1361,7 +1370,7 @@ public class JobInProgress {
   
   /**
    * Check if we can schedule an off-switch task for this job.
-   * @param numTaskTrackers.
+   * @param numTaskTrackers number of tasktrackers
    * 
    * We check the number of missed opportunities for the job. 
    * If it has 'waited' long enough we go ahead and schedule.
@@ -1559,7 +1568,7 @@ public class JobInProgress {
       LOG.info("Exceeded limit for reduce input size: Estimated:" + 
           estimatedReduceInputSize + " Limit: " + 
           reduce_input_limit + " Failing Job " + jobId);
-      status.setFailureInfo("Job Exceeded Reduce Input limit " 
+      status.setFailureInfo("Job exceeded Reduce Input limit " 
           + " Limit:  " + reduce_input_limit + 
           " Estimated: " + estimatedReduceInputSize);
       jobtracker.failJob(this);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java?rev=1077573&r1=1077572&r2=1077573&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java Fri Mar  4 04:30:34 2011
@@ -54,6 +54,28 @@ class JobQueueJobInProgressListener exte
     JobPriority getPriority() {return priority;}
     long getStartTime() {return startTime;}
     JobID getJobID() {return id;}
+    
+    @Override
+    public boolean equals(Object obj) {
+      // Identity first, so the comparison is reflexive for any instance.
+      if (obj == this) {
+        return true;
+      }
+      // Exact class match only (this also rejects null).
+      if (obj == null || obj.getClass() != JobSchedulingInfo.class) {
+        return false;
+      }
+      JobSchedulingInfo that = (JobSchedulingInfo) obj;
+      return this.id.equals(that.id) && this.startTime == that.startTime
+          && this.priority == that.priority;
+    }
+
+    /** Hashes the same fields that equals() compares. */
+    @Override
+    public int hashCode() {
+      return (int)(id.hashCode() * priority.hashCode() + startTime);
+    }
+
   }
   
   static final Comparator<JobSchedulingInfo> FIFO_JOB_QUEUE_COMPARATOR

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077573&r1=1077572&r2=1077573&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar  4 04:30:34 2011
@@ -3717,7 +3717,20 @@ public class JobTracker implements MRCon
         jobInfo.write(out);
         out.close();
       }
-      return addJob(jobId, job);
+      
+      // Submit the job
+      JobStatus status;
+      try {
+        status = addJob(jobId, job);
+      } catch (IOException ioe) {
+        LOG.info("Job " + jobId + " submission failed!", ioe);
+        status = job.getStatus();
+        status.setFailureInfo(StringUtils.stringifyException(ioe));
+        failJob(job);
+        throw ioe;
+      }
+      
+      return status;
     }
   }
 
@@ -3753,19 +3766,15 @@ public class JobTracker implements MRCon
    * adding a job. This is the core job submission logic
    * @param jobId The id for the job submitted which needs to be added
    */
-  private synchronized JobStatus addJob(JobID jobId, JobInProgress job) {
+  private synchronized JobStatus addJob(JobID jobId, JobInProgress job) 
+  throws IOException {
     totalSubmissions++;
 
     synchronized (jobs) {
       synchronized (taskScheduler) {
         jobs.put(job.getProfile().getJobID(), job);
         for (JobInProgressListener listener : jobInProgressListeners) {
-          try {
-            listener.jobAdded(job);
-          } catch (IOException ioe) {
-            LOG.warn("Failed to add and so skipping the job : "
-                + job.getJobID() + ". Exception : " + ioe);
-          }
+          listener.jobAdded(job);
         }
       }
     }
@@ -3946,8 +3955,8 @@ public class JobTracker implements MRCon
           StringUtils.stringifyException(kie));
       killJob(job);
     } catch (Throwable t) {
-      String failureInfo = "Job initialization failed:\n" +
-      StringUtils.stringifyException(t);
+      String failureInfo = 
+        "Job initialization failed:\n" + StringUtils.stringifyException(t);
       // If the job initialization is failed, job state will be FAILED
       LOG.error(failureInfo);
       job.getStatus().setFailureInfo(failureInfo);
@@ -4889,7 +4898,12 @@ public class JobTracker implements MRCon
   public void refreshQueues() throws IOException {
     LOG.info("Refreshing queue information. requested by : " +
         UserGroupInformation.getCurrentUser().getShortUserName());
-    this.queueManager.refreshQueues(new Configuration(this.conf));
+    this.queueManager.refreshQueues(new Configuration());
+    
+    synchronized (taskScheduler) {
+      taskScheduler.refresh();
+    }
+
   }
   
   synchronized String getReasonsForBlacklisting(String host) {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/QueueManager.java?rev=1077573&r1=1077572&r2=1077573&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/QueueManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/QueueManager.java Fri Mar  4 04:30:34 2011
@@ -28,6 +28,7 @@ import java.util.TreeSet;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Queue.QueueState;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authorize.AccessControlList;
 import org.apache.hadoop.util.StringUtils;
@@ -61,7 +62,8 @@ class QueueManager {
   /** Whether ACLs are enabled in the system or not. */
   private boolean aclsEnabled;
   /** Map of a queue name and Queue object */
-  final HashMap<String,Queue> queues;
+  final HashMap<String,Queue> queues = new HashMap<String,Queue>();
+  
   /**
    * Enum representing an AccessControlList that drives set of operations that
    * can be performed on a queue.
@@ -98,20 +100,24 @@ class QueueManager {
   public QueueManager(Configuration conf) {
     checkDeprecation(conf);
     conf.addResource(QUEUE_ACLS_FILE_NAME);
-    queues = new HashMap<String,Queue>();
+    
+    // Get configured ACLs and state for each queue
+    aclsEnabled = conf.getBoolean("mapred.acls.enabled", false);
+
+    queues.putAll(parseQueues(conf)); 
+  }
+  
+  synchronized private Map<String, Queue> parseQueues(Configuration conf) {
+    Map<String, Queue> queues = new HashMap<String, Queue>();
     // First get the queue names
     String[] queueNameValues = conf.getStrings("mapred.queue.names",
         new String[]{JobConf.DEFAULT_QUEUE_NAME});
-    // Get configured ACLs and state for each queue
-    aclsEnabled = conf.getBoolean("mapred.acls.enabled", false);
     for (String name : queueNameValues) {
-      try {
-        queues.put(name, new Queue(name, getQueueAcls(name, conf),
-              getQueueState(name, conf)));
-      } catch (Throwable t) {
-        LOG.warn("Not able to initialize queue " + name, t);
-      }
+      queues.put(name, new Queue(name, getQueueAcls(name, conf),
+          getQueueState(name, conf)));
     }
+    
+    return queues;
   }
   
   /**
@@ -218,6 +224,7 @@ class QueueManager {
    * @throws IOException when queue ACL configuration file is invalid.
    */
   synchronized void refreshQueues(Configuration conf) throws IOException {
+    
     // First check if things are configured in mapred-site.xml,
     // so we can print out a deprecation warning.
     // This check is needed only until we support the configuration
@@ -228,23 +235,36 @@ class QueueManager {
     // will be overridden.
     conf.addResource(QUEUE_ACLS_FILE_NAME);
 
+    // Now parse the queues and check to ensure no queue has been deleted
+    Map<String, Queue> newQueues = parseQueues(conf);
+    checkQueuesForDeletion(queues, newQueues);
+
     // Now we refresh the properties of the queues. Note that we
     // do *not* refresh the queue names or the acls flag. Instead
     // we use the older values configured for them.
-    LOG.info("Refreshing acls and state for configured queues.");
-    try {
-      for (String qName : getQueues()) {
-        Queue q = queues.get(qName);
-        q.setAcls(getQueueAcls(qName, conf));
-        q.setState(getQueueState(qName, conf));
+    queues.clear();
+    queues.putAll(newQueues);
+    LOG.info("Queues acls, state and configs refreshed: " + 
+        queues.size() + " queues present now.");
+  }
+
+  private void checkQueuesForDeletion(Map<String, Queue> currentQueues,
+      Map<String, Queue> newQueues) {
+    for (String queue : currentQueues.keySet()) {
+      if (!newQueues.containsKey(queue)) {
+        throw new IllegalArgumentException("Couldn't find queue '" + queue + 
+            "' during refresh!");
+      }
+    }
+    
+    // Mark new queues as STOPPED
+    for (String queue : newQueues.keySet()) {
+      if (!currentQueues.containsKey(queue)) {
+        newQueues.get(queue).setState(QueueState.STOPPED);
       }
-    } catch (Throwable t) {
-      LOG.warn("Invalid queue configuration", t);
-      throw new IOException("Invalid queue configuration", t);
     }
-
   }
-
+  
   private void checkDeprecation(Configuration conf) {
     // check if queues are defined.
     String[] queues = conf.getStrings("mapred.queue.names");

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java?rev=1077573&r1=1077572&r2=1077573&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java Fri Mar  4 04:30:34 2011
@@ -91,5 +91,10 @@ abstract class TaskScheduler implements 
    * @return
    */
   public abstract Collection<JobInProgress> getJobs(String queueName);
-    
+
+  /**
+   * Refresh the scheduler's configuration; this base version is a no-op.
+   */
+  public void refresh() throws IOException {}
+  
 }