You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:25:11 UTC

svn commit: r1076949 [2/3] - in /hadoop/common/branches/branch-0.20-security-patches/src: contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ contrib/fairscheduler/src/java/org/apa...

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar  4 03:25:10 2011
@@ -21,6 +21,7 @@ import java.io.DataInputStream;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
 import java.util.IdentityHashMap;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
@@ -45,6 +46,8 @@ import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
 /*************************************************************
  * JobInProgress maintains all the info for keeping
@@ -52,6 +55,8 @@ import org.apache.hadoop.util.StringUtil
  * and its latest JobStatus, plus a set of tables for 
  * doing bookkeeping of its Tasks.
  * ***********************************************************
+ * 
+ * This is NOT a public interface!
  */
 class JobInProgress {
   /**
@@ -77,6 +82,8 @@ class JobInProgress {
   TaskInProgress setup[] = new TaskInProgress[0];
   int numMapTasks = 0;
   int numReduceTasks = 0;
+  int numSlotsPerMap = 1;
+  int numSlotsPerReduce = 1;
   
   // Counters to track currently running/finished/failed Map/Reduce task-attempts
   int runningMapTasks = 0;
@@ -194,7 +201,9 @@ class JobInProgress {
     TOTAL_LAUNCHED_REDUCES,
     OTHER_LOCAL_MAPS,
     DATA_LOCAL_MAPS,
-    RACK_LOCAL_MAPS
+    RACK_LOCAL_MAPS,
+    FALLOW_SLOTS_MILLIS_MAPS,
+    FALLOW_SLOTS_MILLIS_REDUCES
   }
   private Counters jobCounters = new Counters();
   
@@ -210,6 +219,36 @@ class JobInProgress {
 
   private Object schedulingInfo;
 
+  private static class FallowSlotInfo {
+    long timestamp;
+    int numSlots;
+
+    public FallowSlotInfo(long timestamp, int numSlots) {
+      this.timestamp = timestamp;
+      this.numSlots = numSlots;
+    }
+
+    public long getTimestamp() {
+      return timestamp;
+    }
+
+    public void setTimestamp(long timestamp) {
+      this.timestamp = timestamp;
+    }
+
+    public int getNumSlots() {
+      return numSlots;
+    }
+
+    public void setNumSlots(int numSlots) {
+      this.numSlots = numSlots;
+    }
+  }
+
+  private Map<TaskTracker, FallowSlotInfo> trackersReservedForMaps = 
+    new HashMap<TaskTracker, FallowSlotInfo>();
+  private Map<TaskTracker, FallowSlotInfo> trackersReservedForReduces = 
+    new HashMap<TaskTracker, FallowSlotInfo>();
   
   /**
    * Create an almost empty JobInProgress, which can be used only for tests
@@ -391,6 +430,41 @@ class JobInProgress {
   }
 
   /**
+   * Get the number of slots required to run a single map task-attempt.
+   * @return the number of slots required to run a single map task-attempt
+   */
+  synchronized int getNumSlotsPerMap() {
+    return numSlotsPerMap;
+  }
+
+  /**
+   * Set the number of slots required to run a single map task-attempt.
+   * This is typically set by schedulers which support high-ram jobs.
+   * @param numSlotsPerMap the number of slots required to run a single map task-attempt
+   */
+  synchronized void setNumSlotsPerMap(int numSlotsPerMap) {
+    this.numSlotsPerMap = numSlotsPerMap;
+  }
+
+  /**
+   * Get the number of slots required to run a single reduce task-attempt.
+   * @return the number of slots required to run a single reduce task-attempt
+   */
+  synchronized int getNumSlotsPerReduce() {
+    return numSlotsPerReduce;
+  }
+
+  /**
+   * Set the number of slots required to run a single reduce task-attempt.
+   * This is typically set by schedulers which support high-ram jobs.
+   * @param numSlotsPerReduce the number of slots required to run a single 
+   *              reduce task-attempt
+   */
+  synchronized void setNumSlotsPerReduce(int numSlotsPerReduce) {
+    this.numSlotsPerReduce = numSlotsPerReduce;
+  }
+
+  /**
    * Construct the splits, etc.  This is invoked from an async
    * thread so that split-computation doesn't block anyone.
    */
@@ -449,7 +523,7 @@ class JobInProgress {
       inputLength += splits[i].getDataLength();
       maps[i] = new TaskInProgress(jobId, jobFile, 
                                    splits[i], 
-                                   jobtracker, conf, this, i);
+                                   jobtracker, conf, this, i, numSlotsPerMap);
     }
     LOG.info("Input size for job " + jobId + " = " + inputLength
         + ". Number of splits = " + splits.length);
@@ -467,7 +541,7 @@ class JobInProgress {
     for (int i = 0; i < numReduceTasks; i++) {
       reduces[i] = new TaskInProgress(jobId, jobFile, 
                                       numMapTasks, i, 
-                                      jobtracker, conf, this);
+                                      jobtracker, conf, this, numSlotsPerReduce);
       nonRunningReduces.add(reduces[i]);
     }
 
@@ -486,12 +560,12 @@ class JobInProgress {
     // split.
     JobClient.RawSplit emptySplit = new JobClient.RawSplit();
     cleanup[0] = new TaskInProgress(jobId, jobFile, emptySplit, 
-            jobtracker, conf, this, numMapTasks);
+            jobtracker, conf, this, numMapTasks, 1);
     cleanup[0].setJobCleanupTask();
 
     // cleanup reduce tip.
     cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
-                       numReduceTasks, jobtracker, conf, this);
+                       numReduceTasks, jobtracker, conf, this, 1);
     cleanup[1].setJobCleanupTask();
 
     // create two setup tips, one map and one reduce.
@@ -500,12 +574,12 @@ class JobInProgress {
     // setup map tip. This map doesn't use any split. Just assign an empty
     // split.
     setup[0] = new TaskInProgress(jobId, jobFile, emptySplit, 
-            jobtracker, conf, this, numMapTasks + 1 );
+            jobtracker, conf, this, numMapTasks + 1, 1);
     setup[0].setJobSetupTask();
 
     // setup reduce tip.
     setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
