You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:29:56 UTC

svn commit: r1077008 - in /hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src: java/org/apache/hadoop/mapred/CapacityTaskScheduler.java test/org/apache/hadoop/mapred/TestCapacityScheduler.java

Author: omalley
Date: Fri Mar  4 03:29:56 2011
New Revision: 1077008

URL: http://svn.apache.org/viewvc?rev=1077008&view=rev
Log:
commit f5aa877d4e40502138ff8f0687c239f4cc695462
Author: Arun C Murthy <ac...@apache.org>
Date:   Mon Sep 28 13:59:32 2009 -0700

    MAPREDUCE-1030. Fix capacity-scheduler to assign a map and a reduce task per-heartbeat. Contributed by Rahul K Singh.
    
    from: http://issues.apache.org/jira/secure/attachment/12420549/MAPREDUCE-1030-2.patch.txt
    
    +++ b/YAHOO-CHANGES.txt
    +59. MAPREDUCE-1030. Fix capacity-scheduler to assign a map and a reduce task
    +    per-heartbeat. Contributed by Rahul K Singh.
    +

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=1077008&r1=1077007&r2=1077008&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Fri Mar  4 03:29:56 2011
@@ -57,7 +57,8 @@ import org.apache.hadoop.mapreduce.serve
  *  
  */
 class CapacityTaskScheduler extends TaskScheduler {
-  
+
+
   /***********************************************************************
    * Keeping track of scheduling information for queues
    * 
@@ -914,6 +915,7 @@ class CapacityTaskScheduler extends Task
   private long memSizeForReduceSlotOnJT;
   private long limitMaxMemForMapTasks;
   private long limitMaxMemForReduceTasks;
+  private boolean assignMultipleTasks = true;
 
   public CapacityTaskScheduler() {
     this(new Clock());
@@ -1240,12 +1242,29 @@ class CapacityTaskScheduler extends Task
     prevReduceClusterCapacity = reduceClusterCapacity;
   }
 
+  /**
+   * Sets whether the scheduler can assign multiple tasks in a heartbeat
+   * or not.
+   * 
+   * This method is used only for testing purposes.
+   * 
+   * @param assignMultipleTasks true, to assign multiple tasks per heartbeat
+   */
+  void setAssignMultipleTasks(boolean assignMultipleTasks) {
+    this.assignMultipleTasks = assignMultipleTasks;
+  }
+
   /*
-   * The grand plan for assigning a task. 
-   * First, decide whether a Map or Reduce task should be given to a TT 
+   * The grand plan for assigning a task.
+   * 
+   * If multiple task assignment is enabled, it tries to get one map and
+   * one reduce slot depending on free slots on the TT.
+   * 
+   * Otherwise, we decide whether a Map or Reduce task should be given to a TT 
    * (if the TT can accept either). 
-   * Next, pick a queue. We only look at queues that need a slot. Among these,
-   * we first look at queues whose (# of running tasks)/capacity is the least.
+   * Either way, we first pick a queue. We only look at queues that need 
+   * a slot. Among these, we first look at queues whose 
+   * (# of running tasks)/capacity is the least.
    * Next, pick a job in a queue. we pick the job at the front of the queue
    * unless its user is over the user limit. 
    * Finally, given a job, pick a task from the job. 
@@ -1253,17 +1272,8 @@ class CapacityTaskScheduler extends Task
    */
   @Override
   public synchronized List<Task> assignTasks(TaskTracker taskTracker)
-  throws IOException {
-    
-    TaskLookupResult tlr;
+  throws IOException {    
     TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
-    
-    /* 
-     * If TT has Map and Reduce slot free, we need to figure out whether to
-     * give it a Map or Reduce task.
-     * Number of ways to do this. For now, base decision on how much is needed
-     * versus how much is used (default to Map, if equal).
-     */
     ClusterStatus c = taskTrackerManager.getClusterStatus();
     int mapClusterCapacity = c.getMaxMapTasks();
     int reduceClusterCapacity = c.getMaxReduceTasks();
@@ -1285,60 +1295,64 @@ class CapacityTaskScheduler extends Task
      * becomes expensive, do it once every few heartbeats only.
      */ 
     updateQSIObjects(mapClusterCapacity, reduceClusterCapacity);
-    // make sure we get our map or reduce scheduling object to update its 
-    // collection of QSI objects too. 
+    List<Task> result = new ArrayList<Task>();
+    if (assignMultipleTasks) {
+      addReduceTask(taskTracker, result, maxReduceSlots, currentReduceSlots);
+      addMapTask(taskTracker, result, maxMapSlots, currentMapSlots);
+    } else {
+      /* 
+       * If TT has Map and Reduce slot free, we need to figure out whether to
+       * give it a Map or Reduce task.
+       * Number of ways to do this. For now, base decision on how much is needed
+       * versus how much is used (default to Map, if equal).
+       */
+      if ((maxReduceSlots - currentReduceSlots) 
+          > (maxMapSlots - currentMapSlots)) {
+        addReduceTask(taskTracker, result, maxReduceSlots, currentReduceSlots);
+        if (result.size() == 0) {
+          addMapTask(taskTracker, result, maxMapSlots, currentMapSlots);
+        }
+      } else {
+        addMapTask(taskTracker, result, maxMapSlots, currentMapSlots);
+        if (result.size() == 0) {
+          addReduceTask(taskTracker, result, maxReduceSlots, currentReduceSlots);
+        }
+      }
+      if (result.size() == 0) {
+        return null;
+      }
+    }
+    return result;
+  }
 
-    if ((maxReduceSlots - currentReduceSlots) > 
-    (maxMapSlots - currentMapSlots)) {
-      // get a reduce task first
+  // Pick a reduce task and add to the list of tasks, if there's space
+  // on the TT to run one.
+  private void addReduceTask(TaskTracker taskTracker, List<Task> tasks,
+                                  int maxReduceSlots, int currentReduceSlots) 
+                    throws IOException {
+    if (maxReduceSlots > currentReduceSlots) {
       reduceScheduler.updateCollectionOfQSIs();
-      tlr = reduceScheduler.assignTasks(taskTracker);
-      if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
-        tlr.getLookUpStatus()) {
-        // found a task; return
-        return Collections.singletonList(tlr.getTask());
-      }
-      // if we didn't get any, look at map tasks, if TT has space
-      else if ((TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT
-                                  == tlr.getLookUpStatus() ||
-                TaskLookupResult.LookUpStatus.NO_TASK_FOUND
-                                  == tlr.getLookUpStatus())
-          && (maxMapSlots > currentMapSlots)) {
-        mapScheduler.updateCollectionOfQSIs();
-        tlr = mapScheduler.assignTasks(taskTracker);
-        if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
-          tlr.getLookUpStatus()) {
-          return Collections.singletonList(tlr.getTask());
-        }
+      TaskLookupResult tlr = reduceScheduler.assignTasks(taskTracker);
+      if (TaskLookupResult.LookUpStatus.TASK_FOUND == tlr.getLookUpStatus()) {
+        tasks.add(tlr.getTask());
       }
     }
-    else {
-      // get a map task first
+  }
+  
+  // Pick a map task and add to the list of tasks, if there's space
+  // on the TT to run one.
+  private void addMapTask(TaskTracker taskTracker, List<Task> tasks, 
+                              int maxMapSlots, int currentMapSlots)
+                    throws IOException {
+    if (maxMapSlots > currentMapSlots) {
       mapScheduler.updateCollectionOfQSIs();
-      tlr = mapScheduler.assignTasks(taskTracker);
-      if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
-        tlr.getLookUpStatus()) {
-        // found a task; return
-        return Collections.singletonList(tlr.getTask());
-      }
-      // if we didn't get any, look at reduce tasks, if TT has space
-      else if ((TaskLookupResult.LookUpStatus.TASK_FAILING_MEMORY_REQUIREMENT
-                                    == tlr.getLookUpStatus()
-                || TaskLookupResult.LookUpStatus.NO_TASK_FOUND
-                                    == tlr.getLookUpStatus())
-          && (maxReduceSlots > currentReduceSlots)) {
-        reduceScheduler.updateCollectionOfQSIs();
-        tlr = reduceScheduler.assignTasks(taskTracker);
-        if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
-          tlr.getLookUpStatus()) {
-          return Collections.singletonList(tlr.getTask());
-        }
+      TaskLookupResult tlr = mapScheduler.assignTasks(taskTracker);
+      if (TaskLookupResult.LookUpStatus.TASK_FOUND == tlr.getLookUpStatus()) {
+        tasks.add(tlr.getTask());
       }
     }
-
-    return null;
   }
-
+  
   // called when a job is added
   synchronized void jobAdded(JobInProgress job) throws IOException {
     QueueSchedulingInfo qsi = 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1077008&r1=1077007&r2=1077008&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri Mar  4 03:29:56 2011
@@ -738,6 +738,7 @@ public class TestCapacityScheduler exten
             numReduceTasksPerTracker);
     clock = new FakeClock();
     scheduler = new CapacityTaskScheduler(clock);
+    scheduler.setAssignMultipleTasks(false);
     scheduler.setTaskTrackerManager(taskTrackerManager);
 
     conf = new JobConf();
@@ -1055,7 +1056,171 @@ public class TestCapacityScheduler exten
     // complete task
     taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000003_0", j2);
   }
-  
+
+  /**
+   * Tests whether a map and reduce task are assigned when there's
+   * a single queue and multiple task assignment is enabled.
+   * @throws Exception
+   */
+  public void testMultiTaskAssignmentInSingleQueue() throws Exception {
+    try {
+      setUp(1, 6, 2);
+      // set up some queues
+      String[] qs = {"default"};
+      taskTrackerManager.addQueues(qs);
+      ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+      queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
+      resConf.setFakeQueues(queues);
+      scheduler.setResourceManagerConf(resConf);
+      scheduler.start();
+      scheduler.setAssignMultipleTasks(true);
+
+      //Submit the job with 6 maps and 2 reduces
+      FakeJobInProgress j1 = submitJobAndInit(
+        JobStatus.PREP, 6, 2, "default", "u1");
+
+      List<Task> tasks = scheduler.assignTasks(tracker("tt1"));
+      assertEquals(tasks.size(), 2);
+      for (Task task : tasks) {
+        if (task.toString().contains("_m_")) {
+          LOG.info(" map task assigned " + task.toString());
+          assertEquals(task.toString(), "attempt_test_0001_m_000001_0 on tt1");
+        } else if (task.toString().contains("_r_")) {
+          LOG.info(" reduce task assigned " + task.toString());
+          assertEquals(task.toString(), "attempt_test_0001_r_000001_0 on tt1");
+        } else {
+          fail(" should not have come here " + task.toString());
+        }
+      }
+
+      for (Task task : tasks) {
+        if (task.toString().equals("attempt_test_0001_m_000001_0 on tt1")) {
+          //Now finish the task
+          taskTrackerManager.finishTask(
+            "tt1", task.getTaskID().toString(),
+            j1);
+        }
+      }
+
+      tasks = scheduler.assignTasks(tracker("tt1"));
+      assertEquals(tasks.size(), 2);
+      for (Task task : tasks) {
+        if (task.toString().contains("_m_")) {
+          LOG.info(" map task assigned " + task.toString());
+          assertEquals(task.toString(), "attempt_test_0001_m_000002_0 on tt1");
+        } else if (task.toString().contains("_r_")) {
+          LOG.info(" reduce task assigned " + task.toString());
+          assertEquals(task.toString(), "attempt_test_0001_r_000002_0 on tt1");
+        } else {
+          fail(" should not have come here " + task.toString());
+        }
+      }
+
+      // Now both the reduce slots are being used, hence we should
+      // get only 1 map task (and no reduce task) in this assignTasks call.
+      tasks = scheduler.assignTasks(tracker("tt1"));
+      assertEquals(tasks.size(), 1);
+      for (Task task : tasks) {
+        if (task.toString().contains("_m_")) {
+          LOG.info(" map task assigned " + task.toString());
+          assertEquals(task.toString(), "attempt_test_0001_m_000003_0 on tt1");
+        } else if (task.toString().contains("_r_")) {
+          LOG.info(" reduce task assigned " + task.toString());
+          fail("should not give reduce task " + task.toString());
+        } else {
+          fail(" should not have come here " + task.toString());
+        }
+      }
+    } finally {
+      scheduler.setAssignMultipleTasks(false);
+    }
+  }
+
+  public void testMultiTaskAssignmentInMultipleQueues() throws Exception {
+    try {
+      setUp(1, 6, 2);
+      // set up some queues
+      String[] qs = {"default","q1"};
+      taskTrackerManager.addQueues(qs);
+      ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+      queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
+      queues.add(new FakeQueueInfo("q1", 50.0f, true, 25));
+      resConf.setFakeQueues(queues);
+      scheduler.setResourceManagerConf(resConf);
+      scheduler.start();
+      scheduler.setAssignMultipleTasks(true);
+
+      // Submit a job with 6 maps and 1 reduce to the default queue
+      submitJobAndInit(
+        JobStatus.PREP, 6, 1, "default", "u1");
+
+      FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP,2,1,"q1","u2");
+
+      List<Task> tasks = scheduler.assignTasks(tracker("tt1"));
+      assertEquals(tasks.size(), 2);
+      for (Task task : tasks) {
+        if (task.toString().contains("_m_")) {
+          LOG.info(" map task assigned " + task.toString());
+          assertEquals(task.toString(), "attempt_test_0001_m_000001_0 on tt1");
+        } else if (task.toString().contains("_r_")) {
+          LOG.info(" reduce task assigned " + task.toString());
+          assertEquals(task.toString(), "attempt_test_0001_r_000001_0 on tt1");
+        } else {
+          fail(" should not have come here " + task.toString());
+        }
+      }
+      
+      // next assignment will be for job in second queue.
+      tasks = scheduler.assignTasks(tracker("tt1"));
+      assertEquals(tasks.size(), 2);
+      for (Task task : tasks) {
+        if (task.toString().contains("_m_")) {
+          LOG.info(" map task assigned " + task.toString());
+          assertEquals(task.toString(), "attempt_test_0002_m_000001_0 on tt1");
+        } else if (task.toString().contains("_r_")) {
+          LOG.info(" reduce task assigned " + task.toString());
+          assertEquals(task.toString(), "attempt_test_0002_r_000001_0 on tt1");
+        } else {
+          fail(" should not have come here " + task.toString());
+        }
+      }
+
+      // Now both the reduce slots are being used, hence we should get only 1
+      // map task (and no reduce task) in this assignTasks call.
+      tasks = scheduler.assignTasks(tracker("tt1"));
+      assertEquals(tasks.size(), 1);
+      for (Task task : tasks) {
+        if (task.toString().contains("_m_")) {
+          LOG.info(" map task assigned " + task.toString());
+          // we get from job 2 because the queues are equal in capacity usage
+          // and sorting leaves order unchanged.
+          assertEquals(task.toString(), "attempt_test_0002_m_000002_0 on tt1");
+        } else if (task.toString().contains("_r_")) {
+          LOG.info(" reduce task assigned " + task.toString());
+          fail("should not give reduce task " + task.toString());
+        } else {
+          fail(" should not have come here " + task.toString());
+        }
+      }
+
+      tasks = scheduler.assignTasks(tracker("tt1"));
+      assertEquals(tasks.size(), 1);
+      for (Task task : tasks) {
+        if (task.toString().contains("_m_")) {
+          LOG.info(" map task assigned " + task.toString());
+          assertEquals(task.toString(), "attempt_test_0001_m_000002_0 on tt1");
+        } else if (task.toString().contains("_r_")) {
+          LOG.info(" reduce task assigned " + task.toString());
+          fail("should not give reduce task " + task.toString());
+        } else {
+          fail(" should not have come here " + task.toString());
+        }
+      }
+    } finally {
+      scheduler.setAssignMultipleTasks(false);
+    }
+  }
+
   // basic tests, should be able to submit to queues
   public void testSubmitToQueues() throws Exception {
     // set up some queues
@@ -1947,14 +2112,24 @@ public class TestCapacityScheduler exten
   }
 
   /**
-   * Test blocking of cluster for lack of memory.
+   * Tests that scheduler schedules normal jobs once high RAM jobs 
+   * have been reserved to the limit.
+   * 
+   * The test causes the scheduler to schedule a normal job on two
+   * trackers, and one task of the high RAM job on a third. Then it 
+   * asserts that one of the first two trackers gets a reservation 
+   * for the remaining task of the high RAM job. After this, it 
+   * asserts that a normal job submitted later is allowed to run 
+   * on a free slot, as all tasks of the high RAM job are either
+   * scheduled or reserved.
+   *  
    * @throws IOException
    */
   public void testClusterBlockingForLackOfMemory()
       throws IOException {
 
     LOG.debug("Starting the scheduler.");
-    taskTrackerManager = new FakeTaskTrackerManager(2, 2, 2);
+    taskTrackerManager = new FakeTaskTrackerManager(3, 2, 2);
 
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
@@ -1977,34 +2152,38 @@ public class TestCapacityScheduler exten
     scheduler.start();
 
     LOG.debug("Submit one normal memory(1GB maps/reduces) job of "
-        + "1 map, 1 reduce tasks.");
+        + "2 map, 2 reduce tasks.");
     JobConf jConf = new JobConf(conf);
     jConf.setMemoryForMapTask(1 * 1024);
     jConf.setMemoryForReduceTask(1 * 1024);
