You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:25:11 UTC

svn commit: r1076949 [1/3] - in /hadoop/common/branches/branch-0.20-security-patches/src: contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ contrib/fairscheduler/src/java/org/apa...

Author: omalley
Date: Fri Mar  4 03:25:10 2011
New Revision: 1076949

URL: http://svn.apache.org/viewvc?rev=1076949&view=rev
Log:
commit 15c352629793f64a0fc8ab2ad24a6c081f731687
Author: Lee Tucker <lt...@yahoo-inc.com>
Date:   Thu Jul 30 17:40:34 2009 -0700

    Applying patch 2836271.mr516.patch

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/TaskType.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/jobtracker/
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/server/jobtracker/TaskTracker.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobConf.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress_Counter.properties
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTaskStatus.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTaskStatus.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueInformation.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobTrackerSafeMode.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestResourceEstimation.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestTTMemoryReporting.java
    hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobfailures.jsp
    hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/machines.jsp
    hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/taskdetails.jsp

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Fri Mar  4 03:25:10 2011
@@ -32,6 +32,9 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapred.JobTracker.IllegalStateException;
+import org.apache.hadoop.mapred.TaskTrackerStatus;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
 /**
  * A {@link TaskScheduler} that implements the requirements in HADOOP-3421
@@ -290,13 +293,14 @@ class CapacityTaskScheduler extends Task
 
     /** our TaskScheduler object */
     protected CapacityTaskScheduler scheduler;
-    protected CapacityTaskScheduler.TYPE type = null;
+    protected TaskType type = null;
 
     abstract Task obtainNewTask(TaskTrackerStatus taskTracker, 
         JobInProgress job) throws IOException;
 
     int getSlotsOccupied(JobInProgress job) {
-      return getRunningTasks(job) * getSlotsPerTask(job);
+      return (getNumReservedTaskTrackers(job) + getRunningTasks(job)) * 
+             getSlotsPerTask(job);
     }
 
     abstract int getClusterCapacity();
@@ -304,6 +308,8 @@ class CapacityTaskScheduler extends Task
     abstract int getRunningTasks(JobInProgress job);
     abstract int getPendingTasks(JobInProgress job);
     abstract TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi);
+    abstract int getNumReservedTaskTrackers(JobInProgress job);
+    
     /**
      * To check if job has a speculative task on the particular tracker.
      * 
@@ -315,6 +321,18 @@ class CapacityTaskScheduler extends Task
         TaskTrackerStatus tts);
 
     /**
+     * Check if the given job has sufficient reserved tasktrackers for all its
+     * pending tasks.
+     * 
+     * @param job job to check for sufficient reserved tasktrackers 
+     * @return <code>true</code> if the job has reserved tasktrackers,
+     *         else <code>false</code>
+     */
+    boolean hasSufficientReservedTaskTrackers(JobInProgress job) {
+      return getNumReservedTaskTrackers(job) >= getPendingTasks(job);
+    }
+    
+    /**
      * List of QSIs for assigning tasks.
      * Queues are ordered by a ratio of (# of running tasks)/capacity, which
      * indicates how much 'free space' the queue has, or how much it is over
@@ -419,10 +437,10 @@ class CapacityTaskScheduler extends Task
      * It tries to get a task from jobs in a single queue. 
      * Always return a TaskLookupResult object. Don't return null. 
      */
-    private TaskLookupResult getTaskFromQueue(TaskTrackerStatus taskTracker,
-        QueueSchedulingInfo qsi)
-        throws IOException {
-
+    private TaskLookupResult getTaskFromQueue(TaskTracker taskTracker,
+                                              QueueSchedulingInfo qsi)
+    throws IOException {
+      TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
       // we only look at jobs in the running queues, as these are the ones
       // who have been potentially initialized
 
@@ -442,9 +460,9 @@ class CapacityTaskScheduler extends Task
         //a task to be scheduled on the task tracker.
         //if we find a job then we pass it on.
         if (scheduler.memoryMatcher.matchesMemoryRequirements(j, type,
-            taskTracker)) {
+                                                              taskTrackerStatus)) {
           // We found a suitable job. Get task from it.
-          Task t = obtainNewTask(taskTracker, j);
+          Task t = obtainNewTask(taskTrackerStatus, j);
           //if there is a task return it immediately.
           if (t != null) {
             // we're successful in getting a task
@@ -457,10 +475,18 @@ class CapacityTaskScheduler extends Task
           }
         } else {
           //if memory requirements don't match then we check if the 
-          //job has either pending or speculative task. If the job
-          //has pending or speculative task we block till this job
-          //tasks get scheduled. So that high memory jobs are not starved
-          if (getPendingTasks(j) != 0 || hasSpeculativeTask(j, taskTracker)) {
+          //job has either pending or speculative task or has insufficient number
+          //of 'reserved' tasktrackers to cover all pending tasks. If so
+          //we reserve the current tasktracker for this job so that 
+          //high memory jobs are not starved
+          if (getPendingTasks(j) != 0 || hasSpeculativeTask(j, taskTrackerStatus) || 
+              !hasSufficientReservedTaskTrackers(j)) {
+            // Reserve all available slots on this tasktracker
+            LOG.info(j.getJobID() + ": Reserving " + taskTracker.getTrackerName() + 
+                     " since memory-requirements don't match");
+            taskTracker.reserveSlots(type, j, taskTracker.getAvailableSlots(type));
+            
+            // Block
             return TaskLookupResult.getMemFailedResult();
           } 
         }//end of memory check block
@@ -472,7 +498,9 @@ class CapacityTaskScheduler extends Task
       // the user limit for some user is too strict, i.e., there's at least 
       // one user who doesn't have enough tasks to satisfy his limit. If 
       // it's the latter case, re-look at jobs without considering user 
-      // limits, and get a task from the first eligible job
+      // limits, and get a task from the first eligible job; however
+      // we do not 'reserve' slots on tasktrackers anymore since the user is 
+      // already over the limit
       // Note: some of the code from above is repeated here. This is on 
       // purpose as it improves overall readability.  
       // Note: we walk through jobs again. Some of these jobs, which weren't
@@ -488,9 +516,9 @@ class CapacityTaskScheduler extends Task
           continue;
         }
         if (scheduler.memoryMatcher.matchesMemoryRequirements(j, type,
-            taskTracker)) {
+            taskTrackerStatus)) {
           // We found a suitable job. Get task from it.
-          Task t = obtainNewTask(taskTracker, j);
+          Task t = obtainNewTask(taskTrackerStatus, j);
           //if there is a task return it immediately.
           if (t != null) {
             // we're successful in getting a task
@@ -505,7 +533,7 @@ class CapacityTaskScheduler extends Task
           //has pending or speculative task we block till this job
           //tasks get scheduled, so that high memory jobs are not 
           //starved
-          if (getPendingTasks(j) != 0 || hasSpeculativeTask(j, taskTracker)) {
+          if (getPendingTasks(j) != 0 || hasSpeculativeTask(j, taskTrackerStatus)) {
             return TaskLookupResult.getMemFailedResult();
           } 
         }//end of memory check block
@@ -520,10 +548,52 @@ class CapacityTaskScheduler extends Task
     // Always return a TaskLookupResult object. Don't return null. 
     // The caller is responsible for ensuring that the QSI objects and the 
     // collections are up-to-date.
-    private TaskLookupResult assignTasks(TaskTrackerStatus taskTracker) throws IOException {
+    private TaskLookupResult assignTasks(TaskTracker taskTracker) 
+    throws IOException {
+      TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
 
       printQSIs();
 
+      // Check if this tasktracker has been reserved for a job...
+      JobInProgress job = taskTracker.getJobForFallowSlot(type);
+      if (job != null) {
+        int availableSlots = taskTracker.getAvailableSlots(type);
+        if (LOG.isDebugEnabled()) {
+          LOG.debug(job.getJobID() + ": Checking 'reserved' tasktracker " + 
+                    taskTracker.getTrackerName() + " with " + availableSlots + 
+                    " '" + type + "' slots");
+        }
+
+        if (availableSlots >= job.getNumSlotsPerTask(type)) {
+          // Unreserve 
+          taskTracker.unreserveSlots(type, job);
+          
+          // We found a suitable job. Get task from it.
+          Task t = obtainNewTask(taskTrackerStatus, job);
+          //if there is a task return it immediately.
+          if (t != null) {
+            if (LOG.isDebugEnabled()) {
+              LOG.info(job.getJobID() + ": Got " + t.getTaskID() + 
+                       " for reserved tasktracker " + 
+                       taskTracker.getTrackerName());
+            }
+            // we're successful in getting a task
+            return TaskLookupResult.getTaskFoundResult(t);
+          } 
+        } else {
+          // Re-reserve the current tasktracker
+          taskTracker.reserveSlots(type, job, availableSlots);
+          
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(job.getJobID() + ": Re-reserving " + 
+                      taskTracker.getTrackerName());
+          }
+
+          return TaskLookupResult.getMemFailedResult(); 
+        }
+      }
+      
+      
       for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
         // we may have queues with capacity=0. We shouldn't look at jobs from 
         // these queues
@@ -600,7 +670,7 @@ class CapacityTaskScheduler extends Task
 
     MapSchedulingMgr(CapacityTaskScheduler schedulr) {
       super(schedulr);
-      type = CapacityTaskScheduler.TYPE.MAP;
+      type = TaskType.MAP;
       queueComparator = mapComparator;
     }
 
@@ -631,9 +701,8 @@ class CapacityTaskScheduler extends Task
 
     @Override
     int getSlotsPerTask(JobInProgress job) {
-      long myVmem = job.getJobConf().getMemoryForMapTask();
-      return (int) (Math.ceil((float) myVmem
-          / (float) scheduler.getMemSizeForMapSlot()));
+      return 
+        job.getJobConf().computeNumSlotsPerMap(scheduler.getMemSizeForMapSlot());    
     }
 
     @Override
@@ -641,6 +710,10 @@ class CapacityTaskScheduler extends Task
       return qsi.mapTSI;
     }
 
+    int getNumReservedTaskTrackers(JobInProgress job) {
+      return job.getNumReservedTaskTrackersForMaps();
+    }
+
     @Override
     boolean hasSpeculativeTask(JobInProgress job, TaskTrackerStatus tts) {
       //Check if job supports speculative map execution first then 
@@ -659,7 +732,7 @@ class CapacityTaskScheduler extends Task
 
     ReduceSchedulingMgr(CapacityTaskScheduler schedulr) {
       super(schedulr);
-      type = CapacityTaskScheduler.TYPE.REDUCE;
+      type = TaskType.REDUCE;
       queueComparator = reduceComparator;
     }
 
@@ -691,9 +764,8 @@ class CapacityTaskScheduler extends Task
 
     @Override
     int getSlotsPerTask(JobInProgress job) {
-      long myVmem = job.getJobConf().getMemoryForReduceTask();
-      return (int) (Math.ceil((float) myVmem
-          / (float) scheduler.getMemSizeForReduceSlot()));
+      return
+        job.getJobConf().computeNumSlotsPerReduce(scheduler.getMemSizeForReduceSlot());    
     }
 
     @Override
@@ -701,6 +773,10 @@ class CapacityTaskScheduler extends Task
       return qsi.reduceTSI;
     }
 
+    int getNumReservedTaskTrackers(JobInProgress job) {
+      return job.getNumReservedTaskTrackersForReduces();
+    }
+
     @Override
     boolean hasSpeculativeTask(JobInProgress job, TaskTrackerStatus tts) {
       //check if the job supports reduce speculative execution first then
@@ -729,9 +805,10 @@ class CapacityTaskScheduler extends Task
   /** whether scheduler has started or not */
   private boolean started = false;
 
-  static String JOB_SCHEDULING_INFO_FORMAT_STRING =
-      "%s running map tasks using %d map slots,"
-          + " %s running reduce tasks using %d reduce slots.";
+  final static String JOB_SCHEDULING_INFO_FORMAT_STRING =
+    "%s running map tasks using %d map slots. %d additional slots reserved." +
+    " %s running reduce tasks using %d reduce slots." +
+    " %d additional slots reserved.";
   /**
    * A clock class - can be mocked out for testing.
    */
@@ -741,11 +818,6 @@ class CapacityTaskScheduler extends Task
     }
   }
 
-  // can be replaced with a global type, if we have one
-  protected static enum TYPE {
-    MAP, REDUCE
-  }
-
   private Clock clock;
   private JobInitializationPoller initializationPoller;
 
@@ -859,10 +931,10 @@ class CapacityTaskScheduler extends Task
     return limitMaxMemForReduceTasks;
   }
 
