You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:26:09 UTC

svn commit: r1077538 [2/2] - in /hadoop/common/branches/branch-0.20-security-patches/src: contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/mapred/

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1077538&r1=1077537&r2=1077538&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri Mar  4 04:26:09 2011
@@ -192,6 +192,12 @@ public class TestCapacityScheduler exten
     }
 
     @Override
+    public Task obtainNewLocalMapTask(final TaskTrackerStatus tts, int clusterSize,
+        int ignored) throws IOException {
+      return obtainNewMapTask(tts, clusterSize, ignored);
+    }
+    
+    @Override
     public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
         int ignored) throws IOException {
       boolean areAllMapsRunning = (mapTaskCtr == numMapTasks);
@@ -751,7 +757,6 @@ public class TestCapacityScheduler exten
             numReduceTasksPerTracker);
     clock = new FakeClock();
     scheduler = new CapacityTaskScheduler(clock);
-    scheduler.setAssignMultipleTasks(false);
     scheduler.setTaskTrackerManager(taskTrackerManager);
 
     conf = new JobConf();
@@ -889,7 +894,6 @@ public class TestCapacityScheduler exten
     resConf.setFakeQueues(queues);
     resConf.setMaxCapacity("default", 50.0f);
     scheduler.setResourceManagerConf(resConf);
-    scheduler.setAssignMultipleTasks(true);
     scheduler.start();
 
     //submit the Job
@@ -994,31 +998,32 @@ public class TestCapacityScheduler exten
     
     // I. Check multiple assignments with running tasks within job
     // ask for a task from first job
-    Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    //  ask for another task from the first job
-    t = checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
-    
+    checkAssignments("tt1", 
+                     new String[] {"attempt_test_0001_m_000001_0 on tt1", 
+                                   "attempt_test_0001_m_000002_0 on tt1"}
+      );
+
     // complete tasks
     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0", j1);
     
     // II. Check multiple assignments with running tasks across jobs
     // ask for a task from first job
-    t = checkAssignment("tt1", "attempt_test_0001_m_000003_0 on tt1");
-    
-    //  ask for a task from the second job
-    t = checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-    
-    // complete tasks
-    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", j2);
+    checkAssignments("tt1", 
+                     new String[] {"attempt_test_0001_m_000003_0 on tt1", 
+                                   "attempt_test_0002_m_000001_0 on tt1"}
+    );
+
+    // complete task from job1
     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000003_0", j1);
     
     // III. Check multiple assignments with completed tasks across jobs
     // ask for a task from the second job
-    t = checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
     
-    // complete task
+    // complete tasks
     taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000002_0", j2);
+    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", j2);
     
     // IV. Check assignment with completed job
     // finish first job
@@ -1026,7 +1031,7 @@ public class TestCapacityScheduler exten
     
     // ask for another task from the second job
     // if tasks can be assigned then the structures are properly updated 
-    t = checkAssignment("tt1", "attempt_test_0002_m_000003_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_m_000003_0 on tt1");
     
     // complete task
     taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000003_0", j2);
@@ -1038,7 +1043,6 @@ public class TestCapacityScheduler exten
    * @throws Exception
    */
   public void testMultiTaskAssignmentInSingleQueue() throws Exception {
-    try {
       setUp(1, 6, 2);
       // set up some queues
       String[] qs = {"default"};
@@ -1048,25 +1052,13 @@ public class TestCapacityScheduler exten
       resConf.setFakeQueues(queues);
       scheduler.setResourceManagerConf(resConf);
       scheduler.start();
-      scheduler.setAssignMultipleTasks(true);
 
       //Submit the job with 6 maps and 2 reduces
       FakeJobInProgress j1 = submitJobAndInit(
         JobStatus.PREP, 6, 2, "default", "u1");
 
       List<Task> tasks = scheduler.assignTasks(tracker("tt1"));
-      assertEquals(tasks.size(), 2);
-      for (Task task : tasks) {
-        if (task.toString().contains("_m_")) {
-          LOG.info(" map task assigned " + task.toString());
-          assertEquals(task.toString(), "attempt_test_0001_m_000001_0 on tt1");
-        } else if (task.toString().contains("_r_")) {
-          LOG.info(" reduce task assigned " + task.toString());
-          assertEquals(task.toString(), "attempt_test_0001_r_000001_0 on tt1");
-        } else {
-          fail(" should not have come here " + task.toString());
-        }
-      }
+      assertEquals(tasks.size(), 7);
 
       for (Task task : tasks) {
         if (task.toString().equals("attempt_test_0001_m_000001_0 on tt1")) {
@@ -1077,123 +1069,51 @@ public class TestCapacityScheduler exten
         }
       }
 
-      tasks = scheduler.assignTasks(tracker("tt1"));
-      assertEquals(tasks.size(), 2);
-      for (Task task : tasks) {
-        if (task.toString().contains("_m_")) {
-          LOG.info(" map task assigned " + task.toString());
-          assertEquals(task.toString(), "attempt_test_0001_m_000002_0 on tt1");
-        } else if (task.toString().contains("_r_")) {
-          LOG.info(" reduce task assigned " + task.toString());
-          assertEquals(task.toString(), "attempt_test_0001_r_000002_0 on tt1");
-        } else {
-          fail(" should not have come here " + task.toString());
-        }
-      }
-
-      //now both the reduce slots are being used , hence we should not 
-      // get only 1 map task in this assignTasks call.
+      // Only 1 reduce left
       tasks = scheduler.assignTasks(tracker("tt1"));
       assertEquals(tasks.size(), 1);
-      for (Task task : tasks) {
-        if (task.toString().contains("_m_")) {
-          LOG.info(" map task assigned " + task.toString());
-          assertEquals(task.toString(), "attempt_test_0001_m_000003_0 on tt1");
-        } else if (task.toString().contains("_r_")) {
-          LOG.info(" reduce task assigned " + task.toString());
-          fail("should not give reduce task " + task.toString());
-        } else {
-          fail(" should not have come here " + task.toString());
-        }
-      }
-    } finally {
-      scheduler.setAssignMultipleTasks(false);
-    }
   }
 
   public void testMultiTaskAssignmentInMultipleQueues() throws Exception {
-    try {
-      setUp(1, 6, 2);
+      setUp(1, 4, 2);
       // set up some queues
-      String[] qs = {"default","q1"};
+      String[] qs = {"q1","q2"};
       taskTrackerManager.addQueues(qs);
       ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-      queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
       queues.add(new FakeQueueInfo("q1", 50.0f, true, 25));
+      queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
       resConf.setFakeQueues(queues);
       scheduler.setResourceManagerConf(resConf);
       scheduler.start();
-      scheduler.setAssignMultipleTasks(true);
 
+      System.err.println("testMultiTaskAssignmentInMultipleQueues");
       //Submit the job with 6 maps and 2 reduces
-      submitJobAndInit(
-        JobStatus.PREP, 6, 1, "default", "u1");
-
-      FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP,2,1,"q1","u2");
-
-      List<Task> tasks = scheduler.assignTasks(tracker("tt1"));
-      assertEquals(tasks.size(), 2);
-      for (Task task : tasks) {
-        if (task.toString().contains("_m_")) {
-          LOG.info(" map task assigned " + task.toString());
-          assertEquals(task.toString(), "attempt_test_0001_m_000001_0 on tt1");
-        } else if (task.toString().contains("_r_")) {
-          LOG.info(" reduce task assigned " + task.toString());
-          assertEquals(task.toString(), "attempt_test_0001_r_000001_0 on tt1");
-        } else {
-          fail(" should not have come here " + task.toString());
-        }
-      }
-      
-      // next assignment will be for job in second queue.
-      tasks = scheduler.assignTasks(tracker("tt1"));
-      assertEquals(tasks.size(), 2);
+      FakeJobInProgress j1 = 
+        submitJobAndInit(JobStatus.PREP, 6, 1, "q1", "u1");
+      FakeJobInProgress j2 = 
+        submitJobAndInit(JobStatus.PREP,2,1,"q2","u2");
+
+      List<Task> tasks = checkAssignments("tt1", 
+          new String[] {"attempt_test_0002_m_000001_0 on tt1",
+                        "attempt_test_0001_m_000001_0 on tt1",
+                        "attempt_test_0001_m_000002_0 on tt1",
+                        "attempt_test_0002_m_000002_0 on tt1",
+                        "attempt_test_0002_r_000001_0 on tt1",
+                        });
+      //Now finish the tasks
       for (Task task : tasks) {
-        if (task.toString().contains("_m_")) {
-          LOG.info(" map task assigned " + task.toString());
-          assertEquals(task.toString(), "attempt_test_0002_m_000001_0 on tt1");
-        } else if (task.toString().contains("_r_")) {
-          LOG.info(" reduce task assigned " + task.toString());
-          assertEquals(task.toString(), "attempt_test_0002_r_000001_0 on tt1");
-        } else {
-          fail(" should not have come here " + task.toString());
-        }
-      }
-
-      //now both the reduce slots are being used , hence we sholdnot get only 1
-      //map task in this assignTasks call.
-      tasks = scheduler.assignTasks(tracker("tt1"));
-      assertEquals(tasks.size(), 1);
-      for (Task task : tasks) {
-        if (task.toString().contains("_m_")) {
-          LOG.info(" map task assigned " + task.toString());
-          // we get from job 2 because the queues are equal in capacity usage
-          // and sorting leaves order unchanged.
-          assertEquals(task.toString(), "attempt_test_0002_m_000002_0 on tt1");
-        } else if (task.toString().contains("_r_")) {
-          LOG.info(" reduce task assigned " + task.toString());
-          fail("should not give reduce task " + task.toString());
-        } else {
-          fail(" should not have come here " + task.toString());
-        }
+        FakeJobInProgress j = 
+          (task.getTaskID().getJobID().getId() == 1) ? j1 : j2;
+        taskTrackerManager.finishTask("tt1", task.getTaskID().toString(), j);
       }
 
-      tasks = scheduler.assignTasks(tracker("tt1"));
-      assertEquals(tasks.size(), 1);
-      for (Task task : tasks) {
-        if (task.toString().contains("_m_")) {
-          LOG.info(" map task assigned " + task.toString());
-          assertEquals(task.toString(), "attempt_test_0001_m_000002_0 on tt1");
-        } else if (task.toString().contains("_r_")) {
-          LOG.info(" reduce task assigned " + task.toString());
-          fail("should not give reduce task " + task.toString());
-        } else {
-          fail(" should not have come here " + task.toString());
-        }
-      }
-    } finally {
-      scheduler.setAssignMultipleTasks(false);
-    }
+      checkAssignments("tt1", 
+          new String[] {"attempt_test_0001_m_000003_0 on tt1",
+                        "attempt_test_0001_m_000004_0 on tt1",
+                        "attempt_test_0001_m_000005_0 on tt1",
+                        "attempt_test_0001_m_000006_0 on tt1",
+                        "attempt_test_0001_r_000001_0 on tt1",
+                        });
   }
 
   // basic tests, should be able to submit to queues