-    jConf.setNumMapTasks(1);
-    jConf.setNumReduceTasks(1);
+    jConf.setNumMapTasks(2);
+    jConf.setNumReduceTasks(2);
     jConf.setQueueName("default");
     jConf.setUser("u1");
     FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
 
-    // Fill the second tt with this job.
-    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
+    // Fill a tt with this job's tasks.
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     // Total 1 map slot should be accounted for.
-    checkOccupiedSlots("default", TaskType.MAP, 1, 1, 25.0f);
+    checkOccupiedSlots("default", TaskType.MAP, 1, 1, 16.7f);
     assertEquals(String.format(
         CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 
         1, 1, 0, 0, 0, 0),
         (String) job1.getSchedulingInfo());
-    checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 0L);
-    checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
-    // Total 1 map slot should be accounted for.
-    checkOccupiedSlots("default", TaskType.REDUCE, 1, 1,
-        25.0f);
+    checkMemReservedForTasksOnTT("tt1", 1 * 1024L, 0L);
+
+    // same for reduces.
+    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+    checkOccupiedSlots("default", TaskType.REDUCE, 1, 1, 16.7f);
     assertEquals(String.format(
         CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 
         1, 1, 0, 1, 1, 0),
         (String) job1.getSchedulingInfo());
-    checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
+    checkMemReservedForTasksOnTT("tt1", 1 * 1024L, 1 * 1024L);
+
+    // fill another TT with the rest of the tasks of the job
+    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
 
     LOG.debug("Submit one high memory(2GB maps/reduces) job of "
         + "2 map, 2 reduce tasks.");