-  String[] getOrderedQueues(CapacityTaskScheduler.TYPE type) {
-    if (type.equals(CapacityTaskScheduler.TYPE.MAP)) {
+  String[] getOrderedQueues(TaskType type) {
+    if (type == TaskType.MAP) {
       return mapScheduler.getOrderedQueues();
-    } else if (type.equals(CapacityTaskScheduler.TYPE.REDUCE)) {
+    } else if (type == TaskType.REDUCE) {
       return reduceScheduler.getOrderedQueues();
     }
     return null;
@@ -1015,13 +1087,26 @@ class CapacityTaskScheduler extends Task
 
         int numMapsRunningForThisJob = mapScheduler.getRunningTasks(j);
         int numReducesRunningForThisJob = reduceScheduler.getRunningTasks(j);
+        int numRunningMapSlots = 
+          numMapsRunningForThisJob * mapScheduler.getSlotsPerTask(j);
+        int numRunningReduceSlots =
+          numReducesRunningForThisJob * reduceScheduler.getSlotsPerTask(j);
         int numMapSlotsForThisJob = mapScheduler.getSlotsOccupied(j);
         int numReduceSlotsForThisJob = reduceScheduler.getSlotsOccupied(j);
-        j.setSchedulingInfo(String.format(JOB_SCHEDULING_INFO_FORMAT_STRING,
-            Integer.valueOf(numMapsRunningForThisJob), Integer
-                .valueOf(numMapSlotsForThisJob), Integer
-                .valueOf(numReducesRunningForThisJob), Integer
-                .valueOf(numReduceSlotsForThisJob)));
+        int numReservedMapSlotsForThisJob = 
+          (mapScheduler.getNumReservedTaskTrackers(j) * 
+           mapScheduler.getSlotsPerTask(j)); 
+        int numReservedReduceSlotsForThisJob = 
+          (reduceScheduler.getNumReservedTaskTrackers(j) * 
+           reduceScheduler.getSlotsPerTask(j)); 
+        j.setSchedulingInfo(
+            String.format(JOB_SCHEDULING_INFO_FORMAT_STRING,
+                          Integer.valueOf(numMapsRunningForThisJob), 
+                          Integer.valueOf(numRunningMapSlots),
+                          Integer.valueOf(numReservedMapSlotsForThisJob),
+                          Integer.valueOf(numReducesRunningForThisJob), 
+                          Integer.valueOf(numRunningReduceSlots),
+                          Integer.valueOf(numReservedReduceSlotsForThisJob)));
         qsi.mapTSI.numRunningTasks += numMapsRunningForThisJob;
         qsi.reduceTSI.numRunningTasks += numReducesRunningForThisJob;
         qsi.mapTSI.numSlotsOccupied += numMapSlotsForThisJob;
@@ -1077,10 +1162,12 @@ class CapacityTaskScheduler extends Task
    *  
    */
   @Override
-  public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
-      throws IOException {
+  public synchronized List<Task> assignTasks(TaskTracker taskTracker)
+  throws IOException {
     
     TaskLookupResult tlr;
+    TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
+    
     /* 
      * If TT has Map and Reduce slot free, we need to figure out whether to
      * give it a Map or Reduce task.
@@ -1090,14 +1177,14 @@ class CapacityTaskScheduler extends Task
     ClusterStatus c = taskTrackerManager.getClusterStatus();
     int mapClusterCapacity = c.getMaxMapTasks();
     int reduceClusterCapacity = c.getMaxReduceTasks();
-    int maxMapTasks = taskTracker.getMaxMapTasks();
-    int currentMapTasks = taskTracker.countMapTasks();
-    int maxReduceTasks = taskTracker.getMaxReduceTasks();
-    int currentReduceTasks = taskTracker.countReduceTasks();
-    LOG.debug("TT asking for task, max maps=" + taskTracker.getMaxMapTasks() + 
-        ", run maps=" + taskTracker.countMapTasks() + ", max reds=" + 
-        taskTracker.getMaxReduceTasks() + ", run reds=" + 
-        taskTracker.countReduceTasks() + ", map cap=" + 
+    int maxMapSlots = taskTrackerStatus.getMaxMapSlots();
+    int currentMapSlots = taskTrackerStatus.countOccupiedMapSlots();
+    int maxReduceSlots = taskTrackerStatus.getMaxReduceSlots();
+    int currentReduceSlots = taskTrackerStatus.countOccupiedReduceSlots();
+    LOG.debug("TT asking for task, max maps=" + taskTrackerStatus.getMaxMapSlots() + 
+        ", run maps=" + taskTrackerStatus.countMapTasks() + ", max reds=" + 
+        taskTrackerStatus.getMaxReduceSlots() + ", run reds=" + 
+        taskTrackerStatus.countReduceTasks() + ", map cap=" + 
         mapClusterCapacity + ", red cap = " + 
         reduceClusterCapacity);
 
@@ -1111,8 +1198,8 @@ class CapacityTaskScheduler extends Task
     // make sure we get our map or reduce scheduling object to update its 
     // collection of QSI objects too. 
 
-    if ((maxReduceTasks - currentReduceTasks) > 
-    (maxMapTasks - currentMapTasks)) {
+    if ((maxReduceSlots - currentReduceSlots) > 
+    (maxMapSlots - currentMapSlots)) {
       // get a reduce task first
       reduceScheduler.updateCollectionOfQSIs();
       tlr = reduceScheduler.assignTasks(taskTracker);
@@ -1126,7 +1213,7 @@ class CapacityTaskScheduler extends Task
                                   == tlr.getLookUpStatus() ||
                 TaskLookupResult.LookUpStatus.NO_TASK_FOUND
                                   == tlr.getLookUpStatus())
-          && (maxMapTasks > currentMapTasks)) {
+          && (maxMapSlots > currentMapSlots)) {
         mapScheduler.updateCollectionOfQSIs();
         tlr = mapScheduler.assignTasks(taskTracker);
         if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
@@ -1149,7 +1236,7 @@ class CapacityTaskScheduler extends Task
                                     == tlr.getLookUpStatus()
                 || TaskLookupResult.LookUpStatus.NO_TASK_FOUND
                                     == tlr.getLookUpStatus())
-          && (maxReduceTasks > currentReduceTasks)) {
+          && (maxReduceSlots > currentReduceSlots)) {
         reduceScheduler.updateCollectionOfQSIs();
         tlr = reduceScheduler.assignTasks(taskTracker);
         if (TaskLookupResult.LookUpStatus.TASK_FOUND == 
@@ -1181,10 +1268,33 @@ class CapacityTaskScheduler extends Task
       i++;
     }
     qsi.numJobsByUser.put(job.getProfile().getUser(), i);
+    
+    // setup scheduler specific job information
+    preInitializeJob(job);
+    
     LOG.debug("Job " + job.getJobID().toString() + " is added under user " 
               + job.getProfile().getUser() + ", user now has " + i + " jobs");
   }
 
+  /**
+   * Setup {@link CapacityTaskScheduler} specific information prior to
+   * job initialization.
+   */
+  void preInitializeJob(JobInProgress job) {
+    JobConf jobConf = job.getJobConf();
+    
+    // Compute number of slots required to run a single map/reduce task
+    int slotsPerMap = 1;
+    int slotsPerReduce = 1;
+    if (memoryMatcher.isSchedulingBasedOnMemEnabled()) {
+      slotsPerMap = jobConf.computeNumSlotsPerMap(getMemSizeForMapSlot());
+     slotsPerReduce = 
+       jobConf.computeNumSlotsPerReduce(getMemSizeForReduceSlot());
+    }
+    job.setNumSlotsPerMap(slotsPerMap);
+    job.setNumSlotsPerReduce(slotsPerReduce);
+  }
+  
   // called when a job completes
   synchronized void jobCompleted(JobInProgress job) {
     QueueSchedulingInfo qsi = 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/JobInitializationPoller.java Fri Mar  4 03:25:10 2011
@@ -263,7 +263,8 @@ public class JobInitializationPoller ext
    * poller
    */
 
-  void init(Set<String> queues, CapacitySchedulerConf capacityConf) {
+  void init(Set<String> queues, 
+            CapacitySchedulerConf capacityConf) {
     for (String queue : queues) {
       int userlimit = capacityConf.getMinimumUserLimitPercent(queue);
       int maxUsersToInitialize = ((100 / userlimit) + MAX_ADDITIONAL_USERS_TO_INIT);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/MemoryMatcher.java Fri Mar  4 03:25:10 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapred;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapreduce.TaskType;
 
 class MemoryMatcher {
 
@@ -54,55 +55,26 @@ class MemoryMatcher {
    *          null if memory cannot be computed for some reason.
    */
   synchronized Long getMemReservedForTasks(
-      TaskTrackerStatus taskTracker, CapacityTaskScheduler.TYPE taskType) {
+      TaskTrackerStatus taskTracker, TaskType taskType) {
     long vmem = 0;
 
     for (TaskStatus task : taskTracker.getTaskReports()) {
       // the following task states are one in which the slot is
       // still occupied and hence memory of the task should be
       // accounted in used memory.
-      if ((task.getRunState() == TaskStatus.State.RUNNING)
-          || (task.getRunState() == TaskStatus.State.COMMIT_PENDING)) {
-        JobInProgress job =
-            scheduler.taskTrackerManager.getJob(task.getTaskID().getJobID());
-        if (job == null) {
-          // This scenario can happen if a job was completed/killed
-          // and retired from JT's memory. In this state, we can ignore
-          // the running task status and compute memory for the rest of
-          // the tasks. However, any scheduling done with this computation
-          // could result in over-subscribing of memory for tasks on this
-          // TT (as the unaccounted for task is still running).
-          // So, it is safer to not schedule anything for this TT
-          // One of the ways of doing that is to return null from here
-          // and check for null in the calling method.
-          LOG.info("Task tracker: " + taskTracker.getHost() + " is reporting "
-              + "a running / commit pending task: " + task.getTaskID()
-              + " but no corresponding job was found. "
-              + "Maybe job was retired. Not computing "
-              + "memory values for this TT.");
-          return null;
-        }
-
-        JobConf jConf = job.getJobConf();
-
-        // Get the memory "allotted" for this task by rounding off the job's
-        // tasks' memory limits to the nearest multiple of the slot-memory-size
-        // set on JT. This essentially translates to tasks of a high memory job
-        // using multiple slots.
+      if ((task.getRunState() == TaskStatus.State.RUNNING) ||
+          (task.getRunState() == TaskStatus.State.UNASSIGNED) ||
+          (task.inTaskCleanupPhase())) {
+        // Get the memory "allotted" for this task based on number of slots
         long myVmem = 0;
-        if (task.getIsMap() && taskType.equals(CapacityTaskScheduler.TYPE.MAP)) {
-          myVmem = jConf.getMemoryForMapTask();
-          myVmem =
-              (long) (scheduler.getMemSizeForMapSlot() * Math
-                  .ceil((float) myVmem
-                      / (float) scheduler.getMemSizeForMapSlot()));
+        if (task.getIsMap() && taskType == TaskType.MAP) {
+          long memSizePerMapSlot = scheduler.getMemSizeForMapSlot(); 
+          myVmem = 
+            memSizePerMapSlot * task.getNumSlots();
         } else if (!task.getIsMap()
-            && taskType.equals(CapacityTaskScheduler.TYPE.REDUCE)) {
-          myVmem = jConf.getMemoryForReduceTask();
-          myVmem =
-              (long) (scheduler.getMemSizeForReduceSlot() * Math
-                  .ceil((float) myVmem
-                      / (float) scheduler.getMemSizeForReduceSlot()));
+            && taskType == TaskType.REDUCE) {
+          long memSizePerReduceSlot = scheduler.getMemSizeForReduceSlot(); 
+          myVmem = memSizePerReduceSlot * task.getNumSlots();
         }
         vmem += myVmem;
       }
@@ -118,8 +90,8 @@ class MemoryMatcher {
    * @param taskTracker
    * @return true if this TT has enough memory for this job. False otherwise.
    */
-  boolean matchesMemoryRequirements(JobInProgress job,
-      CapacityTaskScheduler.TYPE taskType, TaskTrackerStatus taskTracker) {
+  boolean matchesMemoryRequirements(JobInProgress job,TaskType taskType, 
+                                    TaskTrackerStatus taskTracker) {
 
     LOG.debug("Matching memory requirements of " + job.getJobID().toString()
         + " for scheduling on " + taskTracker.trackerName);
@@ -131,44 +103,36 @@ class MemoryMatcher {
     }
 
     Long memUsedOnTT = getMemReservedForTasks(taskTracker, taskType);
-    if (memUsedOnTT == null) {
-      // For some reason, maybe because we could not find the job
-      // corresponding to a running task (as can happen if the job
-      // is retired in between), we could not compute the memory state
-      // on this TT. Treat this as an error, and fail memory
-      // requirements.
-      LOG.info("Could not compute memory for taskTracker: "
-          + taskTracker.getHost() + ". Failing memory requirements.");
-      return false;
-    }
-
     long totalMemUsableOnTT = 0;
-
     long memForThisTask = 0;
-    if (taskType.equals(CapacityTaskScheduler.TYPE.MAP)) {
+    if (taskType == TaskType.MAP) {
       memForThisTask = job.getJobConf().getMemoryForMapTask();
       totalMemUsableOnTT =
-          scheduler.getMemSizeForMapSlot() * taskTracker.getMaxMapTasks();
-    } else if (taskType.equals(CapacityTaskScheduler.TYPE.REDUCE)) {
+          scheduler.getMemSizeForMapSlot() * taskTracker.getMaxMapSlots();
+    } else if (taskType == TaskType.REDUCE) {
       memForThisTask = job.getJobConf().getMemoryForReduceTask();
       totalMemUsableOnTT =
           scheduler.getMemSizeForReduceSlot()
-              * taskTracker.getMaxReduceTasks();
+              * taskTracker.getMaxReduceSlots();
     }
 
     long freeMemOnTT = totalMemUsableOnTT - memUsedOnTT.longValue();
     if (memForThisTask > freeMemOnTT) {
-      LOG.debug("memForThisTask (" + memForThisTask + ") > freeMemOnTT ("
-          + freeMemOnTT + "). A " + taskType + " task from "
-          + job.getJobID().toString() + " cannot be scheduled on TT "
-          + taskTracker.trackerName);
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("memForThisTask (" + memForThisTask + ") > freeMemOnTT ("
+                  + freeMemOnTT + "). A " + taskType + " task from "
+                  + job.getJobID().toString() + " cannot be scheduled on TT "
+                  + taskTracker.trackerName);
+      }
       return false;
     }
 
-    LOG.debug("memForThisTask = " + memForThisTask + ". freeMemOnTT = "
-        + freeMemOnTT + ". A " + taskType.toString() + " task from "
-        + job.getJobID().toString() + " matches memory requirements on TT "
-        + taskTracker.trackerName);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("memForThisTask = " + memForThisTask + ". freeMemOnTT = "
+                + freeMemOnTT + ". A " + taskType.toString() + " task from "
+                + job.getJobID().toString() + " matches memory requirements "
+                + "on TT "+ taskTracker.trackerName);
+    }
     return true;
   }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri Mar  4 03:25:10 2011
@@ -40,6 +40,9 @@ import org.apache.hadoop.mapred.JobStatu
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+
 public class TestCapacityScheduler extends TestCase {
 
   static final Log LOG =
@@ -200,7 +203,8 @@ public class TestCapacityScheduler exten
         }
       }
       TaskAttemptID attemptId = getTaskAttemptID(true, areAllMapsRunning);
-      Task task = new MapTask("", attemptId, 0, "", new BytesWritable(), getJobConf().getUser()) {
+      Task task = new MapTask("", attemptId, 0, "", new BytesWritable(), 
+                              super.numSlotsPerMap, getJobConf().getUser()) {
         @Override
         public String toString() {
           return String.format("%s on %s", getTaskID(), tts.getTrackerName());
@@ -242,7 +246,7 @@ public class TestCapacityScheduler exten
         }
       }
       TaskAttemptID attemptId = getTaskAttemptID(false, areAllReducesRunning);
-      Task task = new ReduceTask("", attemptId, 0, 10, getJobConf().getUser()) {
+      Task task = new ReduceTask("", attemptId, 0, 10, super.numSlotsPerReduce, getJobConf().getUser()) {
         @Override
         public String toString() {
           return String.format("%s on %s", getTaskID(), tts.getTrackerName());
@@ -333,7 +337,7 @@ public class TestCapacityScheduler exten
     
     FakeTaskInProgress(JobID jId, JobConf jobConf, Task t, 
         boolean isMap, FakeJobInProgress job) {
-      super(jId, "", new JobClient.RawSplit(), null, jobConf, job, 0);
+      super(jId, "", new JobClient.RawSplit(), null, jobConf, job, 0, 1);
       this.isMap = isMap;
       this.fakeJob = job;
       activeTasks = new TreeMap<TaskAttemptID, String>();
@@ -418,8 +422,8 @@ public class TestCapacityScheduler exten
       new ArrayList<JobInProgressListener>();
     FakeQueueManager qm = new FakeQueueManager();
     
-    private Map<String, TaskTrackerStatus> trackers =
-      new HashMap<String, TaskTrackerStatus>();
+    private Map<String, TaskTracker> trackers =
+      new HashMap<String, TaskTracker>();
     private Map<String, TaskStatus> taskStatuses = 
       new HashMap<String, TaskStatus>();
     private Map<JobID, JobInProgress> jobs =
@@ -435,16 +439,22 @@ public class TestCapacityScheduler exten
       this.maxReduceTasksPerTracker = maxReduceTasksPerTracker;
       for (int i = 1; i < numTaskTrackers + 1; i++) {
         String ttName = "tt" + i;
-        trackers.put(ttName, new TaskTrackerStatus(ttName, ttName + ".host", i,
-            new ArrayList<TaskStatus>(), 0, maxMapTasksPerTracker,
-            maxReduceTasksPerTracker));
+        TaskTracker tt = new TaskTracker(ttName);
+        tt.setStatus(new TaskTrackerStatus(ttName, ttName + ".host", i,
+                                           new ArrayList<TaskStatus>(), 0, 
+                                           maxMapTasksPerTracker,
+                                           maxReduceTasksPerTracker));
+        trackers.put(ttName, tt);
       }
     }
     
     public void addTaskTracker(String ttName) {
-      trackers.put(ttName, new TaskTrackerStatus(ttName, ttName + ".host", 1,
-          new ArrayList<TaskStatus>(), 0,
-          maxMapTasksPerTracker, maxReduceTasksPerTracker));
+      TaskTracker tt = new TaskTracker(ttName);
+      tt.setStatus(new TaskTrackerStatus(ttName, ttName + ".host", 1,
+                                         new ArrayList<TaskStatus>(), 0,
+                                         maxMapTasksPerTracker, 
+                                         maxReduceTasksPerTracker));
+      trackers.put(ttName, tt);
     }
     
     public ClusterStatus getClusterStatus() {
@@ -505,7 +515,11 @@ public class TestCapacityScheduler exten
     }
 
     public Collection<TaskTrackerStatus> taskTrackers() {
-      return trackers.values();
+      List<TaskTrackerStatus> statuses = new ArrayList<TaskTrackerStatus>();
+      for (TaskTracker tt : trackers.values()) {
+        statuses.add(tt.getStatus());
+      }
+      return statuses;
     }
 
 
@@ -524,7 +538,7 @@ public class TestCapacityScheduler exten
       }
     }
     
-    public TaskTrackerStatus getTaskTracker(String trackerID) {
+    public TaskTracker getTaskTracker(String trackerID) {
       return trackers.get(trackerID);
     }
     
@@ -544,10 +558,15 @@ public class TestCapacityScheduler exten
         public boolean getIsMap() {
           return t.isMapTask();
         }
+        
+        @Override
+        public int getNumSlots() {
+          return t.getNumSlotsRequired();
+        }
       };
       taskStatuses.put(t.getTaskID().toString(), status);
       status.setRunState(TaskStatus.State.RUNNING);
-      trackers.get(taskTrackerName).getTaskReports().add(status);
+      trackers.get(taskTrackerName).getStatus().getTaskReports().add(status);
     }
     
     public void finishTask(String taskTrackerName, String tipId, 
@@ -1698,14 +1717,14 @@ public class TestCapacityScheduler exten
     // first, a map from j1 will run
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     // Total 2 map slots should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 100.0f);
+    checkOccupiedSlots("default", TaskType.MAP, 1, 2, 100.0f);
     checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
 
     // at this point, the scheduler tries to schedule another map from j1. 
     // there isn't enough space. The second job's reduce should be scheduled.
     checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
     // Total 1 reduce slot should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
+    checkOccupiedSlots("default", TaskType.REDUCE, 1, 1,
         100.0f);
     checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
   }
@@ -1754,17 +1773,19 @@ public class TestCapacityScheduler exten
     // Fill the second tt with this job.
     checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
     // Total 1 map slot should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 1, 25.0f);
+    checkOccupiedSlots("default", TaskType.MAP, 1, 1, 25.0f);
     assertEquals(String.format(
-        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 1, 0, 0),
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 
+        1, 1, 0, 0, 0, 0),
         (String) job1.getSchedulingInfo());
     checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 0L);
     checkAssignment("tt2", "attempt_test_0001_r_000001_0 on tt2");
     // Total 1 map slot should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
+    checkOccupiedSlots("default", TaskType.REDUCE, 1, 1,
         25.0f);
     assertEquals(String.format(
-        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 1, 1, 1),
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 
+        1, 1, 0, 1, 1, 0),
         (String) job1.getSchedulingInfo());
     checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
 
@@ -1781,18 +1802,20 @@ public class TestCapacityScheduler exten
 
     checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
     // Total 3 map slots should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 3, 75.0f);
+    checkOccupiedSlots("default", TaskType.MAP, 1, 3, 75.0f);
     assertEquals(String.format(
-        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 2, 0, 0),
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 
+        1, 2, 0, 0, 0, 0),
         (String) job2.getSchedulingInfo());
     checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
 
     checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
     // Total 3 reduce slots should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 3,
+    checkOccupiedSlots("default", TaskType.REDUCE, 1, 3,
         75.0f);
     assertEquals(String.format(
-        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 2, 1, 2),
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 
+        1, 2, 0, 1, 2, 0),
         (String) job2.getSchedulingInfo());
     checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
 
@@ -1812,16 +1835,24 @@ public class TestCapacityScheduler exten
     assertNull(scheduler.assignTasks(tracker("tt2")));
     assertNull(scheduler.assignTasks(tracker("tt1")));
     assertNull(scheduler.assignTasks(tracker("tt2")));
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 3, 75.0f);
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 3,
-        75.0f);
+    // reserved tasktrackers contribute to occupied slots
+    // for maps, both tasktrackers are reserved.
+    checkOccupiedSlots("default", TaskType.MAP, 1, 7, 175.0f);
+    // for reduces, only one tasktracker is reserved, because
+    // the reduce scheduler is not visited for tt1 (as it has
+    // 0 slots free).
+    checkOccupiedSlots("default", TaskType.REDUCE, 1, 5,
+        125.0f);
     checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
     checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
+    LOG.info(job2.getSchedulingInfo());
     assertEquals(String.format(
-        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 1, 2, 1, 2),
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 
+        1, 2, 4, 1, 2, 2),
         (String) job2.getSchedulingInfo());
     assertEquals(String.format(
-        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 0, 0, 0, 0),
+        CapacityTaskScheduler.JOB_SCHEDULING_INFO_FORMAT_STRING, 
+        0, 0, 0, 0, 0, 0),
         (String) job3.getSchedulingInfo());
   }
 
@@ -1872,62 +1903,65 @@ public class TestCapacityScheduler exten
     // 1st cycle - 1 map gets assigned.
     Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     // Total 1 map slot should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 1, 50.0f);
+    checkOccupiedSlots("default", TaskType.MAP, 1, 1, 50.0f);
     checkMemReservedForTasksOnTT("tt1",  512L, 0L);
 
     // 1st cycle of reduces - 1 reduce gets assigned.
     Task t1 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
     // Total 1 reduce slot should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
+    checkOccupiedSlots("default", TaskType.REDUCE, 1, 1,
         50.0f);
     checkMemReservedForTasksOnTT("tt1",  512L, 512L);
     
     // kill this job !
     taskTrackerManager.killJob(job1.getJobID());
     // No more map/reduce slots should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 0, 0, 0.0f);
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 0, 0,
+    checkOccupiedSlots("default", TaskType.MAP, 0, 0, 0.0f);
+    checkOccupiedSlots("default", TaskType.REDUCE, 0, 0,
         0.0f);
     
     // retire the job
     taskTrackerManager.removeJob(job1.getJobID());
     
     // submit another job.
-    LOG.debug("Submitting another normal job with 1 map and 1 reduce");
+    LOG.debug("Submitting another normal job with 2 maps and 2 reduces");
     jConf = new JobConf();
-    jConf.setNumMapTasks(1);
-    jConf.setNumReduceTasks(1);
+    jConf.setNumMapTasks(2);
+    jConf.setNumReduceTasks(2);
     jConf.setMemoryForMapTask(512);
     jConf.setMemoryForReduceTask(512);
     jConf.setQueueName("default");
     jConf.setUser("u1");
     FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
     
-    // 2nd cycle - nothing should get assigned. Memory matching code
-    // will see the job is missing and fail memory requirements.
+    // since with HADOOP-5964, we don't rely on a job conf to get
+    // the memory occupied, scheduling should be able to work correctly.
+    t1 = checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    checkOccupiedSlots("default", TaskType.MAP, 1, 1, 50);
+    checkMemReservedForTasksOnTT("tt1", 1024L, 512L);
+
+    // assign a reduce now.
+    t1 = checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+    checkOccupiedSlots("default", TaskType.REDUCE, 1, 1, 50);
+    checkMemReservedForTasksOnTT("tt1", 1024L, 1024L);
+    
+    // now, no more can be assigned because all the slots are blocked.
     assertNull(scheduler.assignTasks(tracker("tt1")));
-    checkMemReservedForTasksOnTT("tt1", null, null);
 
-    // calling again should not make a difference, as the task is still running
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-    checkMemReservedForTasksOnTT("tt1", null, null);
-    
     // finish the tasks on the tracker.
     taskTrackerManager.finishTask("tt1", t.getTaskID().toString(), job1);
     taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), job1);
-
+    
     // now a new task can be assigned.
-    t = checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-    // Total 1 map slots should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 1, 50.0f);
-    checkMemReservedForTasksOnTT("tt1", 512L, 0L);
-
+    t = checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
+    checkOccupiedSlots("default", TaskType.MAP, 1, 2, 100.0f);
+    // memory used will change because of the finished task above.
+    checkMemReservedForTasksOnTT("tt1", 1024L, 512L);
+    
     // reduce can be assigned.
-    t = checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
-    // Total 1 reduce slots should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 1,
-        50.0f);
-    checkMemReservedForTasksOnTT("tt1", 512L, 512L);
+    t = checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
+    checkOccupiedSlots("default", TaskType.REDUCE, 1, 2, 100.0f);
+    checkMemReservedForTasksOnTT("tt1", 1024L, 1024L);
   }
 
   /*
@@ -2322,25 +2356,13 @@ public class TestCapacityScheduler exten
   }
 
   /**
-   * Test case to test scheduling of
-   * <ol> 
-   * <li>High ram job with speculative map execution.
-   * <ul>
-   * <li>Submit one high ram job which has speculative map.</li>
-   * <li>Submit a normal job which has no speculative map.</li>
-   * <li>Scheduler should schedule first all map tasks from first job and block
-   * the cluster till both maps from first job get completed.
-   * </ul>
-   * </li>
-   * <li>High ram job with speculative reduce execution.
-   * <ul>
-   * <li>Submit one high ram job which has speculative reduce.</li>
-   * <li>Submit a normal job which has no speculative reduce.</li>
-   * <li>Scheduler should schedule first all reduce tasks from first job and
-   * block the cluster till both reduces are completed.</li>
-   * </ul>
-   * </li>
-   * </ol>
+   * Test case to test scheduling of jobs with speculative execution
+   * in the face of high RAM jobs.
+   * 
+   * Essentially, the test verifies that if a high RAM job has speculative
+   * tasks that cannot run because of memory requirements, we block
+   * that node and do not return any tasks to it.
+   * 
    * @throws IOException
    */
   public void testHighRamJobWithSpeculativeExecution() throws IOException {
@@ -2367,8 +2389,18 @@ public class TestCapacityScheduler exten
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
 
+    // Submit a normal job that should occupy a node
+    JobConf jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(1 * 1024);
+    jConf.setMemoryForReduceTask(0);
+    jConf.setNumMapTasks(2);
+    jConf.setNumReduceTasks(0);
+    jConf.setQueueName("default");
+    jConf.setUser("u1");
+    FakeJobInProgress job1 = submitJob(JobStatus.PREP, jConf);
+    
     //Submit a high memory job with speculative tasks.
-    JobConf jConf = new JobConf();
+    jConf = new JobConf();
     jConf.setMemoryForMapTask(2 * 1024);
     jConf.setMemoryForReduceTask(0);
     jConf.setNumMapTasks(1);
@@ -2377,13 +2409,13 @@ public class TestCapacityScheduler exten
     jConf.setUser("u1");
     jConf.setMapSpeculativeExecution(true);
     jConf.setReduceSpeculativeExecution(false);
-    FakeJobInProgress job1 =
+    FakeJobInProgress job2 =
         new FakeJobInProgress(new JobID("test", ++jobCounter), jConf,
             taskTrackerManager, "u1");
-    taskTrackerManager.submitJob(job1);
+    taskTrackerManager.submitJob(job2);
 
     //Submit normal job
-    jConf = new JobConf();
+    jConf = new JobConf(conf);
     jConf.setMemoryForMapTask(1 * 1024);
     jConf.setMemoryForReduceTask(0);
     jConf.setNumMapTasks(1);
@@ -2392,127 +2424,46 @@ public class TestCapacityScheduler exten
     jConf.setUser("u1");
     jConf.setMapSpeculativeExecution(false);
     jConf.setReduceSpeculativeExecution(false);
-    FakeJobInProgress job2 = submitJob(JobStatus.PREP, jConf);
+    FakeJobInProgress job3 = submitJob(JobStatus.PREP, jConf);
 
     controlledInitializationPoller.selectJobsToInitialize();
     raiseStatusChangeEvents(scheduler.jobQueuesManager);
 
-    // first, a map from j1 will run
-    // at this point, there is a speculative task for the same job to be
-    //scheduled. This task would be scheduled. Till the tasks from job1 gets
-    //complete none of the tasks from other jobs would be scheduled.
+    // Have one node on which all tasks of job1 are scheduled.
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
-    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
-    assertEquals("pending maps greater than zero " , job1.pendingMaps(), 0);
-    // Total 2 map slots should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 33.3f);
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
 
-    //make same tracker get back, check if you are blocking. Your job
-    //has speculative map task so tracker should be blocked even tho' it
-    //can run job2's map.
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-    // Total 2 map slots should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 33.3f);
-    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+    // raise events to initialize the 3rd job
+    controlledInitializationPoller.selectJobsToInitialize();
+    raiseStatusChangeEvents(scheduler.jobQueuesManager);
 
-    //TT2 now gets speculative map of the job1
-    checkAssignment("tt2", "attempt_test_0001_m_000001_1 on tt2");
-    // Total 4 map slots should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 4, 66.7f);
+    // On the second node, one task of the high RAM job can be scheduled.
+    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
     checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 0L);
-
-    // Now since the first job has no more speculative maps, it can schedule
-    // the second job.
-    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-    // Total 5 map slots should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 5, 83.3f);
-    checkMemReservedForTasksOnTT("tt1", 3 * 1024L, 0L);
-
-    //finish everything
+    assertEquals("pending maps greater than zero " , job2.pendingMaps(), 0);
+    // Total 4 map slots should be accounted for.
+    checkOccupiedSlots("default", TaskType.MAP, 1, 4, 66.7f);
+    
+    // now when the first node gets back, it cannot run any task
+    // because job2 has a speculative task that can run on this node.
+    // This is even though job3's tasks can run on this node.
+    assertNull(scheduler.assignTasks(tracker("tt1")));
+    // Reservation will count for 2 more slots.
+    checkOccupiedSlots("default", TaskType.MAP, 1, 6, 100.0f);
+    
+    // finish one task from tt1.
     taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", 
         job1);
-    taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000001_1", 
-        job1);
-    taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", 
-        job2);
-    taskTrackerManager.finalizeJob(job1);
-    taskTrackerManager.finalizeJob(job2);
     