@@ -1213,12 +1133,17 @@ public class TestCapacityScheduler exten
     JobInProgress j = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
     
     // when we ask for a task, we should get one, from the job submitted
-    Task t;
-    t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignments("tt1", 
+        new String[] {"attempt_test_0001_m_000001_0 on tt1", 
+                      "attempt_test_0001_m_000002_0 on tt1", 
+                      "attempt_test_0001_r_000001_0 on tt1"});
     // submit another job, to a different queue
     j = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
     // now when we get a task, it should be from the second job
-    t = checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+    checkAssignments("tt2", 
+        new String[] {"attempt_test_0002_m_000001_0 on tt2", 
+        "attempt_test_0002_m_000002_0 on tt2", 
+        "attempt_test_0002_r_000001_0 on tt2"});
   }
   
   public void testGetJobs() throws Exception {
@@ -1313,19 +1238,25 @@ public class TestCapacityScheduler exten
     submitJobAndInit(JobStatus.PREP, 10, 0, "q2", "u1");
     
     // job from q2 runs first because it has some non-zero capacity.
-    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkAssignments("tt1", 
+        new String[] {"attempt_test_0002_m_000001_0 on tt1", 
+        "attempt_test_0002_m_000002_0 on tt1"});
     verifyCapacity("0", "default");
     verifyCapacity("3", "q2");
     
     // add another tt to increase tt slots
     taskTrackerManager.addTaskTracker("tt3");
-    checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
+    checkAssignments("tt2", 
+        new String[] {"attempt_test_0002_m_000003_0 on tt2", 
+        "attempt_test_0002_m_000004_0 on tt2"});
     verifyCapacity("0", "default");
     verifyCapacity("5", "q2");
     
     // add another tt to increase tt slots
     taskTrackerManager.addTaskTracker("tt4");
-    checkAssignment("tt3", "attempt_test_0002_m_000003_0 on tt3");
+    checkAssignments("tt3", 
+        new String[] {"attempt_test_0002_m_000005_0 on tt3", 
+        "attempt_test_0002_m_000006_0 on tt3"});
     verifyCapacity("0", "default");
     verifyCapacity("7", "q2");
     
@@ -1333,7 +1264,9 @@ public class TestCapacityScheduler exten
     taskTrackerManager.addTaskTracker("tt5");
     // now job from default should run, as it is furthest away
     // in terms of runningMaps / capacity.
-    checkAssignment("tt4", "attempt_test_0001_m_000001_0 on tt4");
+    checkAssignments("tt4", 
+        new String[] {"attempt_test_0001_m_000001_0 on tt4", 
+        "attempt_test_0002_m_000007_0 on tt4"});
     verifyCapacity("1", "default");
     verifyCapacity("9", "q2");
   }