@@ -2017,27 +2196,26 @@ public class TestCapacityScheduler exten
     jConf.setUser("u1");
     FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
 
-    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-    // Total 3 map slots should be accounted for.
-    checkOccupiedSlots("default", TaskType.MAP, 1, 3, 75.0f);
+    // Have another TT run one task of each type of the high RAM
+    // job. This will fill up the TT. 
+    checkAssignment("tt3", "attempt_test_0002_m_000001_0 on tt3");
+    checkOccupiedSlots("default", TaskType.MAP, 1, 4, 66.7f);
     assertEquals(String.format(
         CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 
         1, 2, 0, 0, 0, 0),
         (String) job2.getSchedulingInfo());
-    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+    checkMemReservedForTasksOnTT("tt3", 2 * 1024L, 0L);
 
-    checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
-    // Total 3 reduce slots should be accounted for.
-    checkOccupiedSlots("default", TaskType.REDUCE, 1, 3,
-        75.0f);
+    checkAssignment("tt3", "attempt_test_0002_r_000001_0 on tt3");
+    checkOccupiedSlots("default", TaskType.REDUCE, 1, 4, 66.7f);
     assertEquals(String.format(
         CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 
         1, 2, 0, 1, 2, 0),
         (String) job2.getSchedulingInfo());