-    //Now submit high ram job with speculative reduce and check.
-    jConf = new JobConf();
-    jConf.setMemoryForMapTask(2 * 1024);
-    jConf.setMemoryForReduceTask(2 * 1024L);
-    jConf.setNumMapTasks(1);
-    jConf.setNumReduceTasks(1);
-    jConf.setQueueName("default");
-    jConf.setUser("u1");
-    jConf.setMapSpeculativeExecution(false);
-    jConf.setReduceSpeculativeExecution(true);
-    FakeJobInProgress job3 =
-        new FakeJobInProgress(new JobID("test", ++jobCounter), jConf,
-            taskTrackerManager, "u1");
-    taskTrackerManager.submitJob(job3);
-
-    //Submit normal job w.r.t reduces
-    jConf = new JobConf();
-    jConf.setMemoryForMapTask(1 * 1024L);
-    jConf.setMemoryForReduceTask(1 * 1024L);
-    jConf.setNumMapTasks(1);
-    jConf.setNumReduceTasks(1);
-    jConf.setQueueName("default");
-    jConf.setUser("u1");
-    jConf.setMapSpeculativeExecution(false);
-    jConf.setReduceSpeculativeExecution(false);
-    FakeJobInProgress job4 = submitJob(JobStatus.PREP, jConf);
+    // now, we can schedule the speculative task on tt1
+    checkAssignment("tt1", "attempt_test_0002_m_000001_1 on tt1");
     
