Posted to common-commits@hadoop.apache.org by yh...@apache.org on 2009/05/19 17:22:09 UTC

svn commit: r776352 - in /hadoop/core/trunk: CHANGES.txt src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

Author: yhemanth
Date: Tue May 19 15:21:53 2009
New Revision: 776352

URL: http://svn.apache.org/viewvc?rev=776352&view=rev
Log:
HADOOP-4981. Fix capacity scheduler to schedule speculative tasks correctly in the presence of High RAM jobs. Contributed by Sreekanth Ramakrishnan.
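
In essence, the patch changes the per-job decision inside the scheduler's
task-lookup loop: memory is now checked before asking the job for a task, and
a tracker is held back only while a high-RAM job still has pending or
speculative work. The following self-contained sketch (hypothetical stand-in
types, not the actual Hadoop classes touched in CapacityTaskScheduler.java)
illustrates that decision:

    // Standalone illustration of the scheduling decision introduced by this
    // patch; the nested types are hypothetical stand-ins, not Hadoop APIs.
    final class SchedulingSketch {
      enum Lookup { TASK_FOUND, NO_TASK_FOUND, MEMORY_FAILED }

      interface Job {
        boolean fitsInMemory();        // stands in for memoryMatcher.matchesMemoryRequirements(j, taskTracker)
        boolean obtainTask();          // stands in for obtainNewTask(); true if a (possibly speculative) attempt was handed out
        boolean hasPendingTasks();     // stands in for getPendingTasks(j) != 0
        boolean hasSpeculativeTask();  // stands in for the new hasSpeculativeTask(j, taskTracker)
      }

      static Lookup assignOne(Iterable<Job> runningJobs) {
        for (Job j : runningJobs) {
          if (j.fitsInMemory()) {
            if (j.obtainTask()) {
              return Lookup.TASK_FOUND;   // normal or speculative attempt scheduled
            }
            continue;                     // this job has nothing to run; try the next job
          }
          // Memory requirements not met: hold the tracker only while the
          // high-RAM job still has pending or speculative work, so it is
          // not starved, but do not block for jobs that are already done.
          if (j.hasPendingTasks() || j.hasSpeculativeTask()) {
            return Lookup.MEMORY_FAILED;
          }
        }
        return Lookup.NO_TASK_FOUND;      // nothing to run in this queue
      }
    }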

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=776352&r1=776351&r2=776352&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue May 19 15:21:53 2009
@@ -690,6 +690,10 @@
     HADOOP-5828. Use absolute path for mapred.local.dir of JobTracker in
     MiniMRCluster. (yhemanth)
 
+    HADOOP-4981. Fix capacity scheduler to schedule speculative tasks 
+    correctly in the presence of High RAM jobs.
+    (Sreekanth Ramakrishnan via yhemanth)
+
 Release 0.20.0 - 2009-04-15
 
   INCOMPATIBLE CHANGES

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=776352&r1=776351&r2=776352&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Tue May 19 15:21:53 2009
@@ -288,6 +288,15 @@
         JobInProgress job) throws IOException; 
     abstract int getPendingTasks(JobInProgress job);
     abstract TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi);
+    /**
+     * Checks whether the job has a speculative task to run on the given tracker.
+     * 
+     * @param job job to check for speculative tasks.
+     * @param tts task tracker on which the speculative task would run.
+     * @return true if there is a speculative task to run on the tracker.
+     */
+    abstract boolean hasSpeculativeTask(JobInProgress job, 
+        TaskTrackerStatus tts);
 
     /**
      * List of QSIs for assigning tasks.
@@ -400,29 +409,32 @@
         // check if the job's user is over limit
         if (isUserOverLimit(j.getProfile().getUser(), qsi)) {
           continue;
-        }
-        if (getPendingTasks(j) != 0) {
-          // Not accurate TODO:
-          // check if the job's memory requirements are met
-          if (scheduler.memoryMatcher.matchesMemoryRequirements(j, taskTracker)) {
-            // We found a suitable job. Get task from it.
-            Task t = obtainNewTask(taskTracker, j);
-            if (t != null) {
-              // we're successful in getting a task
-              return TaskLookupResult.getTaskFoundResult(t);
-            }
+        } 
+        //If this job meets the memory requirements, ask the JobInProgress
+        //for a task to be scheduled on this task tracker.
+        //If we get a task, we pass it on.
+        if (scheduler.memoryMatcher.matchesMemoryRequirements(j, taskTracker)) {
+          // We found a suitable job. Get task from it.
+          Task t = obtainNewTask(taskTracker, j);
+          //if there is a task return it immediately.
+          if (t != null) {
+            // we're successful in getting a task
+            return TaskLookupResult.getTaskFoundResult(t);
+          } else {
+            //skip to the next job in the queue.
+            continue;
           }
-          else {
-            // mem requirements not met or could not be computed for this TT
-            // Rather than look at the next job, 
-            // we return nothing to the TT, with the hope that we improve 
-            // chances of finding a suitable TT for this job. This lets us
-            // avoid starving jobs with high mem requirements.         
+        } else {
+          //If the memory requirements are not met, check whether the
+          //job has any pending or speculative tasks. If it does, we
+          //block the tracker until this job's tasks get scheduled, so
+          //that high memory jobs are not starved.
+          if (getPendingTasks(j) != 0 || hasSpeculativeTask(j, taskTracker)) {
             return TaskLookupResult.getMemFailedResult();
-          }
-        }
+          } 
+        }//end of memory check block
         // if we're here, this job has no task to run. Look at the next job.
-      }
+      }//end of for loop
 
       // if we're here, we haven't found any task to run among all jobs in 
       // the queue. This could be because there is nothing to run, or that 
@@ -444,24 +456,28 @@
         if (j.getStatus().getRunState() != JobStatus.RUNNING) {
           continue;
         }
-        if (getPendingTasks(j) != 0) {
-          // Not accurate TODO:
-          // check if the job's memory requirements are met
-          if (scheduler.memoryMatcher.matchesMemoryRequirements(j, taskTracker)) {
-            // We found a suitable job. Get task from it.
-            Task t = obtainNewTask(taskTracker, j);
-            if (t != null) {
-              // we're successful in getting a task
-              return TaskLookupResult.getTaskFoundResult(t);
-            }
+        if (scheduler.memoryMatcher.matchesMemoryRequirements(j, taskTracker)) {
+          // We found a suitable job. Get task from it.
+          Task t = obtainNewTask(taskTracker, j);
+          //if there is a task return it immediately.
+          if (t != null) {
+            // we're successful in getting a task
+            return TaskLookupResult.getTaskFoundResult(t);
+          } else {
+            //skip to the next job in the queue.
+            continue;
           }
-          else {
-            // mem requirements not met. 
+        } else {
+          //If the memory requirements are not met, check whether the
+          //job has any pending or speculative tasks. If it does, we
+          //block the tracker until this job's tasks get scheduled,
+          //so that high memory jobs are not starved of slots on the
+          //cluster.
+          if (getPendingTasks(j) != 0 || hasSpeculativeTask(j, taskTracker)) {
             return TaskLookupResult.getMemFailedResult();
-          }
-        }
-        // if we're here, this job has no task to run. Look at the next job.
-      }
+          } 
+        }//end of memory check block
+      }//end of for loop
 
       // found nothing for this queue, look at the next one.
       String msg = "Found no task from the queue " + qsi.queueName;
@@ -516,6 +532,27 @@
       LOG.debug(s);
     }
     
+    /**
+     * Check whether any of the given tasks has a speculative attempt to
+     * execute on the particular task tracker.
+     * 
+     * @param tips tasks of a job
+     * @param progress percentage progress of the job
+     * @param tts status of the task tracker for which a speculative tip is sought
+     * @return true if the job has a speculative task to run on this tracker.
+     */
+    boolean hasSpeculativeTask(TaskInProgress[] tips, float progress, 
+        TaskTrackerStatus tts) {
+      long currentTime = System.currentTimeMillis();
+      for(TaskInProgress tip : tips)  {
+        if(tip.isRunning() 
+            && !(tip.hasRunOnMachine(tts.getHost(), tts.getTrackerName())) 
+            && tip.hasSpeculativeTask(currentTime, progress)) {
+          return true;
+        }
+      }
+      return false;
+    }
   }
 
   /**
@@ -549,6 +586,15 @@
       return qsi.mapTSI;
     }
 
+    @Override
+    boolean hasSpeculativeTask(JobInProgress job, TaskTrackerStatus tts) {
+      //First check whether the job has speculative map execution enabled,
+      //then check whether it has a speculative map to run.
+      return (job.getJobConf().getMapSpeculativeExecution()) && (
+          hasSpeculativeTask(job.getMapTasks(), 
+              job.getStatus().mapProgress(), tts));
+    }
+
   }
 
   /**
@@ -581,6 +627,16 @@
     TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
       return qsi.reduceTSI;
     }
+
+    @Override
+    boolean hasSpeculativeTask(JobInProgress job, TaskTrackerStatus tts) {
+      //First check whether the job has speculative reduce execution enabled,
+      //then check whether it has a speculative reduce to run.
+      return (job.getJobConf().getReduceSpeculativeExecution()) && (
+          hasSpeculativeTask(job.getReduceTasks(), 
+              job.getStatus().reduceProgress(), tts));
+    }
+
   }
   
   /** the scheduling mgrs for Map and Reduce tasks */ 

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=776352&r1=776351&r2=776352&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Tue May 19 15:21:53 2009
@@ -40,6 +40,7 @@
 import org.apache.hadoop.conf.Configuration;
 
 
+
 public class TestCapacityScheduler extends TestCase {
 
   static final Log LOG =
@@ -145,17 +146,24 @@
   }
 
   private ControlledInitializationPoller controlledInitializationPoller;
-
+  /*
+   * Fake job in progress object used for testing the scheduler's scheduling
+   * decisions. The job returns FakeTaskInProgress objects when
+   * assignTasks is called. If speculative maps and reduces are
+   * configured, then the job returns exactly one speculative map and
+   * one speculative reduce task.
+   */
   static class FakeJobInProgress extends JobInProgress {
     
-    private FakeTaskTrackerManager taskTrackerManager;
+    protected FakeTaskTrackerManager taskTrackerManager;
     private int mapTaskCtr;
     private int redTaskCtr;
     private Set<TaskInProgress> mapTips = 
       new HashSet<TaskInProgress>();
     private Set<TaskInProgress> reduceTips = 
       new HashSet<TaskInProgress>();
-    
+    private int speculativeMapTaskCounter = 0;
+    private int speculativeReduceTaskCounter = 0;
     public FakeJobInProgress(JobID jId, JobConf jobConf,
         FakeTaskTrackerManager taskTrackerManager, String user) {
       super(jId, jobConf);
@@ -186,8 +194,14 @@
     @Override
     public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
         int ignored) throws IOException {
-      if (mapTaskCtr == numMapTasks) return null;
-      TaskAttemptID attemptId = getTaskAttemptID(true);
+      boolean areAllMapsRunning = (mapTaskCtr == numMapTasks);
+      if (areAllMapsRunning){
+        if(!getJobConf().getMapSpeculativeExecution() || 
+            speculativeMapTasks > 0) {
+          return null;
+        }
+      }
+      TaskAttemptID attemptId = getTaskAttemptID(true, areAllMapsRunning);
       Task task = new MapTask("", attemptId, 0, "", new BytesWritable()) {
         @Override
         public String toString() {
@@ -197,16 +211,39 @@
       taskTrackerManager.startTask(tts.getTrackerName(), task);
       runningMapTasks++;
       // create a fake TIP and keep track of it
-      mapTips.add(new FakeTaskInProgress(getJobID(), 
-          getJobConf(), task, true, this));
+      FakeTaskInProgress mapTip = new FakeTaskInProgress(getJobID(), 
+          getJobConf(), task, true, this);
+      mapTip.taskStatus.setRunState(TaskStatus.State.RUNNING);
+      if(areAllMapsRunning) {
+        speculativeMapTasks++;
+        //A speculative map has been scheduled. Mark all existing map
+        //tips as no longer having a speculative task.
+        for(TaskInProgress t : mapTips) {
+          if (t instanceof FakeTaskInProgress) {
+            FakeTaskInProgress mt = (FakeTaskInProgress) t;
+            mt.hasSpeculativeMap = false;
+          }
+        }
+      } else {
+        //add only non-speculative tips.
+        mapTips.add(mapTip);
+        //add the tips to the JobInProgress TIPS
+        maps = mapTips.toArray(new TaskInProgress[mapTips.size()]);
+      }
       return task;
     }
     
     @Override
     public Task obtainNewReduceTask(final TaskTrackerStatus tts,
         int clusterSize, int ignored) throws IOException {
-      if (redTaskCtr == numReduceTasks) return null;
-      TaskAttemptID attemptId = getTaskAttemptID(false);
+      boolean areAllReducesRunning = (redTaskCtr == numReduceTasks);
+      if (areAllReducesRunning){
+        if(!getJobConf().getReduceSpeculativeExecution() || 
+            speculativeReduceTasks > 0) {
+          return null;
+        }
+      }
+      TaskAttemptID attemptId = getTaskAttemptID(false, areAllReducesRunning);
       Task task = new ReduceTask("", attemptId, 0, 10) {
         @Override
         public String toString() {
@@ -216,8 +253,25 @@
       taskTrackerManager.startTask(tts.getTrackerName(), task);
       runningReduceTasks++;
       // create a fake TIP and keep track of it
-      reduceTips.add(new FakeTaskInProgress(getJobID(), 
-          getJobConf(), task, false, this));
+      FakeTaskInProgress reduceTip = new FakeTaskInProgress(getJobID(), 
+          getJobConf(), task, false, this);
+      reduceTip.taskStatus.setRunState(TaskStatus.State.RUNNING);
+      if(areAllReducesRunning) {
+        speculativeReduceTasks++;
+        //A speculative reduce has been scheduled. Mark all existing reduce
+        //tips as no longer having a speculative task.
+        for(TaskInProgress t : reduceTips) {
+          if (t instanceof FakeTaskInProgress) {
+            FakeTaskInProgress rt = (FakeTaskInProgress) t;
+            rt.hasSpeculativeReduce = false;
+          }
+        }
+      } else {
+        //add only non-speculative tips.
+        reduceTips.add(reduceTip);
+        //add the tips to the JobInProgress TIPS
+        reduces = reduceTips.toArray(new TaskInProgress[reduceTips.size()]);
+      }
       return task;
     }
     
@@ -231,14 +285,19 @@
       finishedReduceTasks++;
     }
     
-    private TaskAttemptID getTaskAttemptID(boolean isMap) {
+    private TaskAttemptID getTaskAttemptID(boolean isMap, boolean isSpeculative) {
       JobID jobId = getJobID();
       TaskType t = TaskType.REDUCE;
       if (isMap) {
         t = TaskType.MAP;
       }
-      return new TaskAttemptID(jobId.getJtIdentifier(),
-          jobId.getId(), t, (isMap)?++mapTaskCtr: ++redTaskCtr, 0);
+      if (!isSpeculative) {
+        return new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(), t,
+            (isMap) ? ++mapTaskCtr : ++redTaskCtr, 0);
+      } else  {
+        return new TaskAttemptID(jobId.getJtIdentifier(), jobId.getId(), t,
+            (isMap) ? mapTaskCtr : redTaskCtr, 1);
+      }
     }
     
     @Override
@@ -269,12 +328,15 @@
       this.status.setRunState(JobStatus.FAILED);
     }
   }
-  
+ 
   static class FakeTaskInProgress extends TaskInProgress {
     private boolean isMap;
     private FakeJobInProgress fakeJob;
     private TreeMap<TaskAttemptID, String> activeTasks;
     private TaskStatus taskStatus;
+    boolean hasSpeculativeMap;
+    boolean hasSpeculativeReduce;
+    
     FakeTaskInProgress(JobID jId, JobConf jobConf, Task t, 
         boolean isMap, FakeJobInProgress job) {
       super(jId, "", new JobClient.RawSplit(), null, jobConf, job, 0);
@@ -286,6 +348,16 @@
       this.taskStatus = TaskStatus.createTaskStatus(isMap);
       taskStatus.setProgress(0.5f);
       taskStatus.setRunState(TaskStatus.State.RUNNING);
+      if (jobConf.getMapSpeculativeExecution()) {
+        //hasSpeculativeMap is reset when the job
+        //schedules a speculative map.
+        hasSpeculativeMap = true;
+      } 
+      if (jobConf.getReduceSpeculativeExecution()) {
+        //hasSpeculativeReduce is reset when the job
+        //schedules a speculative reduce.
+        hasSpeculativeReduce = true;
+      }
     }
     
     @Override
@@ -307,6 +379,27 @@
       }
       return true;
     }
+    
+    @Override
+    /*
+     *hasSpeculativeMap and hasSpeculativeReduce are reset by FakeJobInProgress
+     *after the speculative tip has been scheduled.
+     */
+    boolean hasSpeculativeTask(long currentTime, double averageProgress) {
+      if(isMap && hasSpeculativeMap) {
+        return fakeJob.getJobConf().getMapSpeculativeExecution();
+      } 
+      if (!isMap && hasSpeculativeReduce) {
+        return fakeJob.getJobConf().getReduceSpeculativeExecution();
+      }
+      return false;
+    }
+    
+    @Override
+    public boolean isRunning() {
+      return !activeTasks.isEmpty();
+    }
+    
   }
   
   static class FakeQueueManager extends QueueManager {
@@ -625,6 +718,9 @@
         resConf.getQueues());
     scheduler.setInitializationPoller(controlledInitializationPoller);
     scheduler.setConf(conf);
+    //by default disable speculative execution.
+    conf.setMapSpeculativeExecution(false);
+    conf.setReduceSpeculativeExecution(false);
   }
   
   @Override
@@ -1533,6 +1629,8 @@
     jConf.setNumReduceTasks(1);
     jConf.setQueueName("default");
     jConf.setUser("u1");
+    jConf.setMapSpeculativeExecution(false);
+    jConf.setReduceSpeculativeExecution(false);
     FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
 
@@ -1551,6 +1649,8 @@
     jConf.setNumReduceTasks(0);
     jConf.setQueueName("default");
     jConf.setUser("u1");
+    jConf.setMapSpeculativeExecution(false);
+    jConf.setReduceSpeculativeExecution(false);
     submitJobAndInit(JobStatus.PREP, jConf); // job2
 
     // This job shouldn't run the TT now because of lack of pmem
@@ -2147,6 +2247,269 @@
     assertFalse("Waiting job contains submitted job", 
         mgr.getRunningJobQueue("default").contains(job));
   }
+  
+  /**
+   * Test case deals with normal jobs which have speculative maps and reduces.
+   * The following test is executed:
+   * <ol>
+   * <li>Submit one job with speculative maps and reduces.</li>
+   * <li>Submit another job with no speculative execution.</li>
+   * <li>Observe that all tasks from the first job, both speculative and
+   * normal, get scheduled.</li>
+   * <li>Finish all the first job's tasks; the second job's tasks then get scheduled.</li>
+   * </ol>
+   * @throws IOException
+   */
+  public void testSpeculativeTaskScheduling() throws IOException {
+    String[] qs = {"default"};
+    taskTrackerManager = new FakeTaskTrackerManager(2, 1, 1);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    taskTrackerManager.addQueues(qs);
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    JobQueuesManager mgr = scheduler.jobQueuesManager;
+    JobConf conf = new JobConf();
+    conf.setNumMapTasks(1);
+    conf.setNumReduceTasks(1);
+    conf.setMapSpeculativeExecution(true);
+    conf.setReduceSpeculativeExecution(true);
+    //Submit a job which would have one speculative map and one speculative
+    //reduce.
+    FakeJobInProgress fjob1 = submitJob(JobStatus.PREP, conf);
+    
+    conf = new JobConf();
+    conf.setNumMapTasks(1);
+    conf.setNumReduceTasks(1);
+    //Submit a job which has no speculative map or reduce.
+    FakeJobInProgress fjob2 = submitJob(JobStatus.PREP, conf);    
+
+    //Ask the poller to initalize all the submitted job and raise status
+    //change event.
+    controlledInitializationPoller.selectJobsToInitialize();
+    raiseStatusChangeEvents(mgr);
+
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    assertTrue("Pending maps of job1 greater than zero", 
+        (fjob1.pendingMaps() == 0));
+    checkAssignment("tt2", "attempt_test_0001_m_000001_1 on tt2");
+    checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+    assertTrue("Pending reduces of job2 greater than zero", 
+        (fjob1.pendingReduces() == 0));
+    checkAssignment("tt2", "attempt_test_0001_r_000001_1 on tt2");
+
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", fjob1);
+    taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000001_1", fjob1);
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000001_0", fjob1);
+    taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000001_1", fjob1);
+    taskTrackerManager.finalizeJob(fjob1);
+    
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", fjob2);
+    taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000001_0", fjob2);
+    taskTrackerManager.finalizeJob(fjob2);
+    
+  }
+  /**
+   * Test case to test scheduling of
+   * <ol> 
+   * <li>High ram job with speculative map execution.
+   * <ul>
+   * <li>Submit one high ram job which has speculative maps.</li>
+   * <li>Submit a normal job which has no speculative maps.</li>
+   * <li>Scheduler should first schedule all map tasks from the first job and
+   * block the cluster till both maps from the first job are completed.</li>
+   * </ul>
+   * </li>
+   * <li>High ram job with speculative reduce execution.
+   * <ul>
+   * <li>Submit one high ram job which has speculative reduces.</li>
+   * <li>Submit a normal job which has no speculative reduces.</li>
+   * <li>Scheduler should first schedule all reduce tasks from the first job
+   * and block the cluster till both reduces are completed.</li>
+   * </ul>
+   * </li>
+   * </ol>
+   * @throws IOException
+   */
+  public void testHighRamJobWithSpeculativeExecution() throws IOException {
+    // 2 map and 2 reduce slots
+    taskTrackerManager = new FakeTaskTrackerManager(2, 2, 2);
+    
+    //task tracker memory configurations.
+    TaskTrackerStatus.ResourceStatus ttStatus =
+        taskTrackerManager.getTaskTracker("tt1").getResourceStatus();
+    ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
+    ttStatus.setReservedVirtualMemory(0);
+    ttStatus.setTotalPhysicalMemory(1536 * 1024 * 1024L);
+    ttStatus.setReservedPhysicalMemory(0);
+    ttStatus = taskTrackerManager.getTaskTracker("tt2").getResourceStatus();
+    ttStatus.setTotalVirtualMemory(3 * 1024 * 1024 * 1024L);
+    ttStatus.setReservedVirtualMemory(0);
+    ttStatus.setTotalPhysicalMemory(1536 * 1024 * 1024L);
+    ttStatus.setReservedPhysicalMemory(0);
+   
+
+    taskTrackerManager.addQueues(new String[] { "default" });
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    // enable memory-based scheduling
+    scheduler.getConf().setLong(JobConf.MAPRED_TASK_DEFAULT_MAXVMEM_PROPERTY,
+        1 * 1024 * 1024 * 1024L);
+    scheduler.getConf().setLong(JobConf.UPPER_LIMIT_ON_TASK_VMEM_PROPERTY,
+        3 * 1024 * 1024 * 1024L);
+    resConf.setDefaultPercentOfPmemInVmem(33.3f);
+    resConf.setLimitMaxPmemForTasks(1536 * 1024 * 1024L);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    
+    JobConf jConf = new JobConf();
+    jConf.setMaxVirtualMemoryForTask(2 * 1024 * 1024 * 1024L); // 2GB vmem
+    jConf.setMaxPhysicalMemoryForTask(400 * 1024 * 1024L); // 400MB pmem
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(0);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    jConf.setMapSpeculativeExecution(true);
+    jConf.setReduceSpeculativeExecution(false);
+    FakeJobInProgress job1 = new FakeJobInProgress(new JobID("test", ++jobCounter),
+          jConf, taskTrackerManager,"u1");
+    
+    //Submit a high memory job with speculative tasks.
+    taskTrackerManager.submitJob(job1);
+    
+    jConf = new JobConf();
+    jConf.setMaxVirtualMemoryForTask(100 * 1024 * 1024L); // 100MB vmem
+    jConf.setMaxPhysicalMemoryForTask(50 * 1024 * 1024L); // 50MB pmem
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(0);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    jConf.setMapSpeculativeExecution(false);
+    jConf.setReduceSpeculativeExecution(false);
+    //Submit normal job
+    FakeJobInProgress job2 = submitJob(JobStatus.PREP, jConf);
+
+    controlledInitializationPoller.selectJobsToInitialize();
+    raiseStatusChangeEvents(scheduler.jobQueuesManager);
+
+    // first, a map from job1 will run
+    // at this point, there is a speculative task for the same job waiting
+    //to be scheduled. That task will be scheduled next. Till the tasks from
+    //job1 complete, none of the tasks from other jobs will be scheduled.
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    assertEquals("pending maps greater than zero " , job1.pendingMaps(), 0);
+    //make the same tracker come back and check that it is blocked. job1
+    //has a speculative map task pending, so the tracker should be blocked
+    //even though it could run job2's map.
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    //TT2 now gets speculative map of the job1
+    checkAssignment("tt2", "attempt_test_0001_m_000001_1 on tt2");
+
+    // Now since the first job has no more speculative maps, it can schedule
+    // the second job.
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+
+    //finish everything
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", 
+        job1);
+    taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000001_1", 
+        job1);
+    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", 
+        job2);
+    taskTrackerManager.finalizeJob(job1);
+    taskTrackerManager.finalizeJob(job2);
+    
+    //Now submit high ram job with speculative reduce and check.
+    jConf = new JobConf();
+    jConf.setMaxVirtualMemoryForTask(2 * 1024 * 1024 * 1024L); // 2GB vmem
+    jConf.setMaxPhysicalMemoryForTask(400 * 1024 * 1024L); // 400MB pmem
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(1);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    jConf.setMapSpeculativeExecution(false);
+    jConf.setReduceSpeculativeExecution(true);
+    FakeJobInProgress job3 = new FakeJobInProgress(new JobID("test", ++jobCounter),
+          jConf, taskTrackerManager,"u1");
+    
+    //Submit a high memory job with speculative reduce tasks.
+    taskTrackerManager.submitJob(job3);
+    
+    jConf = new JobConf();
+    jConf.setMaxVirtualMemoryForTask(100 * 1024 * 1024L); // 100MB vmem
+    jConf.setMaxPhysicalMemoryForTask(50 * 1024 * 1024L); // 50MB pmem
+    jConf.setNumMapTasks(1);
+    jConf.setNumReduceTasks(1);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    jConf.setMapSpeculativeExecution(false);
+    jConf.setReduceSpeculativeExecution(false);
+    //Submit normal job
+    FakeJobInProgress job4 = submitJob(JobStatus.PREP, jConf);
+    
+    controlledInitializationPoller.selectJobsToInitialize();
+    raiseStatusChangeEvents(scheduler.jobQueuesManager);
+    //neither job has speculative maps, so the map tasks are simply
+    //assigned as the trackers heart beat: job3's map goes to tt1 and
+    //job4's map goes to tt2.
+    checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
+    //first map slot gets attention on this tracker.
+    checkAssignment("tt2", "attempt_test_0004_m_000001_0 on tt2");
+    //now the first reduce of job3 should be scheduled on tt2 since it has
+    //enough memory.
+    //assignTasks() checks whether there are more free reduce slots than
+    //free map slots. Seeing more free reduce slots, it tries to schedule
+    //job3's reduce on tt1 but blocks because it is a high memory task.
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    //TT2 gets the reduce task from the high memory job, as the tracker is
+    //running the normal job's map, which is low memory.
+    checkAssignment("tt2", "attempt_test_0003_r_000001_0 on tt2");
+    // now if either TT comes back, it will be blocked because all maps
+    // are done, and job3's reduce still has a speculative task pending.
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    assertNull(scheduler.assignTasks(tracker("tt2"))); 
+    //finish maps.
+    taskTrackerManager.finishTask("tt1", "attempt_test_0003_m_000001_0", 
+        job3);
+    taskTrackerManager.finishTask("tt2", "attempt_test_0004_m_000001_0", 
+        job4);
+    //check speculative reduce code path is covered.
+    assertEquals("Pending reduces not zero for high " +
+    		"ram job with speculative reduce.", 0, job3.pendingReduces());
+    //if tt2 comes back, it is not given any task even though it could
+    //schedule job4's reduce.
+    assertNull(scheduler.assignTasks(tracker("tt2")));
+    //speculative reduce of the job3 would be scheduled.
+    checkAssignment("tt1", "attempt_test_0003_r_000001_1 on tt1");
+    //now both the speculative and the original attempt have been scheduled
+    //for job3. The normal task of job4 will now be scheduled on TT1 as it
+    //has free space to run.
+    checkAssignment("tt1", "attempt_test_0004_r_000001_0 on tt1");
+    //No more tasks.
+    assertNull(scheduler.assignTasks(tracker("tt2")));
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    
+    //finish all the reduces.
+    taskTrackerManager.finishTask("tt1", "attempt_test_0003_r_000001_1", 
+        job3);
+    taskTrackerManager.finishTask("tt2", "attempt_test_0003_r_000001_0", 
+        job3);
+    //finish the job
+    taskTrackerManager.finalizeJob(job3);
+    //finish the task and the job.
+    taskTrackerManager.finishTask("tt1", "attempt_test_0004_r_000001_0", 
+        job4);
+    taskTrackerManager.finalizeJob(job4);
+    
+  }
 
   private void checkRunningJobMovementAndCompletion() throws IOException {