-                       numReduceTasks + 1, jobtracker, conf, this);
+                       numReduceTasks + 1, jobtracker, conf, this, 1);
     setup[1].setJobSetupTask();
     
     synchronized(jobInitKillStatus){
@@ -564,6 +638,15 @@ class JobInProgress {
     return numReduceTasks - runningReduceTasks - failedReduceTIPs - 
     finishedReduceTasks + speculativeReduceTasks;
   }
+  public synchronized int getNumSlotsPerTask(TaskType taskType) {
+    if (taskType == TaskType.MAP) {
+      return numSlotsPerMap;
+    } else if (taskType == TaskType.REDUCE) {
+      return numSlotsPerReduce;
+    } else {
+      return 1;
+    }
+  }
   public JobPriority getPriority() {
     return this.priority;
   }
@@ -778,8 +861,10 @@ class JobInProgress {
     if (change) {
       TaskStatus.State state = status.getRunState();
       // get the TaskTrackerStatus where the task ran 
-      TaskTrackerStatus ttStatus = 
+      TaskTracker taskTracker = 
         this.jobtracker.getTaskTracker(tip.machineWhereTaskRan(taskid));
+      TaskTrackerStatus ttStatus = 
+        (taskTracker == null) ? null : taskTracker.getStatus();
       String httpTaskLogLocation = null; 
 
       if (null != ttStatus){
@@ -841,7 +926,7 @@ class JobInProgress {
         }
         
         // Tell the job to fail the relevant task
-        failedTask(tip, taskid, status, ttStatus,
+        failedTask(tip, taskid, status, taskTracker,
                    wasRunning, wasComplete);
 
         // Did the task failure lead to tip failure?
@@ -1374,9 +1459,9 @@ class JobInProgress {
    * to the blacklist iff too many trackers in the cluster i.e. 
    * (clusterSize * CLUSTER_BLACKLIST_PERCENT) haven't turned 'flaky' already.
    * 
-   * @param trackerName task-tracker on which a task failed
+   * @param taskTracker task-tracker on which a task failed
    */
-  void addTrackerTaskFailure(String trackerName) {
+  void addTrackerTaskFailure(String trackerName, TaskTracker taskTracker) {
     if (flakyTaskTrackers < (clusterSize * CLUSTER_BLACKLIST_PERCENT)) { 
       String trackerHostName = convertTrackerNameToHostName(trackerName);
 
@@ -1389,11 +1474,78 @@ class JobInProgress {
       // Check if this tasktracker has turned 'flaky'
       if (trackerFailures.intValue() == conf.getMaxTaskFailuresPerTracker()) {
         ++flakyTaskTrackers;
+        
+        // Cancel reservations if appropriate
+        if (taskTracker != null) {
+          taskTracker.unreserveSlots(TaskType.MAP, this);
+          taskTracker.unreserveSlots(TaskType.REDUCE, this);
+        }
         LOG.info("TaskTracker at '" + trackerHostName + "' turned 'flaky'");
       }
     }
   }
+  
+  public synchronized void reserveTaskTracker(TaskTracker taskTracker,
+                                              TaskType type, int numSlots) {
+    Map<TaskTracker, FallowSlotInfo> map =
+      (type == TaskType.MAP) ? trackersReservedForMaps : trackersReservedForReduces;
+    
+    long now = System.currentTimeMillis();
+    
+    FallowSlotInfo info = map.get(taskTracker);
+    if (info == null) {
+      info = new FallowSlotInfo(now, numSlots);
+    } else {
+      // Increment metering info if the reservation is changing
+      if (info.getNumSlots() != numSlots) {
+        Enum<Counter> counter = 
+          (type == TaskType.MAP) ? 
+              Counter.FALLOW_SLOTS_MILLIS_MAPS : 
+              Counter.FALLOW_SLOTS_MILLIS_REDUCES;
+        long fallowSlotMillis = (now - info.getTimestamp()) * info.getNumSlots();
+        jobCounters.incrCounter(counter, fallowSlotMillis);
+        
+        // Restart metering from now with the new number of reserved slots
+        info.setTimestamp(now);
+        info.setNumSlots(numSlots);
+      }
+    }
+    map.put(taskTracker, info);
+  }
+  
+  public synchronized void unreserveTaskTracker(TaskTracker taskTracker,
+                                                TaskType type) {
+    Map<TaskTracker, FallowSlotInfo> map =
+      (type == TaskType.MAP) ? trackersReservedForMaps : 
+                               trackersReservedForReduces;
+
+    FallowSlotInfo info = map.get(taskTracker);
+    if (info == null) {
+      LOG.warn("Cannot find information about fallow slots for " + 
+               taskTracker.getTrackerName());
+      return;
+    }
     
+    long now = System.currentTimeMillis();
+
+    Enum<Counter> counter = 
+      (type == TaskType.MAP) ? 
+          Counter.FALLOW_SLOTS_MILLIS_MAPS : 
+          Counter.FALLOW_SLOTS_MILLIS_REDUCES;
+    long fallowSlotMillis = (now - info.getTimestamp()) * info.getNumSlots();
+    jobCounters.incrCounter(counter, fallowSlotMillis);
+
+    map.remove(taskTracker);
+  }
+  
+  public int getNumReservedTaskTrackersForMaps() {
+    return trackersReservedForMaps.size();
+  }
+  
+  public int getNumReservedTaskTrackersForReduces() {
+    return trackersReservedForReduces.size();
+  }
+  
   private int getTrackerTaskFailures(String trackerName) {
     String trackerHostName = convertTrackerNameToHostName(trackerName);
     Integer failedTasks = trackerToFailuresMap.get(trackerHostName);
@@ -2008,7 +2160,7 @@ class JobInProgress {
 
     // Update jobhistory 
     TaskTrackerStatus ttStatus = 
-      this.jobtracker.getTaskTracker(status.getTaskTracker());
+      this.jobtracker.getTaskTrackerStatus(status.getTaskTracker());
     String trackerHostname = jobtracker.getNode(ttStatus.getHost()).toString();
     String taskType = getTaskType(tip);
     if (status.getIsMap()){
@@ -2121,6 +2273,7 @@ class JobInProgress {
         this.status.setReduceProgress(1.0f);
       }
       this.finishTime = System.currentTimeMillis();
+      cancelReservedSlots();
       LOG.info("Job " + this.status.getJobID() + 
                " has completed successfully.");
       JobHistory.JobInfo.logFinished(this.status.getJobID(), finishTime, 
@@ -2204,9 +2357,20 @@ class JobInProgress {
       for (int i = 0; i < reduces.length; i++) {
         reduces[i].kill();
       }
+      
+      // Clear out reserved tasktrackers
+      cancelReservedSlots();
     }
   }
 
+  private void cancelReservedSlots() {
+    for (TaskTracker tt : trackersReservedForMaps.keySet()) {
+      tt.unreserveSlots(TaskType.MAP, this);
+    }
+    for (TaskTracker tt : trackersReservedForReduces.keySet()) {
+      tt.unreserveSlots(TaskType.REDUCE, this);
+    }
+  }
   private void clearUncleanTasks() {
     TaskAttemptID taskid = null;
     TaskInProgress tip = null;
@@ -2264,7 +2428,7 @@ class JobInProgress {
    */
   private void failedTask(TaskInProgress tip, TaskAttemptID taskid, 
                           TaskStatus status, 
-                          TaskTrackerStatus taskTrackerStatus,
+                          TaskTracker taskTracker,
                           boolean wasRunning, boolean wasComplete) {
     final JobTrackerInstrumentation metrics = jobtracker.getInstrumentation();
     // check if the TIP is already failed
@@ -2324,6 +2488,8 @@ class JobInProgress {
     String taskTrackerName = taskStatus.getTaskTracker();
     String taskTrackerHostName = convertTrackerNameToHostName(taskTrackerName);
     int taskTrackerPort = -1;
+    TaskTrackerStatus taskTrackerStatus = 
+      (taskTracker == null) ? null : taskTracker.getStatus();
     if (taskTrackerStatus != null) {
       taskTrackerPort = taskTrackerStatus.getHttpPort();
     }
@@ -2369,7 +2535,7 @@ class JobInProgress {
     // Note down that a task has failed on this tasktracker 
     //
     if (status.getRunState() == TaskStatus.State.FAILED) { 
-      addTrackerTaskFailure(taskTrackerName);
+      addTrackerTaskFailure(taskTrackerName, taskTracker);
     }
         
     //
@@ -2460,6 +2626,9 @@ class JobInProgress {
     TaskStatus status = TaskStatus.createTaskStatus(tip.isMapTask(), 
                                                     taskid,
                                                     0.0f,
+                                                    tip.isMapTask() ? 
+                                                        numSlotsPerMap : 
+                                                        numSlotsPerReduce,
                                                     state,
                                                     reason,
                                                     reason,

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress_Counter.properties
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress_Counter.properties?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress_Counter.properties (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress_Counter.properties Fri Mar  4 03:25:10 2011
@@ -1,12 +1,14 @@
 # ResourceBundle properties file for job-level counters
 
-CounterGroupName=              Job Counters 
+CounterGroupName=                  Job Counters 
 
-NUM_FAILED_MAPS.name=          Failed map tasks
-NUM_FAILED_REDUCES.name=       Failed reduce tasks
-TOTAL_LAUNCHED_MAPS.name=      Launched map tasks
-TOTAL_LAUNCHED_REDUCES.name=   Launched reduce tasks
-OTHER_LOCAL_MAPS.name=         Other local map tasks
-DATA_LOCAL_MAPS.name=          Data-local map tasks
-RACK_LOCAL_MAPS.name=          Rack-local map tasks
+NUM_FAILED_MAPS.name=              Failed map tasks
+NUM_FAILED_REDUCES.name=           Failed reduce tasks
+TOTAL_LAUNCHED_MAPS.name=          Launched map tasks
+TOTAL_LAUNCHED_REDUCES.name=       Launched reduce tasks
+OTHER_LOCAL_MAPS.name=             Other local map tasks
+DATA_LOCAL_MAPS.name=              Data-local map tasks
+RACK_LOCAL_MAPS.name=              Rack-local map tasks
+FALLOW_SLOTS_MILLIS_MAPS.name=     Total time spent by all maps waiting after reserving slots (ms)
+FALLOW_SLOTS_MILLIS_REDUCES.name=  Total time spent by all reduces waiting after reserving slots (ms)
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobQueueTaskScheduler.java Fri Mar  4 03:25:10 2011
@@ -25,6 +25,7 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
 /**
  * A {@link TaskScheduler} that keeps jobs in a queue in priority order (FIFO
@@ -77,9 +78,9 @@ class JobQueueTaskScheduler extends Task
   }
 
   @Override
-  public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
+  public synchronized List<Task> assignTasks(TaskTracker taskTracker)
       throws IOException {
-
+    TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus(); 
     ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
     final int numTaskTrackers = clusterStatus.getTaskTrackers();
     final int clusterMapCapacity = clusterStatus.getMaxMapTasks();
@@ -91,10 +92,10 @@ class JobQueueTaskScheduler extends Task
     //
     // Get map + reduce counts for the current tracker.
     //
-    final int trackerMapCapacity = taskTracker.getMaxMapTasks();
-    final int trackerReduceCapacity = taskTracker.getMaxReduceTasks();
-    final int trackerRunningMaps = taskTracker.countMapTasks();
-    final int trackerRunningReduces = taskTracker.countReduceTasks();
+    final int trackerMapCapacity = taskTrackerStatus.getMaxMapSlots();
+    final int trackerReduceCapacity = taskTrackerStatus.getMaxReduceSlots();
+    final int trackerRunningMaps = taskTrackerStatus.countMapTasks();
+    final int trackerRunningReduces = taskTrackerStatus.countReduceTasks();
 
     // Assigned tasks
     List<Task> assignedTasks = new ArrayList<Task>();
@@ -167,7 +168,7 @@ class JobQueueTaskScheduler extends Task
           
           // Try to schedule a node-local or rack-local Map task
           t = 
-            job.obtainNewLocalMapTask(taskTracker, numTaskTrackers,
+            job.obtainNewLocalMapTask(taskTrackerStatus, numTaskTrackers,
                                       taskTrackerManager.getNumberOfUniqueHosts());
           if (t != null) {
             assignedTasks.add(t);
@@ -186,7 +187,7 @@ class JobQueueTaskScheduler extends Task
           
           // Try to schedule a node-local or rack-local Map task
           t = 
-            job.obtainNewNonLocalMapTask(taskTracker, numTaskTrackers,
+            job.obtainNewNonLocalMapTask(taskTrackerStatus, numTaskTrackers,
                                    taskTrackerManager.getNumberOfUniqueHosts());
           
           if (t != null) {
@@ -224,7 +225,7 @@ class JobQueueTaskScheduler extends Task
           }
 
           Task t = 
-            job.obtainNewReduceTask(taskTracker, numTaskTrackers, 
+            job.obtainNewReduceTask(taskTrackerStatus, numTaskTrackers, 
                                     taskTrackerManager.getNumberOfUniqueHosts()
                                     );
           if (t != null) {
@@ -243,7 +244,7 @@ class JobQueueTaskScheduler extends Task
     }
     
     if (LOG.isDebugEnabled()) {
-      LOG.debug("Task assignments for " + taskTracker.getTrackerName() + " --> " +
+      LOG.debug("Task assignments for " + taskTrackerStatus.getTrackerName() + " --> " +
                 "[" + mapLoadFactor + ", " + trackerMapCapacity + ", " + 
                 trackerCurrentMapCapacity + ", " + trackerRunningMaps + "] -> [" + 
                 (trackerCurrentMapCapacity - trackerRunningMaps) + ", " +

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar  4 03:25:10 2011
@@ -93,6 +93,9 @@ import org.apache.hadoop.util.Reflection
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.VersionInfo;
 
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+
 /*******************************************************
  * JobTracker is the central location for submitting and 
  * tracking MR jobs in a network environment.
@@ -278,7 +281,8 @@ public class JobTracker implements MRCon
                     JobInProgress job = tip.getJob();
                     String trackerName = getAssignedTracker(taskId);
                     TaskTrackerStatus trackerStatus = 
-                      getTaskTracker(trackerName);
+                      getTaskTrackerStatus(trackerName); 
+                      
                     // This might happen when the tasktracker has already
                     // expired and this thread tries to call failedtask
                     // again. expire tasktracker should have called failed
@@ -359,22 +363,25 @@ public class JobTracker implements MRCon
                 long now = System.currentTimeMillis();
                 TaskTrackerStatus leastRecent = null;
                 while ((trackerExpiryQueue.size() > 0) &&
-                       ((leastRecent = trackerExpiryQueue.first()) != null) &&
-                       (now - leastRecent.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL)) {
+                       (leastRecent = trackerExpiryQueue.first()) != null &&
+                       ((now - leastRecent.getLastSeen()) > TASKTRACKER_EXPIRY_INTERVAL)) {
+
                         
                   // Remove profile from head of queue
                   trackerExpiryQueue.remove(leastRecent);
                   String trackerName = leastRecent.getTrackerName();
                         
                   // Figure out if last-seen time should be updated, or if tracker is dead
-                  TaskTrackerStatus newProfile = taskTrackers.get(leastRecent.getTrackerName());
+                  TaskTracker current = getTaskTracker(trackerName);
+                  TaskTrackerStatus newProfile = 
+                    (current == null ) ? null : current.getStatus();
                   // Items might leave the taskTracker set through other means; the
                   // status stored in 'taskTrackers' might be null, which means the
                   // tracker has already been destroyed.
                   if (newProfile != null) {
-                    if (now - newProfile.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL) {
+                    if ((now - newProfile.getLastSeen()) > TASKTRACKER_EXPIRY_INTERVAL) {
                       // Remove completely after marking the tasks as 'KILLED'
-                      lostTaskTracker(leastRecent.getTrackerName());
+                      lostTaskTracker(current);
                       // tracker is lost, and if it is blacklisted, remove 
                       // it from the count of blacklisted trackers in the cluster
                       if (isBlacklisted(trackerName)) {
@@ -384,7 +391,7 @@ public class JobTracker implements MRCon
                       
                       // remove the mapping from the hosts list
                       String hostname = newProfile.getHost();
-                      hostnameToTrackerName.get(hostname).remove(trackerName);
+                      hostnameToTaskTracker.get(hostname).remove(trackerName);
                     } else {
                       // Update time by inserting latest profile
                       trackerExpiryQueue.add(newProfile);
@@ -638,9 +645,9 @@ public class JobTracker implements MRCon
       synchronized (taskTrackers) {
         // remove the capacity of trackers on this host
         for (TaskTrackerStatus status : getStatusesOnHost(hostName)) {
-          int mapSlots = status.getMaxMapTasks();
+          int mapSlots = status.getMaxMapSlots();
           totalMapTaskCapacity -= mapSlots;
-          int reduceSlots = status.getMaxReduceTasks();
+          int reduceSlots = status.getMaxReduceSlots();
           totalReduceTaskCapacity -= reduceSlots;
           getInstrumentation().addBlackListedMapSlots(
               mapSlots);
@@ -657,9 +664,9 @@ public class JobTracker implements MRCon
         int numTrackersOnHost = 0;
         // add the capacity of trackers on the host
         for (TaskTrackerStatus status : getStatusesOnHost(hostName)) {
-          int mapSlots = status.getMaxMapTasks();
+          int mapSlots = status.getMaxMapSlots();
           totalMapTaskCapacity += mapSlots;
-          int reduceSlots = status.getMaxReduceTasks();
+          int reduceSlots = status.getMaxReduceSlots();
           totalReduceTaskCapacity += reduceSlots;
           numTrackersOnHost++;
           getInstrumentation().decBlackListedMapSlots(mapSlots);
@@ -707,7 +714,8 @@ public class JobTracker implements MRCon
   private List<TaskTrackerStatus> getStatusesOnHost(String hostName) {
     List<TaskTrackerStatus> statuses = new ArrayList<TaskTrackerStatus>();
     synchronized (taskTrackers) {
-      for (TaskTrackerStatus status : taskTrackers.values()) {
+      for (TaskTracker tt : taskTrackers.values()) {
+        TaskTrackerStatus status = tt.getStatus(); 
         if (hostName.equals(status.getHost())) {
           statuses.add(status);
         }
@@ -1041,14 +1049,14 @@ public class JobTracker implements MRCon
       // II. Create the (appropriate) task status
       if (type.equals(Values.MAP.name())) {
         taskStatus = 
-          new MapTaskStatus(attemptId, 0.0f, TaskStatus.State.RUNNING, 
-                            "", "", trackerName, TaskStatus.Phase.MAP, 
-                            new Counters());
+          new MapTaskStatus(attemptId, 0.0f, job.getNumSlotsPerTask(TaskType.MAP),
+                            TaskStatus.State.RUNNING, "", "", trackerName, 
+                            TaskStatus.Phase.MAP, new Counters());
       } else {
         taskStatus = 
-          new ReduceTaskStatus(attemptId, 0.0f, TaskStatus.State.RUNNING, 
-                               "", "", trackerName, TaskStatus.Phase.REDUCE, 
-                               new Counters());
+          new ReduceTaskStatus(attemptId, 0.0f, job.getNumSlotsPerTask(TaskType.REDUCE), 
+                               TaskStatus.State.RUNNING, "", "", trackerName, 
+                               TaskStatus.Phase.REDUCE, new Counters());
       }
 
       // Set the start time
@@ -1067,10 +1075,13 @@ public class JobTracker implements MRCon
         synchronized (taskTrackers) {
           synchronized (trackerExpiryQueue) {
             // IV. Register a new tracker
-            boolean isTrackerRegistered = getTaskTracker(trackerName) != null;
+            TaskTracker taskTracker = getTaskTracker(trackerName);
+            boolean isTrackerRegistered =  (taskTracker != null);
             if (!isTrackerRegistered) {
               markTracker(trackerName); // add the tracker to recovery-manager
-              addNewTracker(ttStatus);
+              taskTracker = new TaskTracker(trackerName);
+              taskTracker.setStatus(ttStatus);
+              addNewTracker(taskTracker);
             }
       
             // V. Update the tracker status
@@ -1441,17 +1452,17 @@ public class JobTracker implements MRCon
         long now = System.currentTimeMillis();
         int size = trackerExpiryQueue.size();
         for (int i = 0; i < size ; ++i) {
-          // Get the first status
-          TaskTrackerStatus status = trackerExpiryQueue.first();
+          // Get the first tasktracker status in the expiry queue
+          TaskTrackerStatus taskTracker = trackerExpiryQueue.first();
 
           // Remove it
-          trackerExpiryQueue.remove(status);
+          trackerExpiryQueue.remove(taskTracker);
 
           // Set the new time
-          status.setLastSeen(now);
+          taskTracker.setLastSeen(now);
 
           // Add back to get the sorted list
-          trackerExpiryQueue.add(status);
+          trackerExpiryQueue.add(taskTracker);
         }
       }
 
@@ -1516,11 +1527,10 @@ public class JobTracker implements MRCon
   // All the known TaskInProgress items, mapped to by taskids (taskid->TIP)
   Map<TaskAttemptID, TaskInProgress> taskidToTIPMap =
     new TreeMap<TaskAttemptID, TaskInProgress>();
-  // (hostname --> Set(trackername))
   // This is used to keep track of all trackers running on one host. While
   // decommissioning the host, all the trackers on the host will be lost.
-  Map<String, Set<String>> hostnameToTrackerName = 
-    Collections.synchronizedMap(new TreeMap<String, Set<String>>());
+  Map<String, Set<TaskTracker>> hostnameToTaskTracker = 
+    Collections.synchronizedMap(new TreeMap<String, Set<TaskTracker>>());
   
 
   // (taskid --> trackerID) 
@@ -1557,8 +1567,8 @@ public class JobTracker implements MRCon
   //
   int totalMaps = 0;
   int totalReduces = 0;
-  private HashMap<String, TaskTrackerStatus> taskTrackers =
-    new HashMap<String, TaskTrackerStatus>();
+  private HashMap<String, TaskTracker> taskTrackers =
+    new HashMap<String, TaskTracker>();
   Map<String,Integer>uniqueHostsMap = new ConcurrentHashMap<String, Integer>();
   ExpireTrackers expireTrackers = new ExpireTrackers();
   Thread expireTrackersThread = null;
@@ -1573,7 +1583,7 @@ public class JobTracker implements MRCon
   RecoveryManager recoveryManager;
 
   /**
-   * It might seem like a bug to maintain a TreeSet of status objects,
+   * It might seem like a bug to maintain a TreeSet of tasktracker status objects,
    * which can be updated at any time.  But that's not what happens!  We
    * only update status objects in the taskTrackers table.  Status objects
    * are never updated once they enter the expiry queue.  Instead, we wait
@@ -2382,9 +2392,15 @@ public class JobTracker implements MRCon
    * @return {@link Collection} of {@link TaskTrackerStatus} 
    */
   public Collection<TaskTrackerStatus> taskTrackers() {
+    Collection<TaskTrackerStatus> ttStatuses;
     synchronized (taskTrackers) {
-      return taskTrackers.values();
+      ttStatuses = 
+        new ArrayList<TaskTrackerStatus>(taskTrackers.values().size());
+      for (TaskTracker tt : taskTrackers.values()) {
+        ttStatuses.add(tt.getStatus());
+      }
     }
+    return ttStatuses;
   }
   
   /**
@@ -2396,7 +2412,8 @@ public class JobTracker implements MRCon
     Collection<TaskTrackerStatus> activeTrackers = 
       new ArrayList<TaskTrackerStatus>();
     synchronized (taskTrackers) {
-      for (TaskTrackerStatus status : taskTrackers.values()) {
+      for ( TaskTracker tt : taskTrackers.values()) {
+        TaskTrackerStatus status = tt.getStatus();
         if (!faultyTrackers.isBlacklisted(status.getHost())) {
           activeTrackers.add(status);
         }
@@ -2417,7 +2434,8 @@ public class JobTracker implements MRCon
     List<String> blacklistedTrackers = 
       new ArrayList<String>();
     synchronized (taskTrackers) {
-      for (TaskTrackerStatus status : taskTrackers.values()) {
+      for (TaskTracker tt : taskTrackers.values()) {
+        TaskTrackerStatus status = tt.getStatus();
         if (!faultyTrackers.isBlacklisted(status.getHost())) {
           activeTrackers.add(status.getTrackerName());
         } else {
@@ -2440,7 +2458,8 @@ public class JobTracker implements MRCon
     Collection<TaskTrackerStatus> blacklistedTrackers = 
       new ArrayList<TaskTrackerStatus>();
     synchronized (taskTrackers) {
-      for (TaskTrackerStatus status : taskTrackers.values()) {
+      for (TaskTracker tt : taskTrackers.values()) {
+        TaskTrackerStatus status = tt.getStatus(); 
         if (faultyTrackers.isBlacklisted(status.getHost())) {
           blacklistedTrackers.add(status);
         }
@@ -2470,14 +2489,22 @@ public class JobTracker implements MRCon
    * @return true if blacklisted, false otherwise
    */
   public boolean isBlacklisted(String trackerID) {
-    TaskTrackerStatus status = getTaskTracker(trackerID);
+    TaskTrackerStatus status = getTaskTrackerStatus(trackerID);
     if (status != null) {
       return faultyTrackers.isBlacklisted(status.getHost());
     }
     return false;
   }
   
-  public TaskTrackerStatus getTaskTracker(String trackerID) {
+  public TaskTrackerStatus getTaskTrackerStatus(String trackerID) {
+    TaskTracker taskTracker;
+    synchronized (taskTrackers) {
+      taskTracker = taskTrackers.get(trackerID);
+    }
+    return (taskTracker == null) ? null : taskTracker.getStatus();
+  }
+
+  public TaskTracker getTaskTracker(String trackerID) {
     synchronized (taskTrackers) {
       return taskTrackers.get(trackerID);
     }
@@ -2491,7 +2518,8 @@ public class JobTracker implements MRCon
    * 
    * @param status Task Tracker's status
    */
-  private void addNewTracker(TaskTrackerStatus status) {
+  private void addNewTracker(TaskTracker taskTracker) {
+    TaskTrackerStatus status = taskTracker.getStatus();
     trackerExpiryQueue.add(status);
 
     //  Register the tracker if its not registered
@@ -2502,14 +2530,14 @@ public class JobTracker implements MRCon
     }
 
     // add it to the set of tracker per host
-    Set<String> trackers = hostnameToTrackerName.get(hostname);
+    Set<TaskTracker> trackers = hostnameToTaskTracker.get(hostname);
     if (trackers == null) {
-      trackers = Collections.synchronizedSet(new HashSet<String>());
-      hostnameToTrackerName.put(hostname, trackers);
+      trackers = Collections.synchronizedSet(new HashSet<TaskTracker>());
+      hostnameToTaskTracker.put(hostname, trackers);
     }
     LOG.info("Adding tracker " + status.getTrackerName() + " to host " 
              + hostname);
-    trackers.add(status.getTrackerName());
+    trackers.add(taskTracker);
   }
 
   public Node resolveAndAddToTopology(String name) {
@@ -2621,11 +2649,13 @@ public class JobTracker implements MRCon
                                                   boolean acceptNewTasks, 
                                                   short responseId) 
     throws IOException {
-    LOG.debug("Got heartbeat from: " + status.getTrackerName() + 
-              " (restarted: " + restarted + 
-              " initialContact: " + initialContact + 
-              " acceptNewTasks: " + acceptNewTasks + ")" +
-              " with responseId: " + responseId);
+    if (LOG.isDebugEnabled()) {
+      LOG.debug("Got heartbeat from: " + status.getTrackerName() + 
+                " (restarted: " + restarted + 
+                " initialContact: " + initialContact + 
+                " acceptNewTasks: " + acceptNewTasks + ")" +
+                " with responseId: " + responseId);
+    }
 
     // Make sure heartbeat is from a tasktracker allowed by the jobtracker.
     if (!acceptTaskTracker(status)) {
@@ -2701,13 +2731,13 @@ public class JobTracker implements MRCon
       
     // Check for new tasks to be executed on the tasktracker
     if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
-      TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);
+      TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName) ;
       if (taskTrackerStatus == null) {
         LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
       } else {
         List<Task> tasks = getSetupAndCleanupTasks(taskTrackerStatus);
         if (tasks == null ) {
-          tasks = taskScheduler.assignTasks(taskTrackerStatus);
+          tasks = taskScheduler.assignTasks(taskTrackers.get(trackerName));
         }
         if (tasks != null) {
           for (Task task : tasks) {
@@ -2807,14 +2837,15 @@ public class JobTracker implements MRCon
    */
   private boolean updateTaskTrackerStatus(String trackerName,
                                           TaskTrackerStatus status) {
-    TaskTrackerStatus oldStatus = taskTrackers.get(trackerName);
+    TaskTracker tt = getTaskTracker(trackerName);
+    TaskTrackerStatus oldStatus = (tt == null) ? null : tt.getStatus();
     if (oldStatus != null) {
       totalMaps -= oldStatus.countMapTasks();
       totalReduces -= oldStatus.countReduceTasks();
       if (!faultyTrackers.isBlacklisted(oldStatus.getHost())) {
-        int mapSlots = oldStatus.getMaxMapTasks();
+        int mapSlots = oldStatus.getMaxMapSlots();
         totalMapTaskCapacity -= mapSlots;
-        int reduceSlots = oldStatus.getMaxReduceTasks();
+        int reduceSlots = oldStatus.getMaxReduceSlots();
         totalReduceTaskCapacity -= reduceSlots;
       }
       if (status == null) {
@@ -2834,16 +2865,56 @@ public class JobTracker implements MRCon
       totalMaps += status.countMapTasks();
       totalReduces += status.countReduceTasks();
       if (!faultyTrackers.isBlacklisted(status.getHost())) {
-        int mapSlots = status.getMaxMapTasks();
+        int mapSlots = status.getMaxMapSlots();
         totalMapTaskCapacity += mapSlots;
-        int reduceSlots = status.getMaxReduceTasks();
+        int reduceSlots = status.getMaxReduceSlots();
         totalReduceTaskCapacity += reduceSlots;
       }
       boolean alreadyPresent = false;
-      if (taskTrackers.containsKey(trackerName)) {
+      TaskTracker taskTracker = taskTrackers.get(trackerName);
+      if (taskTracker != null) {
         alreadyPresent = true;
+      } else {
+        taskTracker = new TaskTracker(trackerName);
+      }
+      
+      taskTracker.setStatus(status);
+      taskTrackers.put(trackerName, taskTracker);
+      
+      if (LOG.isDebugEnabled()) {
+        int runningMaps = 0, runningReduces = 0;
+        int commitPendingMaps = 0, commitPendingReduces = 0;
+        int unassignedMaps = 0, unassignedReduces = 0;
+        int miscMaps = 0, miscReduces = 0;
+        List<TaskStatus> taskReports = status.getTaskReports();
+        for (Iterator<TaskStatus> it = taskReports.iterator(); it.hasNext();) {
+          TaskStatus ts = (TaskStatus) it.next();
+          boolean isMap = ts.getIsMap();
+          TaskStatus.State state = ts.getRunState();
+          if (state == TaskStatus.State.RUNNING) {
+            if (isMap) { ++runningMaps; }
+            else { ++runningReduces; }
+          } else if (state == TaskStatus.State.UNASSIGNED) {
+            if (isMap) { ++unassignedMaps; }
+            else { ++unassignedReduces; }
+          } else if (state == TaskStatus.State.COMMIT_PENDING) {
+            if (isMap) { ++commitPendingMaps; }
+            else { ++commitPendingReduces; }
+          } else {
+            if (isMap) { ++miscMaps; } 
+            else { ++miscReduces; } 
+          }
+        }
+        LOG.debug(trackerName + ": Status -" +
+                  " running(m) = " + runningMaps + 
+                  " unassigned(m) = " + unassignedMaps + 
+                  " commit_pending(m) = " + commitPendingMaps +
+                  " misc(m) = " + miscMaps +
+                  " running(r) = " + runningReduces + 
+                  " unassigned(r) = " + unassignedReduces + 
+                  " commit_pending(r) = " + commitPendingReduces +
+                  " misc(r) = " + miscReduces); 
       }
-      taskTrackers.put(trackerName, status);
 
       if (!alreadyPresent)  {
         Integer numTaskTrackersInHost = 
@@ -2872,11 +2943,12 @@ public class JobTracker implements MRCon
       synchronized (trackerExpiryQueue) {
         boolean seenBefore = updateTaskTrackerStatus(trackerName,
                                                      trackerStatus);
+        TaskTracker taskTracker = getTaskTracker(trackerName);
         if (initialContact) {
           // If it's first contact, then clear out 
           // any state hanging around
           if (seenBefore) {
-            lostTaskTracker(trackerName);
+            lostTaskTracker(taskTracker);
           }
         } else {
           // If not first contact, there should be some record of the tracker
@@ -2893,7 +2965,7 @@ public class JobTracker implements MRCon
           if (isBlacklisted(trackerName)) {
             faultyTrackers.numBlacklistedTrackers += 1;
           }
-          addNewTracker(trackerStatus);
+          addNewTracker(taskTracker);
         }
       }
     }
@@ -3012,8 +3084,8 @@ public class JobTracker implements MRCon
   // returns cleanup tasks first, then setup tasks.
   private synchronized List<Task> getSetupAndCleanupTasks(
     TaskTrackerStatus taskTracker) throws IOException {
-    int maxMapTasks = taskTracker.getMaxMapTasks();
-    int maxReduceTasks = taskTracker.getMaxReduceTasks();
+    int maxMapTasks = taskTracker.getMaxMapSlots();
+    int maxReduceTasks = taskTracker.getMaxReduceSlots();
     int numMaps = taskTracker.countMapTasks();
     int numReduces = taskTracker.countReduceTasks();
     int numTaskTrackers = getClusterStatus().getTaskTrackers();
@@ -3810,7 +3882,8 @@ public class JobTracker implements MRCon
    * already been updated.  Just process the contained tasks and any
    * jobs that might be affected.
    */
-  void lostTaskTracker(String trackerName) {
+  void lostTaskTracker(TaskTracker taskTracker) {
+    String trackerName = taskTracker.getTrackerName();
     LOG.info("Lost tracker '" + trackerName + "'");
     
     // remove the tracker from the local structures
@@ -3868,10 +3941,14 @@ public class JobTracker implements MRCon
       
       // Penalize this tracker for each of the jobs which   
       // had any tasks running on it when it was 'lost' 
+      // Also, remove any reserved slots on this tasktracker
       for (JobInProgress job : jobsWithFailures) {
-        job.addTrackerTaskFailure(trackerName);
+        job.addTrackerTaskFailure(trackerName, taskTracker);
       }
-      
+
+      // Cleanup
+      taskTracker.lost();
+
       // Purge 'marked' tasks, needs to be done  
       // here to prevent hanging references!
       removeMarkedTasks(trackerName);
@@ -3901,9 +3978,9 @@ public class JobTracker implements MRCon
     hostsReader.refresh();
     
     Set<String> excludeSet = new HashSet<String>();
-    for(Map.Entry<String, TaskTrackerStatus> eSet : taskTrackers.entrySet()) {
+    for(Map.Entry<String, TaskTracker> eSet : taskTrackers.entrySet()) {
       String trackerName = eSet.getKey();
-      TaskTrackerStatus status = eSet.getValue();
+      TaskTrackerStatus status = eSet.getValue().getStatus();
       // Check if not include i.e not in host list or in hosts list but excluded
       if (!inHostsList(status) || inExcludedHostsList(status)) {
           excludeSet.add(status.getHost()); // add to rejected trackers
@@ -3921,12 +3998,13 @@ public class JobTracker implements MRCon
       synchronized (trackerExpiryQueue) {
         for (String host : hosts) {
           LOG.info("Decommissioning host " + host);
-          Set<String> trackers = hostnameToTrackerName.remove(host);
+          Set<TaskTracker> trackers = hostnameToTaskTracker.remove(host);
           if (trackers != null) {
-            for (String tracker : trackers) {
-              LOG.info("Losing tracker " + tracker + " on host " + host);
+            for (TaskTracker tracker : trackers) {
+              LOG.info("Decommission: Losing tracker " + tracker + 
+                       " on host " + host);
               lostTaskTracker(tracker); // lose the tracker
-              updateTaskTrackerStatus(tracker, null);
+              updateTaskTrackerStatus(tracker.getStatus().getTrackerName(), null);
             }
           }
           LOG.info("Host " + host + " is ready for decommissioning");

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LimitTasksPerJobTaskScheduler.java Fri Mar  4 03:25:10 2011
@@ -26,6 +26,7 @@ import java.util.List;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
 /**
  * A {@link TaskScheduler} that limits the maximum number of tasks
@@ -69,9 +70,9 @@ class LimitTasksPerJobTaskScheduler exte
   }
 
   @Override
-  public synchronized List<Task> assignTasks(TaskTrackerStatus taskTracker)
+  public synchronized List<Task> assignTasks(TaskTracker taskTracker)
       throws IOException {
-
+    TaskTrackerStatus taskTrackerStatus = taskTracker.getStatus();
     final int numTaskTrackers =
         taskTrackerManager.getClusterStatus().getTaskTrackers();
     Collection<JobInProgress> jobQueue =
@@ -79,10 +80,10 @@ class LimitTasksPerJobTaskScheduler exte
     Task task;
 
     /* Stats about the current taskTracker */
-    final int mapTasksNumber = taskTracker.countMapTasks();
-    final int reduceTasksNumber = taskTracker.countReduceTasks();
-    final int maximumMapTasksNumber = taskTracker.getMaxMapTasks();
-    final int maximumReduceTasksNumber = taskTracker.getMaxReduceTasks();
+    final int mapTasksNumber = taskTrackerStatus.countMapTasks();
+    final int reduceTasksNumber = taskTrackerStatus.countReduceTasks();
+    final int maximumMapTasksNumber = taskTrackerStatus.getMaxMapSlots();
+    final int maximumReduceTasksNumber = taskTrackerStatus.getMaxReduceSlots();
 
     /*
      * Statistics about the whole cluster. Most are approximate because of
@@ -141,11 +142,11 @@ class LimitTasksPerJobTaskScheduler exte
             continue;
           }
           if (step == 0 || step == 2) {
-            task = job.obtainNewMapTask(taskTracker, numTaskTrackers,
+            task = job.obtainNewMapTask(taskTrackerStatus, numTaskTrackers,
                 taskTrackerManager.getNumberOfUniqueHosts());
           }
           else {
-            task = job.obtainNewReduceTask(taskTracker, numTaskTrackers,
+            task = job.obtainNewReduceTask(taskTrackerStatus, numTaskTrackers,
                 taskTrackerManager.getNumberOfUniqueHosts());
           }
           if (task != null) {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/LocalJobRunner.java Fri Mar  4 03:25:10 2011
@@ -166,7 +166,8 @@ class LocalJobRunner implements JobSubmi
             MapTask map = new MapTask(file.toString(),  
                                       mapId, i,
                                       rawSplits[i].getClassName(),
-                                      rawSplits[i].getBytes(), job.getUser());
+                                      rawSplits[i].getBytes(), 1, 
+                                      job.getUser());
             JobConf localConf = new JobConf(job);
             map.setJobFile(localFile.toString());
             map.localizeConfiguration(localConf);
@@ -205,7 +206,8 @@ class LocalJobRunner implements JobSubmi
             }
             if (!this.isInterrupted()) {
               ReduceTask reduce = new ReduceTask(file.toString(), 
-                                                 reduceId, 0, mapIds.size(), job.getUser());
+                                                 reduceId, 0, mapIds.size(), 
+                                                 1, job.getUser());
               JobConf localConf = new JobConf(job);
               reduce.setJobFile(localFile.toString());
               reduce.localizeConfiguration(localConf);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTask.java Fri Mar  4 03:25:10 2011
@@ -90,8 +90,8 @@ class MapTask extends Task {
 
   public MapTask(String jobFile, TaskAttemptID taskId, 
                  int partition, String splitClass, BytesWritable split,
-                 String username) {
-    super(jobFile, taskId, partition, username);
+                 int numSlotsRequired, String username) {
+    super(jobFile, taskId, partition, numSlotsRequired, username);
     this.splitClass = splitClass;
     this.split = split;
   }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTaskStatus.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTaskStatus.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/MapTaskStatus.java Fri Mar  4 03:25:10 2011
@@ -23,10 +23,10 @@ class MapTaskStatus extends TaskStatus {
 
   public MapTaskStatus() {}
 
-  public MapTaskStatus(TaskAttemptID taskid, float progress,
+  public MapTaskStatus(TaskAttemptID taskid, float progress, int numSlots,
           State runState, String diagnosticInfo, String stateString,
           String taskTracker, Phase phase, Counters counters) {
-    super(taskid, progress, runState, diagnosticInfo, stateString,
+    super(taskid, progress, numSlots, runState, diagnosticInfo, stateString,
           taskTracker, phase, counters);
   }
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Fri Mar  4 03:25:10 2011
@@ -150,8 +150,9 @@ class ReduceTask extends Task {
   }
 
   public ReduceTask(String jobFile, TaskAttemptID taskId,
-                    int partition, int numMaps, String username) {
-    super(jobFile, taskId, partition, username);
+                    int partition, int numMaps, int numSlotsRequired,
+                    String username) {
+    super(jobFile, taskId, partition, numSlotsRequired, username);
     this.numMaps = numMaps;
   }
   

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTaskStatus.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTaskStatus.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTaskStatus.java Fri Mar  4 03:25:10 2011
@@ -34,11 +34,11 @@ class ReduceTaskStatus extends TaskStatu
   
   public ReduceTaskStatus() {}
 
-  public ReduceTaskStatus(TaskAttemptID taskid, float progress, State runState,
-          String diagnosticInfo, String stateString, String taskTracker,
-          Phase phase, Counters counters) {
-    super(taskid, progress, runState, diagnosticInfo, stateString, taskTracker,
-            phase, counters);
+  public ReduceTaskStatus(TaskAttemptID taskid, float progress, int numSlots,
+                          State runState, String diagnosticInfo, String stateString, 
+                          String taskTracker, Phase phase, Counters counters) {
+    super(taskid, progress, numSlots, runState, diagnosticInfo, stateString, 
+          taskTracker, phase, counters);
   }
 
   @Override

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/Task.java Fri Mar  4 03:25:10 2011
@@ -52,8 +52,12 @@ import org.apache.hadoop.util.Progressab
 import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.hadoop.util.StringUtils;
 
-/** Base class for tasks. */
-abstract class Task implements Writable, Configurable {
+/** 
+ * Base class for tasks.
+ * 
+ * This is NOT a public interface.
+ */
+abstract public class Task implements Writable, Configurable {
   private static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.mapred.TaskRunner");
 
@@ -140,6 +144,7 @@ abstract class Task implements Writable,
   protected final Counters.Counter spilledRecordsCounter;
   private String pidFile = "";
   protected TaskUmbilicalProtocol umbilical;
+  private int numSlotsRequired;
 
   ////////////////////////////////////////////
   // Constructors
@@ -151,14 +156,16 @@ abstract class Task implements Writable,
     spilledRecordsCounter = counters.findCounter(Counter.SPILLED_RECORDS);
   }
 
-  public Task(String jobFile, TaskAttemptID taskId, int partition, String username) {
+  public Task(String jobFile, TaskAttemptID taskId, int partition, 
+              int numSlotsRequired, String username) {
     this.username = username;
     this.jobFile = jobFile;
     this.taskId = taskId;
      
     this.partition = partition;
+    this.numSlotsRequired = numSlotsRequired;
     this.taskStatus = TaskStatus.createTaskStatus(isMapTask(), this.taskId, 
-                                                  0.0f, 
+                                                  0.0f, numSlotsRequired,
                                                   TaskStatus.State.UNASSIGNED, 
                                                   "", "", "", 
                                                   isMapTask() ? 
@@ -175,6 +182,10 @@ abstract class Task implements Writable,
   public void setJobFile(String jobFile) { this.jobFile = jobFile; }
   public String getJobFile() { return jobFile; }
   public TaskAttemptID getTaskID() { return taskId; }
+  public int getNumSlotsRequired() {
+    return numSlotsRequired;
+  }
+
   Counters getCounters() { return counters; }
   public void setPidFile(String pidFile) { 
     this.pidFile = pidFile; 
@@ -201,14 +212,14 @@ abstract class Task implements Writable,
   /**
    * Return current phase of the task. 
    * needs to be synchronized as communication thread sends the phase every second
-   * @return
+   * @return the current phase of the task
    */
   public synchronized TaskStatus.Phase getPhase(){
     return this.taskStatus.getPhase(); 
   }
   /**
    * Set current phase of the task. 
-   * @param p
+   * @param phase task phase 
    */
   protected synchronized void setPhase(TaskStatus.Phase phase){
     this.taskStatus.setPhase(phase); 
@@ -331,6 +342,7 @@ abstract class Task implements Writable,
     Text.writeString(out, jobFile);
     taskId.write(out);
     out.writeInt(partition);
+    out.writeInt(numSlotsRequired);
     taskStatus.write(out);
     skipRanges.write(out);
     out.writeBoolean(skipping);
@@ -346,6 +358,7 @@ abstract class Task implements Writable,
     jobFile = Text.readString(in);
     taskId = TaskAttemptID.read(in);
     partition = in.readInt();
+    numSlotsRequired = in.readInt();
     taskStatus.readFields(in);
     this.mapOutputFile.setJobId(taskId.getJobID()); 
     skipRanges.readFields(in);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Fri Mar  4 03:25:10 2011
@@ -67,6 +67,7 @@ class TaskInProgress {
   private JobTracker jobtracker;
   private TaskID id;
   private JobInProgress job;
+  private final int numSlotsRequired;
 
   // Status of the TIP
   private int successEventNumber = -1;
@@ -132,7 +133,8 @@ class TaskInProgress {
   public TaskInProgress(JobID jobid, String jobFile, 
                         RawSplit rawSplit, 
                         JobTracker jobtracker, JobConf conf, 
-                        JobInProgress job, int partition) {
+                        JobInProgress job, int partition,
+                        int numSlotsRequired) {
     this.jobFile = jobFile;
     this.rawSplit = rawSplit;
     this.jobtracker = jobtracker;
@@ -140,6 +142,7 @@ class TaskInProgress {
     this.conf = conf;
     this.partition = partition;
     this.maxSkipRecords = SkipBadRecords.getMapperMaxSkipRecords(conf);
+    this.numSlotsRequired = numSlotsRequired;
     setMaxTaskAttempts();
     init(jobid);
   }
@@ -150,7 +153,7 @@ class TaskInProgress {
   public TaskInProgress(JobID jobid, String jobFile, 
                         int numMaps, 
                         int partition, JobTracker jobtracker, JobConf conf,
-                        JobInProgress job) {
+                        JobInProgress job, int numSlotsRequired) {
     this.jobFile = jobFile;
     this.numMaps = numMaps;
     this.partition = partition;
@@ -158,6 +161,7 @@ class TaskInProgress {
     this.job = job;
     this.conf = conf;
     this.maxSkipRecords = SkipBadRecords.getReducerMaxSkipGroups(conf);
+    this.numSlotsRequired = numSlotsRequired;
     setMaxTaskAttempts();
     init(jobid);
   }
@@ -523,7 +527,9 @@ class TaskInProgress {
            newState != TaskStatus.State.UNASSIGNED) && 
           (oldState == newState)) {
         LOG.warn("Recieved duplicate status update of '" + newState + 
-                 "' for '" + taskid + "' of TIP '" + getTIPId() + "'");
+                 "' for '" + taskid + "' of TIP '" + getTIPId() + "'" +
+                 "oldTT=" + oldStatus.getTaskTracker() + 
+                 " while newTT=" + status.getTaskTracker());
         return false;
       }
 
@@ -929,9 +935,10 @@ class TaskInProgress {
         split = new BytesWritable();
       }
       t = new MapTask(jobFile, taskid, partition, splitClass, split, 
-                      job.getUser());
+                      numSlotsRequired, job.getUser());
     } else {
-      t = new ReduceTask(jobFile, taskid, partition, numMaps, job.getUser());
+      t = new ReduceTask(jobFile, taskid, partition, numMaps, 
+                         numSlotsRequired, job.getUser());
     }
     if (jobCleanup) {
       t.setJobCleanupTask();

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java Fri Mar  4 03:25:10 2011
@@ -23,6 +23,7 @@ import java.util.List;
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 
 /**
  * Used by a {@link JobTracker} to schedule {@link Task}s on
@@ -36,7 +37,7 @@ import org.apache.hadoop.conf.Configurat
  * between the job being added (when
  * {@link JobInProgressListener#jobAdded(JobInProgress)} is called)
  * and tasks for that job being assigned (by
- * {@link #assignTasks(TaskTrackerStatus)}).
+ * {@link #assignTasks(TaskTracker)}).
  * @see EagerTaskInitializationListener
  */
 abstract class TaskScheduler implements Configurable {
@@ -80,8 +81,8 @@ abstract class TaskScheduler implements 
    * @param taskTracker The TaskTracker for which we're looking for tasks.
    * @return A list of tasks to run on that TaskTracker, possibly empty.
    */
-  public abstract List<Task> assignTasks(TaskTrackerStatus taskTracker)
-    throws IOException;
+  public abstract List<Task> assignTasks(TaskTracker taskTracker)
+  throws IOException;
 
   /**
    * Returns a collection of jobs in an order which is specific to 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskStatus.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskStatus.java Fri Mar  4 03:25:10 2011
@@ -49,6 +49,7 @@ abstract class TaskStatus implements Wri
   private String diagnosticInfo;
   private String stateString;
   private String taskTracker;
+  private int numSlots;
     
   private long startTime; 
   private long finishTime; 
@@ -61,14 +62,16 @@ abstract class TaskStatus implements Wri
 
   public TaskStatus() {
     taskid = new TaskAttemptID();
+    numSlots = 0;
   }
 
-  public TaskStatus(TaskAttemptID taskid, float progress,
+  public TaskStatus(TaskAttemptID taskid, float progress, int numSlots,
                     State runState, String diagnosticInfo,
                     String stateString, String taskTracker,
                     Phase phase, Counters counters) {
     this.taskid = taskid;
     this.progress = progress;
+    this.numSlots = numSlots;
     this.runState = runState;
     this.diagnosticInfo = diagnosticInfo;
     this.stateString = stateString;
@@ -80,6 +83,10 @@ abstract class TaskStatus implements Wri
   
   public TaskAttemptID getTaskID() { return taskid; }
   public abstract boolean getIsMap();
+  public int getNumSlots() {
+    return numSlots;
+  }
+
   public float getProgress() { return progress; }
   public void setProgress(float progress) { this.progress = progress; } 
   public State getRunState() { return runState; }
@@ -358,6 +365,7 @@ abstract class TaskStatus implements Wri
   public void write(DataOutput out) throws IOException {
     taskid.write(out);
     out.writeFloat(progress);
+    out.writeInt(numSlots);
     WritableUtils.writeEnum(out, runState);
     Text.writeString(out, diagnosticInfo);
     Text.writeString(out, stateString);
@@ -375,6 +383,7 @@ abstract class TaskStatus implements Wri
   public void readFields(DataInput in) throws IOException {
     this.taskid.readFields(in);
     this.progress = in.readFloat();
+    this.numSlots = in.readInt();
     this.runState = WritableUtils.readEnum(in, State.class);
     this.diagnosticInfo = Text.readString(in);
     this.stateString = Text.readString(in);
@@ -394,24 +403,27 @@ abstract class TaskStatus implements Wri
   // Factory-like methods to create/read/write appropriate TaskStatus objects
   //////////////////////////////////////////////////////////////////////////////
   
-  static TaskStatus createTaskStatus(DataInput in, TaskAttemptID taskId, float progress,
+  static TaskStatus createTaskStatus(DataInput in, TaskAttemptID taskId, 
+                                     float progress, int numSlots,
                                      State runState, String diagnosticInfo,
                                      String stateString, String taskTracker,
                                      Phase phase, Counters counters) 
   throws IOException {
     boolean isMap = in.readBoolean();
-    return createTaskStatus(isMap, taskId, progress, runState, diagnosticInfo, 
-                          stateString, taskTracker, phase, counters);
+    return createTaskStatus(isMap, taskId, progress, numSlots, runState, 
+                            diagnosticInfo, stateString, taskTracker, phase, 
+                            counters);
   }
   
-  static TaskStatus createTaskStatus(boolean isMap, TaskAttemptID taskId, float progress,
-                                   State runState, String diagnosticInfo,
-                                   String stateString, String taskTracker,
-                                   Phase phase, Counters counters) { 
-    return (isMap) ? new MapTaskStatus(taskId, progress, runState, 
+  static TaskStatus createTaskStatus(boolean isMap, TaskAttemptID taskId, 
+                                     float progress, int numSlots,
+                                     State runState, String diagnosticInfo,
+                                     String stateString, String taskTracker,
+                                     Phase phase, Counters counters) { 
+    return (isMap) ? new MapTaskStatus(taskId, progress, numSlots, runState, 
                                        diagnosticInfo, stateString, taskTracker, 
                                        phase, counters) :
-                     new ReduceTaskStatus(taskId, progress, runState, 
+                     new ReduceTaskStatus(taskId, progress, numSlots, runState, 
                                           diagnosticInfo, stateString, 
                                           taskTracker, phase, counters);
   }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar  4 03:25:10 2011
@@ -67,6 +67,7 @@ import org.apache.hadoop.ipc.RemoteExcep
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.mapred.TaskStatus.Phase;
 import org.apache.hadoop.mapred.pipes.Submitter;
+import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.metrics.MetricsContext;
 import org.apache.hadoop.metrics.MetricsException;
 import org.apache.hadoop.metrics.MetricsRecord;
@@ -201,8 +202,8 @@ public class TaskTracker 
   private static final String OUTPUT = "output";
   private JobConf originalConf;
   private JobConf fConf;
-  private int maxCurrentMapTasks;
-  private int maxCurrentReduceTasks;
+  private int maxMapSlots;
+  private int maxReduceSlots;
   private int failures;
   private MapEventsFetcherThread mapEventsFetcher;
   int workerThreads;
@@ -501,8 +502,8 @@ public class TaskTracker 
     }
     
     // RPC initialization
-    int max = maxCurrentMapTasks > maxCurrentReduceTasks ? 
-                       maxCurrentMapTasks : maxCurrentReduceTasks;
+    int max = maxMapSlots > maxReduceSlots ? 
+                       maxMapSlots : maxReduceSlots;
     //set the num handlers to max*2 since canCommit may wait for the duration
     //of a heartbeat RPC
     this.taskReportServer =
@@ -540,8 +541,8 @@ public class TaskTracker 
 
     this.indexCache = new IndexCache(this.fConf);
 
-    mapLauncher = new TaskLauncher(maxCurrentMapTasks);
-    reduceLauncher = new TaskLauncher(maxCurrentReduceTasks);
+    mapLauncher = new TaskLauncher(TaskType.MAP, maxMapSlots);
+    reduceLauncher = new TaskLauncher(TaskType.REDUCE, maxReduceSlots);
     mapLauncher.start();
     reduceLauncher.start();
     Class<? extends TaskController> taskControllerClass 
@@ -919,9 +920,9 @@ public class TaskTracker 
    */
   public TaskTracker(JobConf conf) throws IOException {
     originalConf = conf;
-    maxCurrentMapTasks = conf.getInt(
+    maxMapSlots = conf.getInt(
                   "mapred.tasktracker.map.tasks.maximum", 2);
-    maxCurrentReduceTasks = conf.getInt(
+    maxReduceSlots = conf.getInt(
                   "mapred.tasktracker.reduce.tasks.maximum", 2);
     this.jobTrackAddr = JobTracker.getAddress(conf);
     String infoAddr = 
@@ -1195,8 +1196,8 @@ public class TaskTracker 
                                        cloneAndResetRunningTaskStatuses(
                                          sendCounters), 
                                        failures, 
-                                       maxCurrentMapTasks,
-                                       maxCurrentReduceTasks); 
+                                       maxMapSlots,
+                                       maxReduceSlots); 
       }
     } else {
       LOG.info("Resending 'status' to '" + jobTrackAddr.getHostName() +
@@ -1209,9 +1210,10 @@ public class TaskTracker 
     boolean askForNewTask;
     long localMinSpaceStart;
     synchronized (this) {
-      askForNewTask = (status.countMapTasks() < maxCurrentMapTasks || 
-                       status.countReduceTasks() < maxCurrentReduceTasks) &&
-                      acceptNewTasks; 
+      askForNewTask = 
+        ((status.countOccupiedMapSlots() < maxMapSlots || 
+          status.countOccupiedReduceSlots() < maxReduceSlots) && 
+         acceptNewTasks); 
       localMinSpaceStart = minSpaceStart;
     }
     if (askForNewTask) {
@@ -1585,12 +1587,12 @@ public class TaskTracker 
     private final int maxSlots;
     private List<TaskInProgress> tasksToLaunch;
 
-    public TaskLauncher(int numSlots) {
+    public TaskLauncher(TaskType taskType, int numSlots) {
       this.maxSlots = numSlots;
       this.numFreeSlots = new IntWritable(numSlots);
       this.tasksToLaunch = new LinkedList<TaskInProgress>();
       setDaemon(true);
-      setName("TaskLauncher for task");
+      setName("TaskLauncher for " + taskType + " tasks");
     }
 
     public void addToTaskQueue(LaunchTaskAction action) {
@@ -1605,9 +1607,9 @@ public class TaskTracker 
       tasksToLaunch.clear();
     }
     
-    public void addFreeSlot() {
+    public void addFreeSlots(int numSlots) {
       synchronized (numFreeSlots) {
-        numFreeSlots.set(numFreeSlots.get() + 1);
+        numFreeSlots.set(numFreeSlots.get() + numSlots);
         assert (numFreeSlots.get() <= maxSlots);
         LOG.info("addFreeSlot : current free slots : " + numFreeSlots.get());
         numFreeSlots.notifyAll();
@@ -1618,22 +1620,29 @@ public class TaskTracker 
       while (!Thread.interrupted()) {
         try {
           TaskInProgress tip;
+          Task task;
           synchronized (tasksToLaunch) {
             while (tasksToLaunch.isEmpty()) {
               tasksToLaunch.wait();
             }
             //get the TIP
             tip = tasksToLaunch.remove(0);
-            LOG.info("Trying to launch : " + tip.getTask().getTaskID());
+            task = tip.getTask();
+            LOG.info("Trying to launch : " + tip.getTask().getTaskID() + 
+                     " which needs " + task.getNumSlotsRequired() + " slots");
           }
-          //wait for a slot to run
+          //wait for free slots to run
           synchronized (numFreeSlots) {
-            while (numFreeSlots.get() == 0) {
+            while (numFreeSlots.get() < task.getNumSlotsRequired()) {
+              LOG.info("TaskLauncher : Waiting for " + task.getNumSlotsRequired() + 
+                       " to launch " + task.getTaskID() + ", currently we have " + 
+                       numFreeSlots.get() + " free slots");
               numFreeSlots.wait();
             }
             LOG.info("In TaskLauncher, current free slots : " + numFreeSlots.get()+
-                " and trying to launch "+tip.getTask().getTaskID());
-            numFreeSlots.set(numFreeSlots.get() - 1);
+                     " and trying to launch "+tip.getTask().getTaskID() + 
+                     " which needs " + task.getNumSlotsRequired() + " slots");
+            numFreeSlots.set(numFreeSlots.get() - task.getNumSlotsRequired());
             assert (numFreeSlots.get() >= 0);
           }
           synchronized (tip) {
@@ -1642,7 +1651,7 @@ public class TaskTracker 
                 tip.getRunState() != TaskStatus.State.FAILED_UNCLEAN &&
                 tip.getRunState() != TaskStatus.State.KILLED_UNCLEAN) {
               //got killed externally while still in the launcher queue
-              addFreeSlot();
+              addFreeSlots(task.getNumSlotsRequired());
               continue;
             }
             tip.slotTaken = true;
@@ -1809,6 +1818,7 @@ public class TaskTracker 
       localJobConf = null;
       taskStatus = TaskStatus.createTaskStatus(task.isMapTask(), task.getTaskID(), 
                                                0.0f, 
+                                               task.getNumSlotsRequired(),
                                                task.getState(),
                                                diagnosticInfo.toString(), 
                                                "initializing",  
@@ -2372,7 +2382,7 @@ public class TaskTracker 
     private synchronized void releaseSlot() {
       if (slotTaken) {
         if (launcher != null) {
-          launcher.addFreeSlot();
+          launcher.addFreeSlots(task.getNumSlotsRequired());
         }
         slotTaken = false;
       }
@@ -3014,11 +3024,11 @@ public class TaskTracker 
   }
 
   int getMaxCurrentMapTasks() {
-    return maxCurrentMapTasks;
+    return maxMapSlots;
   }
   
   int getMaxCurrentReduceTasks() {
-    return maxCurrentReduceTasks;
+    return maxReduceSlots;
   }
 
   /**
@@ -3111,7 +3121,7 @@ public class TaskTracker 
             JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY,
             JobConf.DISABLED_MEMORY_LIMIT);
     totalMemoryAllottedForTasks =
-        maxCurrentMapTasks * mapSlotMemorySizeOnTT + maxCurrentReduceTasks
+        maxMapSlots * mapSlotMemorySizeOnTT + maxReduceSlots
             * reduceSlotSizeMemoryOnTT;
     if (totalMemoryAllottedForTasks < 0) {
       //adding check for the old keys which might be used by the administrator

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Fri Mar  4 03:25:10 2011
@@ -17,7 +17,10 @@
  */
 package org.apache.hadoop.mapred;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.io.*;
+import org.apache.hadoop.mapred.TaskStatus.State;
 
 import java.io.*;
 import java.util.*;
@@ -28,9 +31,11 @@ import java.util.*;
  * of the most recent TaskTrackerStatus objects for each
  * unique TaskTracker it knows about.
  *
+ * This is NOT a public interface!
  **************************************************/
-class TaskTrackerStatus implements Writable {
-
+public class TaskTrackerStatus implements Writable {
+  public static final Log LOG = LogFactory.getLog(TaskTrackerStatus.class);
+  
   static {                                        // register a ctor
     WritableFactories.setFactory
       (TaskTrackerStatus.class,
@@ -247,19 +252,27 @@ class TaskTrackerStatus implements Writa
   public List<TaskStatus> getTaskReports() {
     return taskReports;
   }
-    
+   
   /**
-   * Return the current MapTask count
+   * Is the given task considered as 'running'?
+   * @param taskStatus status of the task to check
+   * @return true if the task is RUNNING, UNASSIGNED or in its cleanup phase
+   */
+  private boolean isTaskRunning(TaskStatus taskStatus) {
+    TaskStatus.State state = taskStatus.getRunState();
+    return (state == State.RUNNING || state == State.UNASSIGNED || 
+            taskStatus.inTaskCleanupPhase());
+  }
+  
+  /**
+   * Get the number of running map tasks.
+   * @return the number of running map tasks
    */
   public int countMapTasks() {
     int mapCount = 0;
-    for (Iterator it = taskReports.iterator(); it.hasNext();) {
+    for (Iterator<TaskStatus> it = taskReports.iterator(); it.hasNext();) {
       TaskStatus ts = (TaskStatus) it.next();
-      TaskStatus.State state = ts.getRunState();
-      if (ts.getIsMap() &&
-          ((state == TaskStatus.State.RUNNING) ||
-           (state == TaskStatus.State.UNASSIGNED) ||
-           ts.inTaskCleanupPhase())) {
+      if (ts.getIsMap() && isTaskRunning(ts)) {
         mapCount++;
       }
     }
@@ -267,17 +280,37 @@ class TaskTrackerStatus implements Writa
   }
 
   /**
-   * Return the current ReduceTask count
+   * Get the number of occupied map slots.
+   * @return the number of occupied map slots
+   */
+  public int countOccupiedMapSlots() {
+    int mapSlotsCount = 0;
+    for (Iterator<TaskStatus> it = taskReports.iterator(); it.hasNext();) {
+      TaskStatus ts = (TaskStatus) it.next();
+      if (ts.getIsMap() && isTaskRunning(ts)) {
+        mapSlotsCount += ts.getNumSlots();
+      }
+    }
+    return mapSlotsCount;
+  }
+  
+  /**
+   * Get the number of available map slots.
+   * @return the maximum map slots minus the currently occupied map slots
+   */
+  public int getAvailableMapSlots() {
+    return getMaxMapSlots() - countOccupiedMapSlots();
+  }
+  
+  /**
+   * Get the number of running reduce tasks.
+   * @return the number of running reduce tasks
    */
   public int countReduceTasks() {
     int reduceCount = 0;
-    for (Iterator it = taskReports.iterator(); it.hasNext();) {
+    for (Iterator<TaskStatus> it = taskReports.iterator(); it.hasNext();) {
       TaskStatus ts = (TaskStatus) it.next();
-      TaskStatus.State state = ts.getRunState();
-      if ((!ts.getIsMap()) &&
-          ((state == TaskStatus.State.RUNNING) ||  
-           (state == TaskStatus.State.UNASSIGNED) ||
-           ts.inTaskCleanupPhase())) {
+      if ((!ts.getIsMap()) && isTaskRunning(ts)) {
         reduceCount++;
       }
     }
@@ -285,6 +318,30 @@ class TaskTrackerStatus implements Writa
   }
 
   /**
+   * Get the number of occupied reduce slots.
+   * @return the number of occupied reduce slots
+   */
+  public int countOccupiedReduceSlots() {
+    int reduceSlotsCount = 0;
+    for (Iterator<TaskStatus> it = taskReports.iterator(); it.hasNext();) {
+      TaskStatus ts = (TaskStatus) it.next();
+      if ((!ts.getIsMap()) && isTaskRunning(ts)) {
+        reduceSlotsCount += ts.getNumSlots();
+      }
+    }
+    return reduceSlotsCount;
+  }
+  
+  /**
+   * Get the number of available reduce slots.
+   * @return the maximum reduce slots minus the currently occupied reduce slots
+   */
+  public int getAvailableReduceSlots() {
+    return getMaxReduceSlots() - countOccupiedReduceSlots();
+  }
+  
+
+  /**
    */
   public long getLastSeen() {
     return lastSeen;
@@ -296,15 +353,18 @@ class TaskTrackerStatus implements Writa
   }
 
   /**
-   * Get the maximum concurrent tasks for this node.  (This applies
-   * per type of task - a node with maxTasks==1 will run up to 1 map
-   * and 1 reduce concurrently).
-   * @return maximum tasks this node supports
+   * Get the maximum map slots for this node.
+   * @return the maximum map slots for this node
    */
-  public int getMaxMapTasks() {
+  public int getMaxMapSlots() {
     return maxMapTasks;
   }
-  public int getMaxReduceTasks() {
+  
+  /**
+   * Get the maximum reduce slots for this node.
+   * @return the maximum reduce slots for this node
+   */
+  public int getMaxReduceSlots() {
     return maxReduceTasks;
   }  
   

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=1076949&r1=1076948&r2=1076949&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Fri Mar  4 03:25:10 2011
@@ -54,6 +54,7 @@ interface TaskUmbilicalProtocol extends 
    * Version 14 changed the getTask method signature for HADOOP-4232
    * Version 15 Adds FAILED_UNCLEAN and KILLED_UNCLEAN states for HADOOP-4759
    * Version 16 Added fatalError for child to communicate fatal errors to TT
+   * Version 16 Also added numSlots to TaskStatus for MAPREDUCE-516 (versionID not bumped)
    * */
 
   public static final long versionID = 16L;

Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/TaskType.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/TaskType.java?rev=1076949&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/TaskType.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapreduce/TaskType.java Fri Mar  4 03:25:10 2011
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapreduce;
+
+/**
+ * Enum for map, reduce, job-setup, job-cleanup, task-cleanup task types.
+ */
+public enum TaskType {
+  MAP, REDUCE, JOB_SETUP, JOB_CLEANUP, TASK_CLEANUP
+}