-    controlledInitializationPoller.selectJobsToInitialize();
-    raiseStatusChangeEvents(scheduler.jobQueuesManager);
-
-    // Finish up the map scheduler
+    // finish one more task from tt1.
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0", 
+        job1);
+    
+    // now the new job's tasks can be scheduled.
     checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
-    // Total 2 map slots should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 2, 33.3f);
-    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
-
-    checkAssignment("tt2", "attempt_test_0004_m_000001_0 on tt2");
-    // Total 3 map slots should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.MAP, 1, 3, 50.0f);
-    checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 0L);
-
-    // first, a reduce from j3 will run
-    // at this point, there is a speculative task for the same job to be
-    //scheduled. This task would be scheduled. Till the tasks from job3 gets
-    //complete none of the tasks from other jobs would be scheduled.
-    checkAssignment("tt1", "attempt_test_0003_r_000001_0 on tt1");
-    assertEquals("pending reduces greater than zero ", job3.pendingReduces(),
-        0);
-    // Total 2 reduce slots should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 2,
-        33.3f);
-    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2*1024L);
-
-    //make same tracker get back, check if you are blocking. Your job
-    //has speculative reduce task so tracker should be blocked even tho' it
-    //can run job4's reduce.
-    assertNull(scheduler.assignTasks(tracker("tt1")));
-    // Total 2 reduce slots should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 2,
-        33.3f);
-
-    //TT2 now gets speculative reduce of the job3
-    checkAssignment("tt2", "attempt_test_0003_r_000001_1 on tt2");
-    // Total 4 reduce slots should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 4,
-        66.7f);
-    checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 2 * 1024L);
-
-    // Now since j3 has no more speculative reduces, it can schedule
-    // the j4.
-    checkAssignment("tt1", "attempt_test_0004_r_000001_0 on tt1");
-    // Total 5 reduce slots should be accounted for.
-    checkOccupiedSlots("default", CapacityTaskScheduler.TYPE.REDUCE, 1, 5,
-        83.3f);
-    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 3 * 1024L);
   }
 
   /**
@@ -2570,32 +2521,32 @@ public class TestCapacityScheduler exten
     // Map 1 of high memory job
     checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     checkQueuesOrder(qs, scheduler
-        .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+        .getOrderedQueues(TaskType.MAP));
 
     // Reduce 1 of high memory job
     checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
     checkQueuesOrder(qs, scheduler
-        .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+        .getOrderedQueues(TaskType.REDUCE));
 
     // Map 1 of normal job
     checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
     checkQueuesOrder(reversedQs, scheduler
-        .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+        .getOrderedQueues(TaskType.MAP));
 
     // Reduce 1 of normal job
     checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
     checkQueuesOrder(reversedQs, scheduler
-        .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+        .getOrderedQueues(TaskType.REDUCE));
 
     // Map 2 of normal job
     checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
     checkQueuesOrder(reversedQs, scheduler
-        .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+        .getOrderedQueues(TaskType.MAP));
 
     // Reduce 2 of normal job
     checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
     checkQueuesOrder(reversedQs, scheduler
-        .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+        .getOrderedQueues(TaskType.REDUCE));
 
     // Now both the queues are equally served. But the comparator doesn't change
     // the order if queues are equally served.
@@ -2603,32 +2554,32 @@ public class TestCapacityScheduler exten
     // Map 3 of normal job
     checkAssignment("tt2", "attempt_test_0002_m_000003_0 on tt2");
     checkQueuesOrder(reversedQs, scheduler
-        .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+        .getOrderedQueues(TaskType.MAP));
 
     // Reduce 3 of normal job
     checkAssignment("tt2", "attempt_test_0002_r_000003_0 on tt2");
     checkQueuesOrder(reversedQs, scheduler
-        .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+        .getOrderedQueues(TaskType.REDUCE));
 
     // Map 2 of high memory job
     checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
     checkQueuesOrder(qs, scheduler
-        .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+        .getOrderedQueues(TaskType.MAP));
 
     // Reduce 2 of high memory job
     checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
     checkQueuesOrder(qs, scheduler
-        .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+        .getOrderedQueues(TaskType.REDUCE));
 
     // Map 4 of normal job
     checkAssignment("tt2", "attempt_test_0002_m_000004_0 on tt2");
     checkQueuesOrder(reversedQs, scheduler
-        .getOrderedQueues(CapacityTaskScheduler.TYPE.MAP));
+        .getOrderedQueues(TaskType.MAP));
 
     // Reduce 4 of normal job
     checkAssignment("tt2", "attempt_test_0002_r_000004_0 on tt2");
     checkQueuesOrder(reversedQs, scheduler
-        .getOrderedQueues(CapacityTaskScheduler.TYPE.REDUCE));
+        .getOrderedQueues(TaskType.REDUCE));
   }
 
   private void checkFailedInitializedJobMovement() throws IOException {
@@ -2713,7 +2664,7 @@ public class TestCapacityScheduler exten
   }
 
   
-  protected TaskTrackerStatus tracker(String taskTrackerName) {
+  protected TaskTracker tracker(String taskTrackerName) {
     return taskTrackerManager.getTaskTracker(taskTrackerName);
   }
   
@@ -2737,11 +2688,11 @@ public class TestCapacityScheduler exten
   private void checkMemReservedForTasksOnTT(String taskTracker,
       Long expectedMemForMapsOnTT, Long expectedMemForReducesOnTT) {
     Long observedMemForMapsOnTT =
-        scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker),
-            CapacityTaskScheduler.TYPE.MAP);
+        scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker).getStatus(),
+            TaskType.MAP);
     Long observedMemForReducesOnTT =
-        scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker),
-            CapacityTaskScheduler.TYPE.REDUCE);
+        scheduler.memoryMatcher.getMemReservedForTasks(tracker(taskTracker).getStatus(),
+            TaskType.REDUCE);
     if (expectedMemForMapsOnTT == null) {
       assertTrue(observedMemForMapsOnTT == null);
     } else {
@@ -2765,7 +2716,7 @@ public class TestCapacityScheduler exten
    * @return
    */
   private void checkOccupiedSlots(String queue,
-      CapacityTaskScheduler.TYPE type, int numActiveUsers,
+      TaskType type, int numActiveUsers,
       int expectedOccupiedSlots, float expectedOccupiedSlotsPercent) {
     scheduler.updateQSIInfoForTests();
     QueueManager queueManager = scheduler.taskTrackerManager.getQueueManager();
@@ -2773,9 +2724,9 @@ public class TestCapacityScheduler exten
         queueManager.getJobQueueInfo(queue).getSchedulingInfo();
     String[] infoStrings = schedulingInfo.split("\n");
     int index = -1;
-    if (type.equals(CapacityTaskScheduler.TYPE.MAP)) {
+    if (type.equals(TaskType.MAP)) {
       index = 7;
-    } else if (type.equals(CapacityTaskScheduler.TYPE.REDUCE)) {
+    } else if (type.equals(TaskType.REDUCE)) {
       index = (numActiveUsers == 0 ? 12 : 13 + numActiveUsers);
     }
     LOG.info(infoStrings[index]);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java Fri Mar  4 03:25:10 2011
@@ -40,13 +40,13 @@ public class CapBasedLoadManager extends
   public boolean canAssignMap(TaskTrackerStatus tracker,
       int totalRunnableMaps, int totalMapSlots) {
     return tracker.countMapTasks() < getCap(totalRunnableMaps,
-        tracker.getMaxMapTasks(), totalMapSlots);
+        tracker.getMaxMapSlots(), totalMapSlots);
   }
 
   @Override
   public boolean canAssignReduce(TaskTrackerStatus tracker,
       int totalRunnableReduces, int totalReduceSlots) {
     return tracker.countReduceTasks() < getCap(totalRunnableReduces,
-        tracker.getMaxReduceTasks(), totalReduceSlots);
+        tracker.getMaxReduceSlots(), totalReduceSlots);
   }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Fri Mar  4 03:25:10 2011