@@ -1342,7 +1275,7 @@ public class TestCapacityScheduler exten
                                           String queue) throws IOException {
     String schedInfo = taskTrackerManager.getQueueManager().
                           getSchedulerInfo(queue).toString();    
-    assertTrue(schedInfo.contains("Map tasks\nCapacity: " 
+    assertTrue(schedInfo, schedInfo.contains("Map tasks\nCapacity: " 
         + expectedCapacity + " slots"));
   }
   
@@ -1361,15 +1294,18 @@ public class TestCapacityScheduler exten
     // submit a job  
     submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
     // for queue 'q2', the capacity for maps is 2. Since we're the only user,
-    // we should get a task 
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    // I should get another map task. 
-    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    // we should get 2 task 
+    checkAssignments("tt1", 
+        new String[] {"attempt_test_0001_m_000001_0 on tt1", 
+        "attempt_test_0001_m_000002_0 on tt1", 
+        "attempt_test_0001_r_000001_0 on tt1"});
+
     // Now we're at full capacity for maps. If I ask for another map task,
     // I should get a map task from the default queue's capacity. 
-    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
-    // and another
-    checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
+    checkAssignments("tt2", 
+        new String[] {"attempt_test_0001_m_000003_0 on tt2", 
+        "attempt_test_0001_m_000004_0 on tt2", 
+        "attempt_test_0001_r_000002_0 on tt2"});
   }
 
   /**
@@ -1382,7 +1318,10 @@ public class TestCapacityScheduler exten
   public void testHighMemoryBlockingWithMaxCapacity()
       throws IOException {
 
-    taskTrackerManager = new FakeTaskTrackerManager(2, 2, 2);
+    final int NUM_MAP_SLOTS = 2;
+    final int NUM_REDUCE_SLOTS = 2;
+    taskTrackerManager = 
+      new FakeTaskTrackerManager(2, NUM_MAP_SLOTS, NUM_REDUCE_SLOTS);
 
     taskTrackerManager.addQueues(new String[] { "defaultXYZ" });
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
@@ -1405,7 +1344,6 @@ public class TestCapacityScheduler exten
         JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
-    scheduler.setAssignMultipleTasks(true);
 
     JobConf jConf = new JobConf(conf);
     jConf.setMemoryForMapTask(2 * 1024);
@@ -1433,7 +1371,8 @@ public class TestCapacityScheduler exten
 
     checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1, 2, 200.0f,1,0);
     checkOccupiedSlots("defaultXYZ", TaskType.REDUCE, 1, 1, 100.0f,0,2);
-    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L, 
+                                 NUM_MAP_SLOTS-2, NUM_REDUCE_SLOTS-1);
 
     //we have reached the maximum limit for map, so no more map tasks.
     //we have used 1 reduce already and 1 more reduce slot is left for the
@@ -1457,7 +1396,8 @@ public class TestCapacityScheduler exten
 
     checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1, 2, 200.0f,1,0);
     checkOccupiedSlots("defaultXYZ", TaskType.REDUCE, 1, 2, 200.0f,0,2);
-    checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 2 * 1024L);
+    checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 2 * 1024L, 
+                                 NUM_MAP_SLOTS-2, NUM_REDUCE_SLOTS-2);
 
     //complete the high ram job on tt1.
     for (Task task : t2) {
@@ -1485,7 +1425,6 @@ public class TestCapacityScheduler exten
     resConf.setFakeQueues(queues);
     resConf.setMaxCapacity("default", 75.0f);
     scheduler.setResourceManagerConf(resConf);
-    scheduler.setAssignMultipleTasks(true);
     scheduler.start();
 
     // submit a job
@@ -1499,22 +1438,22 @@ public class TestCapacityScheduler exten
     //  1 map and 1 reduce.
     // after max capacity it is 1.5 each.
 
-    //first job would be given 1 job each.
-    List<Task> t1 = this.checkMultipleAssignment(
-      "tt1", "attempt_test_0001_m_000001_0 on tt1",
-      "attempt_test_0001_r_000001_0 on tt1");
-
-    //for user u1 we have reached the limit. that is 1 job.
-    //1 more map and reduce tasks.
-    List<Task> t2 = this.checkMultipleAssignment(
-      "tt1", "attempt_test_0002_m_000001_0 on tt1",
-      "attempt_test_0002_r_000001_0 on tt1");
-
-    t1 = this.checkMultipleAssignment(
-      "tt2", "attempt_test_0001_m_000002_0 on tt2",
-      "attempt_test_0001_r_000002_0 on tt2");
-
-    t1 = this.checkMultipleAssignment("tt2", null,null);
+    //each job would be given 1 map task each.
+    checkAssignments("tt1", 
+      new String[] {"attempt_test_0001_m_000001_0 on tt1",
+        "attempt_test_0002_m_000001_0 on tt1", 
+        "attempt_test_0001_r_000001_0 on tt1"});
+
+    //abt to hit max map capacity for default
+    //hit user limit for reduces
+    checkAssignments("tt2", 
+        new String[] {"attempt_test_0001_m_000002_0 on tt2",
+          "attempt_test_0002_r_000001_0 on tt2"});
+
+    // only 1 reduce slot is remaining on tt2
+    // no more maps since no map slots are available 
+    checkAssignments("tt2", 
+        new String[] { "attempt_test_0001_r_000002_0 on tt2"});
   }
 
 
@@ -1533,17 +1472,19 @@ public class TestCapacityScheduler exten
     // submit a job  
     submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
     // for queue 'q2', the capacity for maps is 2. Since we're the only user,
-    // we should get a task 
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    // we should get 2 maps and 1 reduce 
+    checkAssignments("tt1", 
+        new String[]{"attempt_test_0001_m_000001_0 on tt1", 
+        "attempt_test_0001_m_000002_0 on tt1", 
+        "attempt_test_0001_r_000001_0 on tt1"});
+    
     // Submit another job, from a different user
     submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
-    // Now if I ask for a map task, it should come from the second job 
-    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-    // Now we're at full capacity for maps. If I ask for another map task,
-    // I should get a map task from the default queue's capacity. 
-    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
-    // and another
-    checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
+    // Now if I ask for a map task, it should come from the second job from default queue's capacity 
+    checkAssignments("tt2", 
+        new String[]{"attempt_test_0002_m_000001_0 on tt2", 
+        "attempt_test_0002_m_000002_0 on tt2", 
+        "attempt_test_0002_r_000001_0 on tt2"});
   }
 
   // test user limits when a 2nd job is submitted much after first job 
@@ -1561,16 +1502,19 @@ public class TestCapacityScheduler exten
     // submit a job  
     submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
     // for queue 'q2', the capacity for maps is 2. Since we're the only user,
-    // we should get a task 
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    // since we're the only job, we get another map
-    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    // we should get 2 map tasks & 1 reduce 
+    checkAssignments("tt1", 
+        new String[] {"attempt_test_0001_m_000001_0 on tt1", 
+        "attempt_test_0001_m_000002_0 on tt1", 
+        "attempt_test_0001_r_000001_0 on tt1"});
+    
     // Submit another job, from a different user
     submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
     // Now if I ask for a map task, it should come from the second job 
-    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
-    // and another
-    checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
+    checkAssignments("tt2", 
+        new String[] {"attempt_test_0002_m_000001_0 on tt2", 
+        "attempt_test_0002_m_000002_0 on tt2", 
+        "attempt_test_0002_r_000001_0 on tt2"});
   }
 
   // test user limits when a 2nd job is submitted much after first job 
@@ -1590,25 +1534,38 @@ public class TestCapacityScheduler exten
     FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
     // for queue 'q2', the capacity for maps is 2. Since we're the only user,
     // we should get a task 
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    // since we're the only job, we get another map
-    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignments("tt1", 
+        new String[] {"attempt_test_0001_m_000001_0 on tt1", 
+        "attempt_test_0001_m_000002_0 on tt1", 
+        "attempt_test_0001_r_000001_0 on tt1"});
+
     // we get two more maps from 'default queue'
-    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
+    checkAssignments("tt2", 
+        new String[] {"attempt_test_0001_m_000003_0 on tt2", 
+        "attempt_test_0001_m_000004_0 on tt2", 
+        "attempt_test_0001_r_000002_0 on tt2"});
+
     // Submit another job, from a different user
     FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
+    
     // one of the task finishes
     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
+    
     // Now if I ask for a map task, it should come from the second job 
-    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-    // another task from job1 finishes, another new task to job2
+    // and reduce from job2
+    checkAssignments("tt1", 
+        new String[] {"attempt_test_0002_m_000001_0 on tt1"});
+    
+    // another task from job1 finishes, another new map and reduce task to job2
     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0", j1);
-    checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
+    
+    // job2 shud get the map slot & reduce
+    checkAssignments("tt1", new String[] {"attempt_test_0002_m_000002_0 on tt1"});
+    
     // now we have equal number of tasks from each job. Whichever job's
     // task finishes, that job gets a new task
     taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000003_0", j1);
-    checkAssignment("tt2", "attempt_test_0001_m_000005_0 on tt2");
+    checkAssignments("tt2", new String[] {"attempt_test_0001_m_000005_0 on tt2"});
     taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", j2);
     checkAssignment("tt1", "attempt_test_0002_m_000003_0 on tt1");
   }
@@ -1630,23 +1587,44 @@ public class TestCapacityScheduler exten
 
     // u1 submits job
     FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
-    // it gets the first 5 slots
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
-    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
-    checkAssignment("tt3", "attempt_test_0001_m_000005_0 on tt3");
+    // it gets the first 6 slots
+    checkAssignments("tt1", 
+        new String[] {
+        "attempt_test_0001_m_000001_0 on tt1",
+        "attempt_test_0001_m_000002_0 on tt1",
+        "attempt_test_0001_r_000001_0 on tt1"
+    });
+    checkAssignments("tt2", 
+        new String[] {
+        "attempt_test_0001_m_000003_0 on tt2",
+        "attempt_test_0001_m_000004_0 on tt2",
+        "attempt_test_0001_r_000002_0 on tt2"
+    });
+    checkAssignments("tt3", 
+        new String[] {
+        "attempt_test_0001_m_000005_0 on tt3",
+        "attempt_test_0001_m_000006_0 on tt3",
+        "attempt_test_0001_r_000003_0 on tt3"
+    });
+    
     // u2 submits job with 4 slots
     FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 4, 4, null, "u2");
     // u2 should get next 4 slots
-    checkAssignment("tt3", "attempt_test_0002_m_000001_0 on tt3");
-    checkAssignment("tt4", "attempt_test_0002_m_000002_0 on tt4");
-    checkAssignment("tt4", "attempt_test_0002_m_000003_0 on tt4");
-    checkAssignment("tt5", "attempt_test_0002_m_000004_0 on tt5");
-    // last slot should go to u1, since u2 has no more tasks
-    checkAssignment("tt5", "attempt_test_0001_m_000006_0 on tt5");
+    checkAssignments("tt4", 
+        new String[] {
+        "attempt_test_0002_m_000001_0 on tt4",
+        "attempt_test_0002_m_000002_0 on tt4",
+        "attempt_test_0002_r_000001_0 on tt4"
+    });
+    checkAssignments("tt5", 
+        new String[] {
+        "attempt_test_0002_m_000003_0 on tt5",
+        "attempt_test_0002_m_000004_0 on tt5",
+        "attempt_test_0002_r_000002_0 on tt5"
+    });
+
     // u1 finishes a task
-    taskTrackerManager.finishTask("tt5", "attempt_test_0001_m_000006_0", j1);
+    taskTrackerManager.finishTask("tt3", "attempt_test_0001_m_000006_0", j1);
     // u1 submits a few more jobs 
     // All the jobs are inited when submitted
     // because of addition of Eager Job Initializer all jobs in this
@@ -1660,10 +1638,10 @@ public class TestCapacityScheduler exten
     submitJobAndInit(JobStatus.PREP, 2, 2, null, "u3");
     // next slot should go to u3, even though u2 has an earlier job, since
     // user limits have changed and u1/u2 are over limits
-    checkAssignment("tt5", "attempt_test_0007_m_000001_0 on tt5");
+    checkAssignment("tt3", "attempt_test_0007_m_000001_0 on tt3");
     // some other task finishes and u3 gets it
-    taskTrackerManager.finishTask("tt5", "attempt_test_0002_m_000004_0", j1);
-    checkAssignment("tt5", "attempt_test_0007_m_000002_0 on tt5");
+    taskTrackerManager.finishTask("tt3", "attempt_test_0001_m_000005_0", j1);
+    checkAssignment("tt3", "attempt_test_0007_m_000002_0 on tt3");
     // now, u2 finishes a task
     taskTrackerManager.finishTask("tt4", "attempt_test_0002_m_000002_0", j1);
     // next slot will go to u1, since u3 has nothing to run and u1's job is 
@@ -1679,6 +1657,7 @@ public class TestCapacityScheduler exten
    */
   public void testUserLimitsForHighMemoryJobs()
       throws IOException {
+    System.err.println("testUserLimitsForHighMemoryJobs");
     taskTrackerManager = new FakeTaskTrackerManager(1, 10, 10);
     scheduler.setTaskTrackerManager(taskTrackerManager);
     String[] qs = { "default" };
@@ -1720,30 +1699,25 @@ public class TestCapacityScheduler exten
     jConf.setUser("u2");
     FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
 
-    // Verify that normal job takes 3 task assignments to hit user limits
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000002_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000003_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000004_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_m_000005_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000005_0 on tt1");
-    // u1 has 5 map slots and 5 reduce slots. u2 has none. So u1's user limits
-    // are hit. So u2 should get slots
-
-    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
+    // Verify that normal job takes 3 task assignments to hit user limits,
+    // and then j2 gets 4 slots
+    checkAssignments("tt1", 
+        new String[] {
+        "attempt_test_0001_m_000001_0 on tt1",
+        "attempt_test_0001_m_000002_0 on tt1",
+        "attempt_test_0001_m_000003_0 on tt1",
+        "attempt_test_0001_m_000004_0 on tt1",
+        "attempt_test_0001_m_000005_0 on tt1",
+        "attempt_test_0002_m_000001_0 on tt1",
+        "attempt_test_0002_m_000002_0 on tt1",
+        "attempt_test_0001_r_000001_0 on tt1",
+    });
 
     // u1 has 5 map slots and 5 reduce slots. u2 has 4 map slots and 4 reduce
     // slots. Because of high memory tasks, giving u2 another task would
     // overflow limits. So, no more tasks should be given to anyone.
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-    assertNull(scheduler.assignTasks(tracker("tt1")));
+//    assertEquals(0, scheduler.assignTasks(tracker("tt1")).size());
+//    assertEquals(0, scheduler.assignTasks(tracker("tt1")).size());
   }
 
   /*
@@ -1781,6 +1755,7 @@ public class TestCapacityScheduler exten
    */
 
   public void testSchedulingInformation() throws Exception {
+    System.err.println("testSchedulingInformation()");
     String[] qs = {"default", "q2"};
     taskTrackerManager = new FakeTaskTrackerManager(2, 1, 1);
     scheduler.setTaskTrackerManager(taskTrackerManager);
@@ -1861,7 +1836,11 @@ public class TestCapacityScheduler exten
     raiseStatusChangeEvents(scheduler.jobQueuesManager);
     raiseStatusChangeEvents(scheduler.jobQueuesManager, "q2");
     //assign one job
-    Task t1 = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignments("tt1", 
+        new String[] {
+        "attempt_test_0001_m_000001_0 on tt1", 
+        "attempt_test_0001_r_000001_0 on tt1"});
+    
     //Initalize extra job.
     controlledInitializationPoller.selectJobsToInitialize();
 
@@ -1872,37 +1851,20 @@ public class TestCapacityScheduler exten
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 20);
-    assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
-    assertEquals(infoStrings[8], "Running tasks: 1");
-    assertEquals(infoStrings[9], "Active users:");
-    assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
-    assertEquals(infoStrings[14], "Used capacity: 0 (0.0% of Capacity)");
-    assertEquals(infoStrings[15], "Running tasks: 0");
-    assertEquals(infoStrings[18], "Number of Waiting Jobs: 4");
-
-    //assign a reduce task
-    Task t2 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
-    // make sure we update our stats
-    scheduler.updateQSIInfoForTests();
-    schedulingInfo =
-      queueManager.getJobQueueInfo("default").getSchedulingInfo();
-    infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 22);
-    assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
-    assertEquals(infoStrings[8], "Running tasks: 1");
-    assertEquals(infoStrings[9], "Active users:");
-    assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
-    assertEquals(infoStrings[14], "Used capacity: 1 (100.0% of Capacity)");
-    assertEquals(infoStrings[15], "Running tasks: 1");
-    assertEquals(infoStrings[16], "Active users:");
-    assertEquals(infoStrings[17], "User 'u1': 1 (100.0% of used capacity)");
-    assertEquals(infoStrings[20], "Number of Waiting Jobs: 4");
-
+    System.err.println(schedulingInfo);
+    assertEquals(schedulingInfo, 22, infoStrings.length);
+    assertEquals(infoStrings[7], infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
+    assertEquals(infoStrings[8], infoStrings[8], "Running tasks: 1");
+    assertEquals(infoStrings[9], infoStrings[9], "Active users:");
+    assertEquals(infoStrings[10], infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
+    assertEquals(infoStrings[14], infoStrings[14], "Used capacity: 1 (100.0% of Capacity)");
+    assertEquals(infoStrings[15], infoStrings[15], "Running tasks: 1");
+    assertEquals(infoStrings[18], infoStrings[20], "Number of Waiting Jobs: 4");
+    
     //Complete the job and check the running tasks count
     FakeJobInProgress u1j1 = userJobs.get(0);
-    taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), u1j1);
-    taskTrackerManager.finishTask("tt1", t2.getTaskID().toString(), u1j1);
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", u1j1);
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000001_0", u1j1);
     taskTrackerManager.finalizeJob(u1j1);
 
     // make sure we update our stats