-    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
+    checkMemReservedForTasksOnTT("tt3", 2 * 1024L, 2 * 1024L);
 
     LOG.debug("Submit one normal memory(1GB maps/reduces) job of "
-        + "1 map, 0 reduce tasks.");
+        + "1 map, 1 reduce tasks.");
     jConf = new JobConf(conf);
     jConf.setMemoryForMapTask(1 * 1024);
     jConf.setMemoryForReduceTask(1 * 1024);
@@ -2047,29 +2225,27 @@ public class TestCapacityScheduler exten
     jConf.setUser("u1");
     FakeJobInProgress job3 = submitJobAndInit(JobStatus.PREP, jConf);
 
-    // Job2 cannot fit on tt1. So tt1 is reserved for a map slot of job2
-    assertNull(scheduler.assignTasks(tracker("tt1")));
+    // Send a TT with insufficient space for task assignment.
+    // This will cause a reservation for the high RAM job.
     assertNull(scheduler.assignTasks(tracker("tt1")));
 
-    // reserved tasktrackers contribute to occupied slots for maps.
-    checkOccupiedSlots("default", TaskType.MAP, 1, 5, 125.0f);
-    // occupied slots for reduces remain unchanged as tt1 is not reserved for
-    // reduces.
-    checkOccupiedSlots("default", TaskType.REDUCE, 1, 3, 75.0f);
-    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
-    checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
+    // reserved tasktrackers contribute to occupied slots for maps and reduces
+    checkOccupiedSlots("default", TaskType.MAP, 1, 6, 100.0f);
+    checkOccupiedSlots("default", TaskType.REDUCE, 1, 6, 100.0f);
+    checkMemReservedForTasksOnTT("tt1", 1 * 1024L, 1 * 1024L);
     LOG.info(job2.getSchedulingInfo());
     assertEquals(String.format(
         CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 
-        1, 2, 2, 1, 2, 0),
+        1, 2, 2, 1, 2, 2),
         (String) job2.getSchedulingInfo());
     assertEquals(String.format(
         CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 
         0, 0, 0, 0, 0, 0),
         (String) job3.getSchedulingInfo());
-
-    // One reservation is already done for job2. So job3 should go ahead.
+    
+    // Reservations are already done for job2. So job3 should go ahead.
     checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0003_r_000001_0 on tt2");
   }
 
   /**