@@ -37,6 +37,7 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
 /**
  * A {@link TaskScheduler} that implements fair sharing.
@@ -218,7 +219,7 @@ public class FairScheduler extends TaskS
   }
   
   @Override
-  public synchronized List<Task> assignTasks(TaskTrackerStatus tracker)
+  public synchronized List<Task> assignTasks(TaskTracker tracker)
       throws IOException {
     if (!initialized) // Don't try to assign tasks if we haven't yet started up
       return null;
@@ -244,10 +245,11 @@ public class FairScheduler extends TaskS
     // Scan to see whether any job needs to run a map, then a reduce
     ArrayList<Task> tasks = new ArrayList<Task>();
     TaskType[] types = new TaskType[] {TaskType.MAP, TaskType.REDUCE};
+    TaskTrackerStatus trackerStatus = tracker.getStatus();
     for (TaskType taskType: types) {
       boolean canAssign = (taskType == TaskType.MAP) ? 
-          loadMgr.canAssignMap(tracker, runnableMaps, totalMapSlots) :
-          loadMgr.canAssignReduce(tracker, runnableReduces, totalReduceSlots);
+          loadMgr.canAssignMap(trackerStatus, runnableMaps, totalMapSlots) :
+          loadMgr.canAssignReduce(trackerStatus, runnableReduces, totalReduceSlots);
       if (canAssign) {
         // Figure out the jobs that need this type of task
         List<JobInProgress> candidates = new ArrayList<JobInProgress>();
@@ -263,8 +265,8 @@ public class FairScheduler extends TaskS
         Collections.sort(candidates, comparator);
         for (JobInProgress job: candidates) {
           Task task = (taskType == TaskType.MAP ? 
-              taskSelector.obtainNewMapTask(tracker, job) :
-              taskSelector.obtainNewReduceTask(tracker, job));
+              taskSelector.obtainNewMapTask(trackerStatus, job) :
+              taskSelector.obtainNewReduceTask(trackerStatus, job));
           if (task != null) {
             // Update the JobInfo for this job so we account for the launched
             // tasks during this update interval and don't try to launch more

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Fri Mar  4 03:25:10 2011
@@ -33,6 +33,7 @@ import junit.framework.TestCase;
 import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.mapred.JobStatus;
 import org.apache.hadoop.mapred.FairScheduler.JobInfo;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
 public class TestFairScheduler extends TestCase {
   final static String TEST_DIR = new File(System.getProperty("test.build.data",
@@ -67,7 +68,7 @@ public class TestFairScheduler extends T
     public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
         int ignored) throws IOException {
       TaskAttemptID attemptId = getTaskAttemptID(true);
-      Task task = new MapTask("", attemptId, 0, "", new BytesWritable(), getJobConf().getUser()) {
+      Task task = new MapTask("", attemptId, 0, "", new BytesWritable(), 1, getJobConf().getUser()) {
         @Override
         public String toString() {
           return String.format("%s on %s", getTaskID(), tts.getTrackerName());
@@ -82,7 +83,7 @@ public class TestFairScheduler extends T
     public Task obtainNewReduceTask(final TaskTrackerStatus tts,
         int clusterSize, int ignored) throws IOException {
       TaskAttemptID attemptId = getTaskAttemptID(false);
-      Task task = new ReduceTask("", attemptId, 0, 10, getJobConf().getUser()) {
+      Task task = new ReduceTask("", attemptId, 0, 10, 1, getJobConf().getUser()) {
         @Override
         public String toString() {
           return String.format("%s on %s", getTaskID(), tts.getTrackerName());
@@ -108,18 +109,26 @@ public class TestFairScheduler extends T
     List<JobInProgressListener> listeners =
       new ArrayList<JobInProgressListener>();
     
-    private Map<String, TaskTrackerStatus> trackers =
-      new HashMap<String, TaskTrackerStatus>();
+    private Map<String, TaskTracker> trackers =
+      new HashMap<String, TaskTracker>();
     private Map<String, TaskStatus> taskStatuses = 
       new HashMap<String, TaskStatus>();
 
     public FakeTaskTrackerManager() {
-      trackers.put("tt1", new TaskTrackerStatus("tt1", "tt1.host", 1,
-          new ArrayList<TaskStatus>(), 0,
-          maxMapTasksPerTracker, maxReduceTasksPerTracker));
-      trackers.put("tt2", new TaskTrackerStatus("tt2", "tt2.host", 2,
-          new ArrayList<TaskStatus>(), 0,
-          maxMapTasksPerTracker, maxReduceTasksPerTracker));
+      TaskTracker tt1 = new TaskTracker("tt1");
+      tt1.setStatus(new TaskTrackerStatus("tt1", "tt1.host", 1,
+                                          new ArrayList<TaskStatus>(), 0,
+                                          maxMapTasksPerTracker, 
+                                          maxReduceTasksPerTracker));
+      trackers.put("tt1", tt1);
+      
+      TaskTracker tt2 = new TaskTracker("tt2");
+      tt2.setStatus(new TaskTrackerStatus("tt2", "tt2.host", 2,
+                                          new ArrayList<TaskStatus>(), 0,
+                                          maxMapTasksPerTracker, 
+                                          maxReduceTasksPerTracker));
+      trackers.put("tt2", tt2);
+
     }
     
     @Override
@@ -143,7 +152,11 @@ public class TestFairScheduler extends T
 
     @Override
     public Collection<TaskTrackerStatus> taskTrackers() {
-      return trackers.values();
+      List<TaskTrackerStatus> statuses = new ArrayList<TaskTrackerStatus>();
+      for (TaskTracker tt : trackers.values()) {
+        statuses.add(tt.getStatus());
+      }
+      return statuses;
     }
 
 
@@ -188,7 +201,7 @@ public class TestFairScheduler extends T
       }
     }
     
-    public TaskTrackerStatus getTaskTracker(String trackerID) {
+    public TaskTracker getTaskTracker(String trackerID) {
       return trackers.get(trackerID);
     }
     
@@ -206,7 +219,7 @@ public class TestFairScheduler extends T
       };
       taskStatuses.put(t.getTaskID().toString(), status);
       status.setRunState(TaskStatus.State.RUNNING);
-      trackers.get(taskTrackerName).getTaskReports().add(status);
+      trackers.get(taskTrackerName).getStatus().getTaskReports().add(status);
     }
     
     public void finishTask(String taskTrackerName, String tipId) {
@@ -1227,7 +1240,7 @@ public class TestFairScheduler extends T
     scheduler.update();
   }
 
-  protected TaskTrackerStatus tracker(String taskTrackerName) {
+  protected TaskTracker tracker(String taskTrackerName) {
     return taskTrackerManager.getTaskTracker(taskTrackerName);
   }
   

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java Fri Mar  4 03:25:10 2011
@@ -61,8 +61,9 @@ interface InterTrackerProtocol extends V
    *            (HADOOP-4869) 
    * Version 24: Changed format of Task and TaskStatus for HADOOP-4759 
    * Version 25: JobIDs are passed in response to JobTracker restart 
+   * Version 26: Added numRequiredSlots to TaskStatus for MAPREDUCE-516
    */