@@ -1911,11 +1873,11 @@ public class TestCapacityScheduler exten
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
     assertEquals(infoStrings.length, 18);
-    assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
-    assertEquals(infoStrings[8], "Running tasks: 0");
-    assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
-    assertEquals(infoStrings[13], "Running tasks: 0");
-    assertEquals(infoStrings[16], "Number of Waiting Jobs: 4");
+    assertEquals(infoStrings[7], infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[8], infoStrings[8], "Running tasks: 0");
+    assertEquals(infoStrings[12], infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
+    assertEquals(infoStrings[13], infoStrings[13], "Running tasks: 0");
+    assertEquals(infoStrings[16], infoStrings[16], "Number of Waiting Jobs: 4");
 
     //Fail a job which is initialized but not scheduled and check the count.
     FakeJobInProgress u1j2 = userJobs.get(1);
@@ -1968,7 +1930,10 @@ public class TestCapacityScheduler exten
 
     //Now schedule a map should be job3 of the user as job1 succeeded job2
     //failed and now job3 is running
-    t1 = checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
+    checkAssignments("tt1", 
+        new String[] {
+        "attempt_test_0003_m_000001_0 on tt1",
+        "attempt_test_0003_r_000001_0 on tt1"});
     FakeJobInProgress u1j3 = userJobs.get(2);
     assertTrue("User Job 3 not running ",
         u1j3.getStatus().getRunState() == JobStatus.RUNNING);
@@ -1981,12 +1946,14 @@ public class TestCapacityScheduler exten
     schedulingInfo =
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
     infoStrings = schedulingInfo.split("\n");
-    assertEquals(infoStrings.length, 20);
-    assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
-    assertEquals(infoStrings[8], "Running tasks: 1");
-    assertEquals(infoStrings[9], "Active users:");
-    assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
-    assertEquals(infoStrings[18], "Number of Waiting Jobs: 1");
+    assertEquals(infoStrings.length, 22);
+    assertEquals(infoStrings[7], infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
+    assertEquals(infoStrings[8], infoStrings[8], "Running tasks: 1");
+    assertEquals(infoStrings[9], infoStrings[9], "Active users:");
+    assertEquals(infoStrings[10], infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
+    assertEquals(infoStrings[14], infoStrings[14], "Used capacity: 1 (100.0% of Capacity)");
+    assertEquals(infoStrings[15], infoStrings[15], "Running tasks: 1");
+    assertEquals(infoStrings[18], infoStrings[20], "Number of Waiting Jobs: 1");
 
     //Fail the executing job
     taskTrackerManager.finalizeJob(u1j3, JobStatus.FAILED);
@@ -2036,8 +2003,9 @@ public class TestCapacityScheduler exten
     // assert that all tasks are launched even though they transgress the
     // scheduling limits.
 
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+    checkAssignments("tt1", 
+        new String[] {"attempt_test_0001_m_000001_0 on tt1", 
+        "attempt_test_0001_r_000001_0 on tt1"});
   }
 
   /**
@@ -2050,7 +2018,10 @@ public class TestCapacityScheduler exten
       throws IOException {
 
     // 2 map and 1 reduce slots
-    taskTrackerManager = new FakeTaskTrackerManager(1, 2, 1);
+    final int NUM_MAP_SLOTS = 2;
+    final int NUM_REDUCE_SLOTS = 1;
+    taskTrackerManager = 
+      new FakeTaskTrackerManager(1, NUM_MAP_SLOTS, NUM_REDUCE_SLOTS);
 
     taskTrackerManager.addQueues(new String[] { "default" });
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
@@ -2101,18 +2072,15 @@ public class TestCapacityScheduler exten
     FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
     
     // first, a map from j1 will run
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    // Total 2 map slots should be accounted for.
+    checkAssignments("tt1", 
+        new String[] {"attempt_test_0001_m_000001_0 on tt1",
+        "attempt_test_0002_r_000001_0 on tt1"});
+    // Total 2 maps & 1 reduce slot should be accounted for.
     checkOccupiedSlots("default", TaskType.MAP, 1, 2, 100.0f);
-    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
-
-    // at this point, the scheduler tries to schedule another map from j1. 
-    // there isn't enough space. The second job's reduce should be scheduled.
-    checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
-    // Total 1 reduce slot should be accounted for.
     checkOccupiedSlots("default", TaskType.REDUCE, 1, 1,
         100.0f);
-    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L,
+                                 NUM_MAP_SLOTS-2, NUM_REDUCE_SLOTS-1);
   }
 
   /**
@@ -2133,7 +2101,10 @@ public class TestCapacityScheduler exten
       throws IOException {
 
     LOG.debug("Starting the scheduler.");
-    taskTrackerManager = new FakeTaskTrackerManager(3, 2, 2);
+    final int NUM_MAP_SLOTS = 2;
+    final int NUM_REDUCE_SLOTS = 2;
+    taskTrackerManager = 
+      new FakeTaskTrackerManager(3, NUM_MAP_SLOTS, NUM_REDUCE_SLOTS);
 
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
@@ -2167,25 +2138,22 @@ public class TestCapacityScheduler exten
     FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
 
     // Fill a tt with this job's tasks.
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    // Total 1 map slot should be accounted for.
-    checkOccupiedSlots("default", TaskType.MAP, 1, 1, 16.7f);
-    assertEquals(
-        CapacityTaskScheduler.getJobQueueSchedInfo(1, 1, 0, 0, 0, 0),
-        (String) job1.getSchedulingInfo());
-    checkMemReservedForTasksOnTT("tt1", 1 * 1024L, 0L);
-
-    // same for reduces.
-    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
-    checkOccupiedSlots("default", TaskType.REDUCE, 1, 1, 16.7f);
+    checkAssignments("tt1", 
+        new String[] {
+        "attempt_test_0001_m_000001_0 on tt1", 
+        "attempt_test_0001_m_000002_0 on tt1", 
+        "attempt_test_0001_r_000001_0 on tt1"});
+    // Total 2 map slot and 1 reduce should be accounted for.
+    checkOccupiedSlots("default", TaskType.MAP, 1, 2, 33.33f);
     assertEquals(
-        CapacityTaskScheduler.getJobQueueSchedInfo(1, 1, 0, 1, 1, 0),
+        CapacityTaskScheduler.getJobQueueSchedInfo(2, 2, 0, 1, 1, 0),
         (String) job1.getSchedulingInfo());
-    checkMemReservedForTasksOnTT("tt1", 1 * 1024L, 1 * 1024L);
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L, 
+                                 NUM_MAP_SLOTS-2, NUM_REDUCE_SLOTS-1);
 
     // fill another TT with the rest of the tasks of the job
-    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
+    checkAssignments("tt2", 
+        new String[] {"attempt_test_0001_r_000002_0 on tt2"});
 
     LOG.debug("Submit one high memory(2GB maps/reduces) job of "
         + "2 map, 2 reduce tasks.");
@@ -2200,19 +2168,19 @@ public class TestCapacityScheduler exten
 
     // Have another TT run one task of each type of the high RAM
     // job. This will fill up the TT. 
-    checkAssignment("tt3", "attempt_test_0002_m_000001_0 on tt3");
+    checkAssignments("tt3", 
+        new String[] {
+        "attempt_test_0002_m_000001_0 on tt3",
+        "attempt_test_0002_r_000001_0 on tt3"
+    });
+    
     checkOccupiedSlots("default", TaskType.MAP, 1, 4, 66.7f);
     assertEquals(
-        CapacityTaskScheduler.getJobQueueSchedInfo(1, 2, 0, 0, 0, 0),
-        (String) job2.getSchedulingInfo());
-    checkMemReservedForTasksOnTT("tt3", 2 * 1024L, 0L);
-
-    checkAssignment("tt3", "attempt_test_0002_r_000001_0 on tt3");
-    checkOccupiedSlots("default", TaskType.REDUCE, 1, 4, 66.7f);
-    assertEquals(
         CapacityTaskScheduler.getJobQueueSchedInfo(1, 2, 0, 1, 2, 0),
         (String) job2.getSchedulingInfo());
-    checkMemReservedForTasksOnTT("tt3", 2 * 1024L, 2 * 1024L);
+    checkMemReservedForTasksOnTT("tt3", 2 * 1024L, 2 * 1024L, 
+                                 NUM_MAP_SLOTS-2, NUM_REDUCE_SLOTS-2);
+
 
     LOG.debug("Submit one normal memory(1GB maps/reduces) job of "
         + "1 map, 1 reduce tasks.");
@@ -2227,23 +2195,30 @@ public class TestCapacityScheduler exten
 
     // Send a TT with insufficient space for task assignment,
     // This will cause a reservation for the high RAM job.
-    assertNull(scheduler.assignTasks(tracker("tt1")));
+    checkAssignments("tt2", 
+        new String[] {
+        "attempt_test_0002_m_000002_0 on tt2"
+    });
 
     // reserved tasktrackers contribute to occupied slots for maps and reduces
     checkOccupiedSlots("default", TaskType.MAP, 1, 6, 100.0f);
     checkOccupiedSlots("default", TaskType.REDUCE, 1, 6, 100.0f);
-    checkMemReservedForTasksOnTT("tt1", 1 * 1024L, 1 * 1024L);
+    checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 1 * 1024L, 
+                                 0, NUM_REDUCE_SLOTS-1);
     LOG.info(job2.getSchedulingInfo());
     assertEquals(
-        CapacityTaskScheduler.getJobQueueSchedInfo(1, 2, 2, 1, 2, 2),
+        CapacityTaskScheduler.getJobQueueSchedInfo(2, 4, 0, 1, 2, 2),
         (String) job2.getSchedulingInfo());
     assertEquals(
         CapacityTaskScheduler.getJobQueueSchedInfo(0, 0, 0, 0, 0, 0),
         (String) job3.getSchedulingInfo());
     
     // Reservations are already done for job2. So job3 should go ahead.
-    checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
-    checkAssignment("tt2", "attempt_test_0003_r_000001_0 on tt2");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", job1);
+    checkAssignments("tt1", 
+        new String[] {
+        "attempt_test_0003_m_000001_0 on tt1", 
+        "attempt_test_0003_r_000001_0 on tt1"});
   }
 
   /**
@@ -2256,7 +2231,10 @@ public class TestCapacityScheduler exten
   public void testMemoryMatchingWithRetiredJobs() throws IOException {
     // create a cluster with a single node.
     LOG.debug("Starting cluster with 1 tasktracker, 2 map and 2 reduce slots");
-    taskTrackerManager = new FakeTaskTrackerManager(1, 2, 2);
+    final int NUM_MAP_SLOTS = 2;
+    final int NUM_REDUCE_SLOTS = 2;
+    taskTrackerManager = 
+      new FakeTaskTrackerManager(1, NUM_MAP_SLOTS, NUM_REDUCE_SLOTS);
 
     // create scheduler
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
@@ -2280,35 +2258,33 @@ public class TestCapacityScheduler exten
     scheduler.start();
     
     // submit a normal job
-    LOG.debug("Submitting a normal job with 2 maps and 2 reduces");
+    LOG.debug("Submitting a normal job with 1 maps and 1 reduces");
     JobConf jConf = new JobConf();
-    jConf.setNumMapTasks(2);
-    jConf.setNumReduceTasks(2);
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(1);
     jConf.setMemoryForMapTask(512);
     jConf.setMemoryForReduceTask(512);
     jConf.setQueueName("default");
     jConf.setUser("u1");
+    jConf.setSpeculativeExecution(false);
     FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
 
-    // 1st cycle - 1 map gets assigned.
-    Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    // Total 1 map slot should be accounted for.
+    // 1st cycle - 1 map & 1 reduce gets assigned.
+    checkAssignments("tt1", 
+        new String[] {
+        "attempt_test_0001_m_000001_0 on tt1",
+        "attempt_test_0001_r_000001_0 on tt1"});
+    // Total 1 map & 1 reduce slot should be accounted for.
     checkOccupiedSlots("default", TaskType.MAP, 1, 1, 50.0f);
-    checkMemReservedForTasksOnTT("tt1",  512L, 0L);
-
-    // 1st cycle of reduces - 1 reduce gets assigned.
-    Task t1 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
-    // Total 1 reduce slot should be accounted for.
-    checkOccupiedSlots("default", TaskType.REDUCE, 1, 1,
-        50.0f);
-    checkMemReservedForTasksOnTT("tt1",  512L, 512L);
+    checkOccupiedSlots("default", TaskType.REDUCE, 1, 1, 50.0f);
+    checkMemReservedForTasksOnTT("tt1",  512L, 512L, 
+                                 NUM_MAP_SLOTS-1, NUM_REDUCE_SLOTS-1);
     
     // kill this job !
     taskTrackerManager.killJob(job1.getJobID());
     // No more map/reduce slots should be accounted for.
     checkOccupiedSlots("default", TaskType.MAP, 0, 0, 0.0f);
-    checkOccupiedSlots("default", TaskType.REDUCE, 0, 0,
-        0.0f);
+    checkOccupiedSlots("default", TaskType.REDUCE, 0, 0, 0.0f);
     
     // retire the job
     taskTrackerManager.removeJob(job1.getJobID());
@@ -2326,32 +2302,32 @@ public class TestCapacityScheduler exten
     
     // since with HADOOP-5964, we don't rely on a job conf to get
     // the memory occupied, scheduling should be able to work correctly.
-    t1 = checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkAssignments("tt1", 
+        new String[] {
+        "attempt_test_0002_m_000001_0 on tt1",
+        "attempt_test_0002_r_000001_0 on tt1"
+    });
     checkOccupiedSlots("default", TaskType.MAP, 1, 1, 50);
-    checkMemReservedForTasksOnTT("tt1", 1024L, 512L);
-
-    // assign a reduce now.
-    t1 = checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
     checkOccupiedSlots("default", TaskType.REDUCE, 1, 1, 50);
-    checkMemReservedForTasksOnTT("tt1", 1024L, 1024L);
+    checkMemReservedForTasksOnTT("tt1", 1024L, 1024L, 
+                                 NUM_MAP_SLOTS-2, NUM_REDUCE_SLOTS-2);
     
     // now, no more can be assigned because all the slots are blocked.
-    assertNull(scheduler.assignTasks(tracker("tt1")));
+    assertEquals(0, scheduler.assignTasks(tracker("tt1")).size());
 
     // finish the tasks on the tracker.
-    taskTrackerManager.finishTask("tt1", t.getTaskID().toString(), job1);
-    taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), job1);
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", job1);
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000001_0", job1);
     
-    // now a new task can be assigned.
-    t = checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
+    // now new tasks can be assigned.
+    checkAssignments("tt1", 
+        new String[] {
+        "attempt_test_0002_m_000002_0 on tt1",
+        "attempt_test_0002_r_000002_0 on tt1"});
     checkOccupiedSlots("default", TaskType.MAP, 1, 2, 100.0f);
-    // memory used will change because of the finished task above.
-    checkMemReservedForTasksOnTT("tt1", 1024L, 512L);
-    
-    // reduce can be assigned.
-    t = checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
     checkOccupiedSlots("default", TaskType.REDUCE, 1, 2, 100.0f);
-    checkMemReservedForTasksOnTT("tt1", 1024L, 1024L);
+    checkMemReservedForTasksOnTT("tt1", 1024L, 1024L, 
+                                 NUM_MAP_SLOTS-2, NUM_REDUCE_SLOTS-2);
   }
 
   /*
@@ -2433,14 +2409,20 @@ public class TestCapacityScheduler exten
     raiseStatusChangeEvents(mgr);
     
     // get some tasks assigned.
-    Task t1 = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    Task t2 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
-    Task t3 = checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
-    Task t4 = checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
-    taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), u1Jobs.get(0));
-    taskTrackerManager.finishTask("tt1", t2.getTaskID().toString(), u1Jobs.get(0));
-    taskTrackerManager.finishTask("tt2", t3.getTaskID().toString(), u1Jobs.get(1));
-    taskTrackerManager.finishTask("tt2", t4.getTaskID().toString(), u1Jobs.get(1));
+    checkAssignments("tt1", 
+        new String[] {
+        "attempt_test_0001_m_000001_0 on tt1",
+        "attempt_test_0001_r_000001_0 on tt1"
+    });
+    checkAssignments("tt2", 
+        new String[] {
+        "attempt_test_0002_m_000001_0 on tt2",
+        "attempt_test_0002_r_000001_0 on tt2"
+    });
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", u1Jobs.get(0));
+    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", u1Jobs.get(0));
+    taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000001_0", u1Jobs.get(1));
+    taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000001_0", u1Jobs.get(1));
 
     // as some jobs have running tasks, the poller will now
     // pick up new jobs to initialize.
@@ -2461,10 +2443,13 @@ public class TestCapacityScheduler exten
         initializedJobs.contains(u1Jobs.get(1).getJobID()));
     
     // finish one more job
-    t1 = checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
-    t2 = checkAssignment("tt1", "attempt_test_0003_r_000001_0 on tt1");
-    taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), u1Jobs.get(2));
-    taskTrackerManager.finishTask("tt1", t2.getTaskID().toString(), u1Jobs.get(2));
+    checkAssignments("tt1", 
+        new String[] {
+        "attempt_test_0003_m_000001_0 on tt1",
+        "attempt_test_0003_r_000001_0 on tt1"
+    });
+    taskTrackerManager.finishTask("tt1", "attempt_test_0003_m_000001_0", u1Jobs.get(2));
+    taskTrackerManager.finishTask("tt1", "attempt_test_0003_r_000001_0", u1Jobs.get(2));
 
     // no new jobs should be picked up, because max user limit
     // is still 3.
@@ -2473,10 +2458,11 @@ public class TestCapacityScheduler exten
     assertEquals(initializedJobs.size(), 5);
     
     // run 1 more jobs.. 
-    t1 = checkAssignment("tt1", "attempt_test_0004_m_000001_0 on tt1");
-    t1 = checkAssignment("tt1", "attempt_test_0004_r_000001_0 on tt1");
-    taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), u1Jobs.get(3));
-    taskTrackerManager.finishTask("tt1", t2.getTaskID().toString(), u1Jobs.get(3));
+    checkAssignments("tt1", 
+        new String[] {
+        "attempt_test_0004_m_000001_0 on tt1",
+        "attempt_test_0004_r_000001_0 on tt1"
+    });
     
     // Now initialised jobs should contain user 4's job, as
     // user 1's jobs are all done and the number of users is
@@ -2578,8 +2564,9 @@ public class TestCapacityScheduler exten
       submitJob(JobStatus.PREP, 1, 1, "q1", "u1");
     controlledInitializationPoller.selectJobsToInitialize();
     raiseStatusChangeEvents(scheduler.jobQueuesManager, "q1");
-    Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    t = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+    checkAssignments("tt1", 
+        new String[] {"attempt_test_0001_m_000001_0 on tt1", 
+        "attempt_test_0001_r_000001_0 on tt1"});
   }
   
   public void testFailedJobInitalizations() throws Exception {
@@ -2637,8 +2624,10 @@ public class TestCapacityScheduler exten
         mgr.getRunningJobQueue("default").contains(job));
     
     // assign a task
-    Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    t = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+    checkAssignments("tt1", 
+        new String[] {
+        "attempt_test_0001_m_000001_0 on tt1", 
+        "attempt_test_0001_r_000001_0 on tt1"});
     
     controlledInitializationPoller.selectJobsToInitialize();
     
@@ -2724,14 +2713,18 @@ public class TestCapacityScheduler exten
     controlledInitializationPoller.selectJobsToInitialize();
     raiseStatusChangeEvents(mgr);
 
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignments("tt1", 
+        new String[] {
+        "attempt_test_0001_m_000001_0 on tt1" , 
+        "attempt_test_0001_r_000001_0 on tt1"});
     assertTrue("Pending maps of job1 greater than zero", 
         (fjob1.pendingMaps() == 0));
-    checkAssignment("tt2", "attempt_test_0001_m_000001_1 on tt2");
-    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
-    assertTrue("Pending reduces of job2 greater than zero", 
+    assertTrue("Pending reduces of job1 greater than zero", 
         (fjob1.pendingReduces() == 0));
-    checkAssignment("tt2", "attempt_test_0001_r_000001_1 on tt2");
+    checkAssignments("tt2", 
+        new String[] {
+        "attempt_test_0001_m_000001_1 on tt2", 
+        "attempt_test_0001_r_000001_1 on tt2"});
 
     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", fjob1);
     taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000001_1", fjob1);
@@ -2739,8 +2732,10 @@ public class TestCapacityScheduler exten
     taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000001_1", fjob1);
     taskTrackerManager.finalizeJob(fjob1);
     
-    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-    checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+    checkAssignments("tt1", 
+        new String[] {
+        "attempt_test_0002_m_000001_0 on tt1", 
+        "attempt_test_0002_r_000001_0 on tt1"});
     taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", fjob2);
     taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000001_0", fjob2);
     taskTrackerManager.finalizeJob(fjob2);    
@@ -2775,28 +2770,55 @@ public class TestCapacityScheduler exten
     scheduler.start();
 
     LOG.debug("Submit a regular memory(1GB vmem maps/reduces) job of "
-        + "3 map/red tasks");
+        + "1 map & 0 red tasks");
     JobConf jConf = new JobConf(conf);
     jConf = new JobConf(conf);
     jConf.setMemoryForMapTask(1 * 1024);
     jConf.setMemoryForReduceTask(1 * 1024);
-    jConf.setNumMapTasks(3);
-    jConf.setNumReduceTasks(3);
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(0);
     jConf.setQueueName("default");
     jConf.setUser("u1");
     FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
-
-    // assign one map task of job1 on all the TTs
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
-    checkAssignment("tt3", "attempt_test_0001_m_000003_0 on tt3");
     scheduler.updateQSIInfoForTests();
-
     LOG.info(job1.getSchedulingInfo());
     assertEquals(
-        CapacityTaskScheduler.getJobQueueSchedInfo(3, 3, 0, 0, 0, 0), 
+        CapacityTaskScheduler.getJobQueueSchedInfo(1, 1, 0, 0, 0, 0), 
         (String) job1.getSchedulingInfo());
 
+    jConf = new JobConf(conf);
+    jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(1 * 1024);
+    jConf.setMemoryForReduceTask(1 * 1024);
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(0);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
+    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+    scheduler.updateQSIInfoForTests();
+    LOG.info(job2.getSchedulingInfo());
+    assertEquals(
+        CapacityTaskScheduler.getJobQueueSchedInfo(1, 1, 0, 0, 0, 0), 
+        (String) job2.getSchedulingInfo());
+
+    jConf = new JobConf(conf);
+    jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(1 * 1024);
+    jConf.setMemoryForReduceTask(1 * 1024);
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(0);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    FakeJobInProgress job3 = submitJobAndInit(JobStatus.PREP, jConf);
+    checkAssignment("tt3", "attempt_test_0003_m_000001_0 on tt3");
+    scheduler.updateQSIInfoForTests();
+    LOG.info(job3.getSchedulingInfo());
+    assertEquals(
+        CapacityTaskScheduler.getJobQueueSchedInfo(1, 1, 0, 0, 0, 0), 
+        (String) job3.getSchedulingInfo());
+
     LOG.debug("Submit one high memory(2GB maps, 0MB reduces) job of "
         + "2 map tasks");
     jConf.setMemoryForMapTask(2 * 1024);
@@ -2805,7 +2827,7 @@ public class TestCapacityScheduler exten
     jConf.setNumReduceTasks(0);
     jConf.setQueueName("default");
     jConf.setUser("u1");
-    FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
+    FakeJobInProgress job4= submitJobAndInit(JobStatus.PREP, jConf);
 
     LOG.debug("Submit another regular memory(1GB vmem maps/reduces) job of "
         + "2 map/red tasks");
@@ -2816,50 +2838,56 @@ public class TestCapacityScheduler exten
     jConf.setNumReduceTasks(2);
     jConf.setQueueName("default");
     jConf.setUser("u1");
-    FakeJobInProgress job3 = submitJobAndInit(JobStatus.PREP, jConf);
+    FakeJobInProgress job5 = submitJobAndInit(JobStatus.PREP, jConf);
 
-    // Job2, a high memory job cannot be accommodated on a any TT. But with each
+    // Job4, a high memory job cannot be accommodated on a any TT. But with each
     // trip to the scheduler, each of the TT should be reserved by job2.
-    assertNull(scheduler.assignTasks(tracker("tt1")));
+    assertEquals(0, scheduler.assignTasks(tracker("tt1")).size());
     scheduler.updateQSIInfoForTests();
-    LOG.info(job2.getSchedulingInfo());
+    LOG.info(job4.getSchedulingInfo());
     assertEquals(
         CapacityTaskScheduler.getJobQueueSchedInfo(0, 0, 2, 0, 0, 0), 
-        (String) job2.getSchedulingInfo());
+        (String) job4.getSchedulingInfo());
 
-    assertNull(scheduler.assignTasks(tracker("tt2")));
+    assertEquals(0, scheduler.assignTasks(tracker("tt2")).size());
     scheduler.updateQSIInfoForTests();
-    LOG.info(job2.getSchedulingInfo());
+    LOG.info(job4.getSchedulingInfo());
     assertEquals(
         CapacityTaskScheduler.getJobQueueSchedInfo(0, 0, 4, 0, 0, 0), 
-        (String) job2.getSchedulingInfo());
+        (String) job4.getSchedulingInfo());
 
-    // Job2 has only 2 pending tasks. So no more reservations. Job3 should get
+    // Job4 has only 2 pending tasks. So no more reservations. Job5 should get
     // slots on tt3. tt1 and tt2 should not be assigned any slots with the
     // reservation stats intact.
-    assertNull(scheduler.assignTasks(tracker("tt1")));
+    assertEquals(0, scheduler.assignTasks(tracker("tt1")).size());
     scheduler.updateQSIInfoForTests();
-    LOG.info(job2.getSchedulingInfo());
+    LOG.info(job4.getSchedulingInfo());
     assertEquals(
         CapacityTaskScheduler.getJobQueueSchedInfo(0, 0, 4, 0, 0, 0), 
-        (String) job2.getSchedulingInfo());
+        (String) job4.getSchedulingInfo());
 
-    assertNull(scheduler.assignTasks(tracker("tt2")));
+    assertEquals(0, scheduler.assignTasks(tracker("tt2")).size());
     scheduler.updateQSIInfoForTests();
-    LOG.info(job2.getSchedulingInfo());
+    LOG.info(job4.getSchedulingInfo());
     assertEquals(
         CapacityTaskScheduler.getJobQueueSchedInfo(0, 0, 4, 0, 0, 0), 
-        (String) job2.getSchedulingInfo());
+        (String) job4.getSchedulingInfo());
 
-    checkAssignment("tt3", "attempt_test_0003_m_000001_0 on tt3");
+    checkAssignments("tt3", 
+        new String[] {
+        "attempt_test_0005_m_000001_0 on tt3"});
     scheduler.updateQSIInfoForTests();
-    LOG.info(job2.getSchedulingInfo());
+    LOG.info(job5.getSchedulingInfo());
+    assertEquals(
+        CapacityTaskScheduler.getJobQueueSchedInfo(1, 1, 0, 0, 0, 0), 
+        (String) job5.getSchedulingInfo());
+
     assertEquals(
         CapacityTaskScheduler.getJobQueueSchedInfo(0, 0, 4, 0, 0, 0), 
-        (String) job2.getSchedulingInfo());
+        (String) job4.getSchedulingInfo());
 
     // No more tasks there in job3 also
-    assertNull(scheduler.assignTasks(tracker("tt3")));
+    assertEquals(0, scheduler.assignTasks(tracker("tt3")).size());
 }
 
   /**
@@ -2915,67 +2943,102 @@ public class TestCapacityScheduler exten
     FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
 
     // Map 1 of high memory job
-    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    // Map 1 of normal job
+    // Map 1 of normal job
+    // Map 1 of normal job, since comparator won't change on equals
+    // Reduce 1 of high memory job
+    checkAssignments("tt1", 
+        new String[] {
+        "attempt_test_0001_m_000001_0 on tt1",
+        "attempt_test_0002_m_000001_0 on tt1",
+        "attempt_test_0002_m_000002_0 on tt1",
+        "attempt_test_0002_m_000003_0 on tt1",
+        "attempt_test_0001_r_000001_0 on tt1"
+        });
     checkQueuesOrder(qs, scheduler
         .getOrderedQueues(TaskType.MAP));
-
-    // Reduce 1 of high memory job
-    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
     checkQueuesOrder(qs, scheduler
         .getOrderedQueues(TaskType.REDUCE));
+    scheduler.updateQSIInfoForTests();
+    assertEquals(
+        CapacityTaskScheduler.getJobQueueSchedInfo(1, 2, 2, 1, 2, 0), 
+        (String) job1.getSchedulingInfo());
+    assertEquals(
+        CapacityTaskScheduler.getJobQueueSchedInfo(3, 3, 0, 0, 0, 0), 
+        (String) job2.getSchedulingInfo());
 
-    // Map 1 of normal job
-    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-    checkQueuesOrder(reversedQs, scheduler
-        .getOrderedQueues(TaskType.MAP));
 
     // Reduce 1 of normal job
     checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
     checkQueuesOrder(reversedQs, scheduler
         .getOrderedQueues(TaskType.REDUCE));
-
-    // Map 2 of normal job
-    checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
-    checkQueuesOrder(reversedQs, scheduler
-        .getOrderedQueues(TaskType.MAP));
+    scheduler.updateQSIInfoForTests();
+    assertEquals(
+        CapacityTaskScheduler.getJobQueueSchedInfo(1, 2, 2, 1, 2, 0), 
+        (String) job1.getSchedulingInfo());
+    assertEquals(
+        CapacityTaskScheduler.getJobQueueSchedInfo(3, 3, 0, 1, 1, 0), 
+        (String) job2.getSchedulingInfo());
 
     // Reduce 2 of normal job
     checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
     checkQueuesOrder(reversedQs, scheduler
         .getOrderedQueues(TaskType.REDUCE));
-
-    // Now both the queues are equally served. But the comparator doesn't change
-    // the order if queues are equally served.
-
-    // Map 3 of normal job
-    checkAssignment("tt2", "attempt_test_0002_m_000003_0 on tt2");
-    checkQueuesOrder(reversedQs, scheduler
-        .getOrderedQueues(TaskType.MAP));
+    scheduler.updateQSIInfoForTests();
+    assertEquals(
+        CapacityTaskScheduler.getJobQueueSchedInfo(1, 2, 2, 1, 2, 0), 
+        (String) job1.getSchedulingInfo());
+    assertEquals(
+        CapacityTaskScheduler.getJobQueueSchedInfo(3, 3, 0, 2, 2, 0), 
+        (String) job2.getSchedulingInfo());
 
     // Reduce 3 of normal job
-    checkAssignment("tt2", "attempt_test_0002_r_000003_0 on tt2");
+    checkAssignment("tt1", "attempt_test_0002_r_000003_0 on tt1");
     checkQueuesOrder(reversedQs, scheduler
         .getOrderedQueues(TaskType.REDUCE));
+    scheduler.updateQSIInfoForTests();
+    assertEquals(
+        CapacityTaskScheduler.getJobQueueSchedInfo(1, 2, 2, 1, 2, 0), 
+        (String) job1.getSchedulingInfo());
+    assertEquals(
+        CapacityTaskScheduler.getJobQueueSchedInfo(3, 3, 0, 3, 3, 0), 
+        (String) job2.getSchedulingInfo());
 
     // Map 2 of high memory job
-    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
-    checkQueuesOrder(qs, scheduler
-        .getOrderedQueues(TaskType.MAP));
-
-    // Reduce 2 of high memory job
-    checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
-    checkQueuesOrder(qs, scheduler
-        .getOrderedQueues(TaskType.REDUCE));
-
     // Map 4 of normal job
-    checkAssignment("tt2", "attempt_test_0002_m_000004_0 on tt2");
+    // Map 4 of normal job
+    // Reduce 2 of high memory job
+    checkAssignments("tt2", 
+        new String[] {
+        "attempt_test_0002_m_000004_0 on tt2",
+        "attempt_test_0002_m_000005_0 on tt2",
+        "attempt_test_0001_m_000002_0 on tt2",
+        "attempt_test_0002_m_000006_0 on tt2",
+        "attempt_test_0001_r_000002_0 on tt2"
+    });
     checkQueuesOrder(reversedQs, scheduler
         .getOrderedQueues(TaskType.MAP));
+    checkQueuesOrder(qs, scheduler
+        .getOrderedQueues(TaskType.REDUCE));
+    scheduler.updateQSIInfoForTests();
+    assertEquals(
+        CapacityTaskScheduler.getJobQueueSchedInfo(2, 4, 4, 2, 4, 0), 
+        (String) job1.getSchedulingInfo());
+    assertEquals(
+        CapacityTaskScheduler.getJobQueueSchedInfo(6, 6, 0, 3, 3, 0), 
+        (String) job2.getSchedulingInfo());
 
     // Reduce 4 of normal job
     checkAssignment("tt2", "attempt_test_0002_r_000004_0 on tt2");
     checkQueuesOrder(reversedQs, scheduler
         .getOrderedQueues(TaskType.REDUCE));
+    scheduler.updateQSIInfoForTests();
+    assertEquals(
+        CapacityTaskScheduler.getJobQueueSchedInfo(2, 4, 4, 2, 4, 0), 
+        (String) job1.getSchedulingInfo());
+    assertEquals(
+        CapacityTaskScheduler.getJobQueueSchedInfo(6, 6, 0, 4, 4, 0), 
+        (String) job2.getSchedulingInfo());
   }
 
   private void checkFailedInitializedJobMovement() throws IOException {
@@ -3073,6 +3136,30 @@ public class TestCapacityScheduler exten
     return tasks.get(0);
   }
 
+  protected String getAssignedTasks(List<Task> tasks) {
+    if (tasks.size() == 0) {
+      return "<empty>";
+    }
+    StringBuffer s = new StringBuffer(tasks.get(0).toString());
+    for (int i=1; i < tasks.size(); ++i) {
+      s.append(", ");
+      s.append(tasks.get(i).toString());
+    }
+    return s.toString();
+  }
+  
+  protected List<Task> checkAssignments(String taskTrackerName,
+      String[] expectedTasks) throws IOException {
+    List<Task> tasks = scheduler.assignTasks(tracker(taskTrackerName));
+    assertNotNull(getAssignedTasks(tasks), tasks);
+    assertEquals(getAssignedTasks(tasks), expectedTasks.length, tasks.size());
+    for (int i=0; i < tasks.size(); ++i) {
+      assertEquals(getAssignedTasks(tasks) + " -> " + expectedTasks[i], 
+                   expectedTasks[i], tasks.get(i).toString());
+    }
+    return tasks;
+  }
+
   /**
    * Get the amount of memory that is reserved for tasks on the taskTracker and
    * verify that it matches what is expected.
@@ -3082,15 +3169,16 @@ public class TestCapacityScheduler exten
    * @param expectedMemForReducesOnTT
    */
   private void checkMemReservedForTasksOnTT(String taskTracker,
-      Long expectedMemForMapsOnTT, Long expectedMemForReducesOnTT) {
+      Long expectedMemForMapsOnTT, Long expectedMemForReducesOnTT,
+      int numAvailableMapSlots, int numAvailableReduceSlots) {
     Long observedMemForMapsOnTT =
       scheduler.memoryMatcher.getMemReservedForTasks(
         tracker(taskTracker).getStatus(),
-            TaskType.MAP);
+            TaskType.MAP, numAvailableMapSlots);
     Long observedMemForReducesOnTT =
       scheduler.memoryMatcher.getMemReservedForTasks(
         tracker(taskTracker).getStatus(),
-            TaskType.REDUCE);
+            TaskType.REDUCE, numAvailableReduceSlots);
     if (expectedMemForMapsOnTT == null) {
       assertEquals(observedMemForMapsOnTT, null);
     } else {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077538&r1=1077537&r2=1077538&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar  4 04:26:09 2011
@@ -32,6 +32,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.TreeMap;
+import java.util.TreeSet;
 import java.util.Vector;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -168,6 +169,19 @@ public class JobInProgress {
   private final int anyCacheLevel;
   
   /**
+   * Number of scheduling opportunities (heartbeats) given to this Job
+   */
+  private volatile long numSchedulingOpportunities;
+  
+  static String LOCALITY_WAIT_FACTOR = "mapreduce.job.locality.wait.factor";
+  static final float DEFAULT_LOCALITY_WAIT_FACTOR = 1.0f;
+  
+  /**
+   * Percentage of the cluster the job is willing to wait to get better locality
+   */
+  private float localityWaitFactor = 1.0f;
+  
+  /**
    * A special value indicating that 
    * {@link #findNewMapTask(TaskTrackerStatus, int, int, int, double)} should
    * schedule any only off-switch and speculative map tasks for this job.
@@ -469,6 +483,7 @@ public class JobInProgress {
     Map<Node, List<TaskInProgress>> cache = 
       new IdentityHashMap<Node, List<TaskInProgress>>(maxLevel);
     
+    Set<String> uniqueHosts = new TreeSet<String>();
     for (int i = 0; i < splits.length; i++) {
       String[] splitLocations = splits[i].getLocations();
       if (splitLocations == null || splitLocations.length == 0) {
@@ -478,6 +493,7 @@ public class JobInProgress {
 
       for(String host: splitLocations) {
         Node node = jobtracker.resolveAndAddToTopology(host);
+        uniqueHosts.add(host);
         LOG.info("tip:" + maps[i].getTIPId() + " has split on node:" + node);
         for (int j = 0; j < maxLevel; j++) {
           List<TaskInProgress> hostMaps = cache.get(node);
@@ -498,6 +514,16 @@ public class JobInProgress {
         }
       }
     }
+    
+    // Calibrate the localityWaitFactor - Do not override user intent!
+    if (localityWaitFactor == DEFAULT_LOCALITY_WAIT_FACTOR) {
+      float jobNodes = uniqueHosts.size();
+      float clusterNodes = jobtracker.getNumberOfUniqueHosts();
+      
+      localityWaitFactor = Math.min(jobNodes/clusterNodes, localityWaitFactor);
+      LOG.info(jobId + " LOCALITY_WAIT_FACTOR=" + localityWaitFactor);
+    }
+    
     return cache;
   }
   
@@ -640,6 +666,10 @@ public class JobInProgress {
     }
     LOG.info("Input size for job " + jobId + " = " + inputLength
         + ". Number of splits = " + splits.length);
+
+    // Set localityWaitFactor before creating cache
+    localityWaitFactor = 
+      conf.getFloat(LOCALITY_WAIT_FACTOR, DEFAULT_LOCALITY_WAIT_FACTOR);
     if (numMapTasks > 0) { 
       nonRunningMapCache = createCache(splits, maxLevel);
     }
@@ -1194,6 +1224,8 @@ public class JobInProgress {
                                            ) throws IOException {
     if (status.getRunState() != JobStatus.RUNNING) {
       LOG.info("Cannot create task split for " + profile.getJobID());
+      try { throw new IOException("state = " + status.getRunState()); }
+      catch (IOException ioe) {ioe.printStackTrace();}
       return null;
     }
         
@@ -1206,6 +1238,7 @@ public class JobInProgress {
     Task result = maps[target].getTaskToRun(tts.getTrackerName());
     if (result != null) {
       addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
+      resetSchedulingOpportunities();
     }
 
     return result;
@@ -1255,6 +1288,8 @@ public class JobInProgress {
   throws IOException {
     if (!tasksInited.get()) {
       LOG.info("Cannot create task split for " + profile.getJobID());
+      try { throw new IOException("state = " + status.getRunState()); }
+      catch (IOException ioe) {ioe.printStackTrace();}
       return null;
     }
 
@@ -1267,6 +1302,7 @@ public class JobInProgress {
     Task result = maps[target].getTaskToRun(tts.getTrackerName());
     if (result != null) {
       addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
+      resetSchedulingOpportunities();
     }
 
     return result;
@@ -1278,6 +1314,8 @@ public class JobInProgress {
   throws IOException {
     if (!tasksInited.get()) {
       LOG.info("Cannot create task split for " + profile.getJobID());
+      try { throw new IOException("state = " + status.getRunState()); }
+      catch (IOException ioe) {ioe.printStackTrace();}
       return null;
     }
 
@@ -1290,11 +1328,47 @@ public class JobInProgress {
     Task result = maps[target].getTaskToRun(tts.getTrackerName());
     if (result != null) {
       addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
+      // DO NOT reset for off-switch!
     }
 
     return result;
   }
   
+  public void schedulingOpportunity() {
+    ++numSchedulingOpportunities;
+  }
+  
+  public void resetSchedulingOpportunities() {
+    numSchedulingOpportunities = 0;
+  }
+  
+  public long getNumSchedulingOpportunities() {
+    return numSchedulingOpportunities;
+  }
+
+  private static final long OVERRIDE = 1000000;
+  public void overrideSchedulingOpportunities() {
+    numSchedulingOpportunities = OVERRIDE;
+  }
+  
+  /**
+   * Check if we can schedule an off-switch task for this job.
+   * @param numTaskTrackers.
+   * 
+   * We check the number of missed opportunities for the job. 
+   * If it has 'waited' long enough we go ahead and schedule.
+   * 
+   * @return <code>true</code> if we can schedule off-switch, 
+   *         <code>false</code> otherwise
+   */
+  public boolean scheduleOffSwitch(int numTaskTrackers) {
+    long missedTaskTrackers = getNumSchedulingOpportunities();
+    long requiredSlots = 
+      Math.min((desiredMaps() - finishedMaps()), numTaskTrackers);
+    
+    return (requiredSlots  * localityWaitFactor) < missedTaskTrackers;
+  }
+  
   /**
    * Return a CleanupTask, if appropriate, to run on the given tasktracker
    *