-  public static final long versionID = 25L;
+  public static final long versionID = 26L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java Fri Mar  4 03:25:10 2011
@@ -206,11 +206,13 @@ public class IsolationRunner {
       BytesWritable split = new BytesWritable();
       split.readFields(splitFile);
       splitFile.close();
-      task = new MapTask(jobFilename.toString(), taskId, partition, splitClass, split, conf.getUser());
+      task = new MapTask(jobFilename.toString(), taskId, partition, 
+                         splitClass, split, 1, conf.getUser());
     } else {
       int numMaps = conf.getNumMapTasks();
       fillInMissingMapOutputs(local, taskId, numMaps, conf);
-      task = new ReduceTask(jobFilename.toString(), taskId, partition, numMaps, conf.getUser());
+      task = new ReduceTask(jobFilename.toString(), taskId, partition, numMaps, 
+                            1, conf.getUser());
     }
     task.setConf(conf);
     task.run(conf, new FakeUmbilical());

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobConf.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobConf.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobConf.java Fri Mar  4 03:25:10 2011
@@ -1491,6 +1491,39 @@ public class JobConf extends Configurati
     return val;
   }
 
+  /**
+   * Compute the number of slots required to run a single map task-attempt
+   * of this job.
+   * @param slotSizePerMap cluster-wide value of the amount of memory required
+   *                       to run a map-task
+   * @return the number of slots required to run a single map task-attempt
+   *                1 if memory parameters are disabled.
+   */
+  int computeNumSlotsPerMap(long slotSizePerMap) {
+    if ((slotSizePerMap==DISABLED_MEMORY_LIMIT) ||
+        (getMemoryForMapTask()==DISABLED_MEMORY_LIMIT)) {
+      return 1;
+    }
+    return (int)(Math.ceil((float)getMemoryForMapTask() / (float)slotSizePerMap));
+  }
+  
+  /**
+   * Compute the number of slots required to run a single reduce task-attempt
+   * of this job.
+   * @param slotSizePerReduce cluster-wide value of the amount of memory 
+   *                          required to run a reduce-task
+   * @return the number of slots required to run a single reduce task-attempt
+   *                1 if memory parameters are disabled.
+   */
+  int computeNumSlotsPerReduce(long slotSizePerReduce) {
+    if ((slotSizePerReduce==DISABLED_MEMORY_LIMIT) ||
+        (getMemoryForReduceTask()==DISABLED_MEMORY_LIMIT)) {
+      return 1;
+    }
+    return 
+    (int)(Math.ceil((float)getMemoryForReduceTask() / (float)slotSizePerReduce));
+  }
+  
   /** 
    * Find a jar that contains a class of the same name, if any.
    * It will return a jar file, even if that is not the first thing