You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/01/08 20:24:10 UTC

svn commit: r494158 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/mapred/

Author: cutting
Date: Mon Jan  8 11:24:08 2007
New Revision: 494158

URL: http://svn.apache.org/viewvc?view=rev&rev=494158
Log:
HADOOP-815.  Fix memory leaks in JobTracker.  Contributed by Arun.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=494158&r1=494157&r2=494158
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Jan  8 11:24:08 2007
@@ -9,6 +9,8 @@
  2. HADOOP-863.  Reduce logging verbosity introduced by HADOOP-813.
     (Devaraj Das via cutting)
 
+ 3. HADOOP-815.  Fix memory leaks in JobTracker. (Arun C Murthy via cutting)
+
 
 Release 0.10.0 - 2007-01-05
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=494158&r1=494157&r2=494158
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Mon Jan  8 11:24:08 2007
@@ -454,40 +454,52 @@
                                            TaskStatus status,
                                            JobTrackerMetrics metrics) {
         String taskid = status.getTaskId();
+        
+        // Sanity check: is the TIP already complete?
         if (tip.isComplete()) {
           LOG.info("Already complete TIP " + tip.getTIPId() + 
-                   " has completed task " + taskid);
-          return;
-        } else {
-          LOG.info("Task '" + taskid + "' has completed " + tip.getTIPId() + 
-                   " successfully.");          
-
-          String taskTrackerName = status.getTaskTracker();
+               " has completed task " + taskid);
+          
+          // Just mark this 'task' as complete
+          tip.completedTask(taskid);
           
-          if(status.getIsMap()){
-            JobHistory.MapAttempt.logStarted(profile.getJobId(), 
-                tip.getTIPId(), status.getTaskId(), status.getStartTime(), 
-                taskTrackerName); 
-            JobHistory.MapAttempt.logFinished(profile.getJobId(), 
-                tip.getTIPId(), status.getTaskId(), status.getFinishTime(), 
-                taskTrackerName); 
-            JobHistory.Task.logFinished(profile.getJobId(), tip.getTIPId(), 
-                Values.MAP.name(), status.getFinishTime()); 
-          }else{
-              JobHistory.ReduceAttempt.logStarted(profile.getJobId(), 
-                  tip.getTIPId(), status.getTaskId(), status.getStartTime(), 
-                  taskTrackerName); 
-              JobHistory.ReduceAttempt.logFinished(profile.getJobId(), 
-                  tip.getTIPId(), status.getTaskId(), status.getShuffleFinishTime(),
-                  status.getSortFinishTime(), status.getFinishTime(), 
-                  taskTrackerName); 
-              JobHistory.Task.logFinished(profile.getJobId(), tip.getTIPId(), 
-                  Values.REDUCE.name(), status.getFinishTime()); 
+          // Let the JobTracker cleanup this taskid if the job isn't running
+          if (this.status.getRunState() != JobStatus.RUNNING) {
+            jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
           }
+          return;
+        } 
+
+        LOG.info("Task '" + taskid + "' has completed " + tip.getTIPId() + 
+          " successfully.");          
+
+        // Update jobhistory 
+        String taskTrackerName = status.getTaskTracker();
+        if(status.getIsMap()){
+          JobHistory.MapAttempt.logStarted(profile.getJobId(), 
+               tip.getTIPId(), status.getTaskId(), status.getStartTime(), 
+               taskTrackerName); 
+          JobHistory.MapAttempt.logFinished(profile.getJobId(), 
+               tip.getTIPId(), status.getTaskId(), status.getFinishTime(), 
+               taskTrackerName); 
+          JobHistory.Task.logFinished(profile.getJobId(), tip.getTIPId(), 
+               Values.MAP.name(), status.getFinishTime()); 
+        }else{
+          JobHistory.ReduceAttempt.logStarted(profile.getJobId(), 
+               tip.getTIPId(), status.getTaskId(), status.getStartTime(), 
+               taskTrackerName); 
+          JobHistory.ReduceAttempt.logFinished(profile.getJobId(), 
+               tip.getTIPId(), status.getTaskId(), status.getShuffleFinishTime(),
+               status.getSortFinishTime(), status.getFinishTime(), 
+               taskTrackerName); 
+          JobHistory.Task.logFinished(profile.getJobId(), tip.getTIPId(), 
+               Values.REDUCE.name(), status.getFinishTime()); 
         }
         
+        // Mark the TIP as complete
         tip.completed(taskid);
-        // updating the running/finished map/reduce counts
+        
+        // Update the running/finished map/reduce counts
         if (tip.isMapTask()){
           runningMapTasks -= 1;
           finishedMapTasks += 1;
@@ -533,6 +545,10 @@
             JobHistory.JobInfo.logFinished(this.status.getJobId(), finishTime, 
                 this.finishedMapTasks, this.finishedReduceTasks, failedMapTasks, failedReduceTasks);
             metrics.completeJob();
+        } else if (this.status.getRunState() != JobStatus.RUNNING) {
+            // The job has been killed/failed, 
+            // JobTracker should cleanup this task
+            jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
         }
     }
 
@@ -541,6 +557,7 @@
      */
     public synchronized void kill() {
         if (status.getRunState() != JobStatus.FAILED) {
+            LOG.info("Killing job '" + this.status.getJobId() + "'");
             this.status = new JobStatus(status.getJobId(), 1.0f, 1.0f, JobStatus.FAILED);
             this.finishTime = System.currentTimeMillis();
             this.runningMapTasks = 0;
@@ -575,7 +592,9 @@
     private void failedTask(TaskInProgress tip, String taskid, 
                             TaskStatus status, String trackerName,
                             boolean wasRunning, boolean wasComplete) {
+        // Mark the taskid as a 'failure'
         tip.failedSubTask(taskid, trackerName);
+        
         boolean isRunning = tip.isRunning();
         boolean isComplete = tip.isComplete();
         
@@ -622,6 +641,11 @@
         }
             
         //
+        // Let the JobTracker know that this task has failed
+        //
+        jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
+
+        //
         // Check if we need to kill the job because of too many failures
         //
         if (tip.isFailed()) {
@@ -633,9 +657,7 @@
                 System.currentTimeMillis(), this.finishedMapTasks, this.finishedReduceTasks);
             kill();
         }
-
-        jobtracker.removeTaskEntry(taskid);
- }
+    }
 
     /**
      * Fail a task with a given reason, but without a status object.
@@ -669,6 +691,9 @@
      * from the various tables.
      */
     synchronized void garbageCollect() {
+      // Let the JobTracker know that a job is complete
+      jobtracker.finalizeJob(this);
+      
       try {
         // Definitely remove the local-disk copy of the job file
         if (localJobFile != null) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=494158&r1=494157&r2=494158
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Mon Jan  8 11:24:08 2007
@@ -47,6 +47,12 @@
     static final int MIN_CLUSTER_SIZE_FOR_PADDING = 3;
 
     /**
+     * The maximum no. of 'completed' (successful/failed/killed)
+     * jobs kept in memory per-user. 
+     */
+    static final int MAX_COMPLETE_USER_JOBS_IN_MEMORY = 100;
+    
+    /**
      * Used for formatting the id numbers
      */
     private static NumberFormat idFormat = NumberFormat.getInstance();
@@ -289,10 +295,26 @@
                                 if (job.getStatus().getRunState() != JobStatus.RUNNING &&
                                     job.getStatus().getRunState() != JobStatus.PREP &&
                                     (job.getFinishTime() + RETIRE_JOB_INTERVAL < System.currentTimeMillis())) {
+                                    // Ok, this call to removeJobTasks
+                                    // is dangerous in some very very obscure
+                                    // cases; e.g. when job completed, exceeded
+                                    // RETIRE_JOB_INTERVAL time-limit and yet
+                                    // some task (taskid) wasn't complete!
+                                    removeJobTasks(job);
+                                    
                                     it.remove();
-                            
+                                    synchronized (userToJobsMap) {
+                                        ArrayList<JobInProgress> userJobs =
+                                            userToJobsMap.get(job.getProfile().getUser());
+                                        synchronized (userJobs) {
+                                            userJobs.remove(job);
+                                        }
+                                    }
                                     jobInitQueue.remove(job);
                                     jobsByArrival.remove(job);
+                                    
+                                    LOG.info("Retired job with id: '" + 
+                                            job.getProfile().getJobId() + "'");
                                 }
                             }
                         }
@@ -418,6 +440,9 @@
     TreeMap jobs = new TreeMap();
     Vector jobsByArrival = new Vector();
 
+    // (user -> list of JobInProgress)
+    TreeMap<String, ArrayList<JobInProgress>> userToJobsMap = new TreeMap();
+    
     // All the known TaskInProgress items, mapped to by taskids (taskid->TIP)
     Map<String, TaskInProgress> taskidToTIPMap = new TreeMap();
 
@@ -427,8 +452,12 @@
     // (trackerID->TreeSet of taskids running at that tracker)
     TreeMap trackerToTaskMap = new TreeMap();
 
-    // (trackerID --> last sent HeartBeatResponseID)
-    Map<String, Short> trackerToHeartbeatResponseIDMap = new TreeMap();
+    // (trackerID -> TreeSet of completed taskids running at that tracker)
+    TreeMap<String, Set<String>> trackerToMarkedTasksMap = new TreeMap();
+
+    // (trackerID --> last sent HeartBeatResponse)
+    Map<String, HeartbeatResponse> trackerToHeartbeatResponseMap = 
+      new TreeMap();
     
     //
     // Watch and expire TaskTracker objects using these structures.
@@ -644,18 +673,181 @@
         // taskid --> TIP
         taskidToTIPMap.put(taskid, tip);
     }
+    
     void removeTaskEntry(String taskid) {
         // taskid --> tracker
         String tracker = (String) taskidToTrackerMap.remove(taskid);
 
         // tracker --> taskid
-        TreeSet trackerSet = (TreeSet) trackerToTaskMap.get(tracker);
-        if (trackerSet != null) {
-            trackerSet.remove(taskid);
+        if (tracker != null) {
+            TreeSet trackerSet = (TreeSet) trackerToTaskMap.get(tracker);
+            if (trackerSet != null) {
+                trackerSet.remove(taskid);
+            }
         }
 
         // taskid --> TIP
         taskidToTIPMap.remove(taskid);
+        
+        LOG.debug("Removing task '" + taskid + "'");
+    }
+    
+    /**
+     * Mark a 'task' for removal later.
+     * This function assumes that the JobTracker is locked on entry.
+     * 
+     * @param taskTracker the tasktracker at which the 'task' was running
+     * @param taskid completed (success/failure/killed) task
+     */
+    void markCompletedTaskAttempt(String taskTracker, String taskid) {
+      // tracker --> taskid
+      TreeSet taskset = (TreeSet) trackerToMarkedTasksMap.get(taskTracker);
+      if (taskset == null) {
+        taskset = new TreeSet();
+        trackerToMarkedTasksMap.put(taskTracker, taskset);
+      }
+      taskset.add(taskid);
+      
+      LOG.debug("Marked '" + taskid + "' from '" + taskTracker + "'");
+    }
+
+    /**
+     * Mark all 'non-running' tasks of the job for pruning.
+     * This function assumes that the JobTracker is locked on entry.
+     * 
+     * @param job the completed job
+     */
+    void markCompletedJob(JobInProgress job) {
+      for (TaskInProgress tip : job.getMapTasks()) {
+        for (TaskStatus taskStatus : tip.getTaskStatuses()) {
+          if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
+            markCompletedTaskAttempt(taskStatus.getTaskTracker(), 
+                    taskStatus.getTaskId());
+          }
+        }
+      }
+      for (TaskInProgress tip : job.getReduceTasks()) {
+        for (TaskStatus taskStatus : tip.getTaskStatuses()) {
+          if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
+            markCompletedTaskAttempt(taskStatus.getTaskTracker(), 
+                    taskStatus.getTaskId());
+          }
+        }
+      }
+    }
+    
+    /**
+     * Remove all 'marked' tasks running on a given {@link TaskTracker}
+     * from the {@link JobTracker}'s data-structures.
+     * This function assumes that the JobTracker is locked on entry.
+     * 
+     * @param taskTracker tasktracker whose 'non-running' tasks are to be purged
+     */
+    private void removeMarkedTasks(String taskTracker) {
+      // Purge all the 'marked' tasks which were running at taskTracker
+      TreeSet<String> markedTaskSet = 
+        (TreeSet<String>) trackerToMarkedTasksMap.get(taskTracker);
+      if (markedTaskSet != null) {
+        for (String taskid : markedTaskSet) {
+          removeTaskEntry(taskid);
+          LOG.info("Removed completed task '" + taskid + "' from '" + 
+                  taskTracker + "'");
+        }
+        // Clear 
+        trackerToMarkedTasksMap.remove(taskTracker);
+      }
+    }
+    
+    /**
+     * Call {@link #removeTaskEntry(String)} for each of the
+     * job's tasks.
+     * When the JobTracker is retiring the long-completed
+     * job, either because it has outlived {@link #RETIRE_JOB_INTERVAL}
+     * or the limit of {@link #MAX_COMPLETE_USER_JOBS_IN_MEMORY} jobs 
+     * has been reached, we can afford to nuke all its tasks; a little
+     * unsafe, but practically feasible. 
+     * 
+     * @param job the job about to be 'retired'
+     */
+    synchronized private void removeJobTasks(JobInProgress job) { 
+      for (TaskInProgress tip : job.getMapTasks()) {
+        for (TaskStatus taskStatus : tip.getTaskStatuses()) {
+          removeTaskEntry(taskStatus.getTaskId());
+        }
+      }
+      for (TaskInProgress tip : job.getReduceTasks()) {
+        for (TaskStatus taskStatus : tip.getTaskStatuses()) {
+          removeTaskEntry(taskStatus.getTaskId());
+        }
+      }
+    }
+    
+    /**
+     * Safely clean up all data structures at the end of the
+     * job (success/failure/killed).
+     * Here we also ensure that for a given user we maintain 
+     * information for only MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs 
+     * on the JobTracker.
+     *  
+     * @param job completed job.
+     */
+    synchronized void finalizeJob(JobInProgress job) {
+      // Mark the 'non-running' tasks for pruning
+      markCompletedJob(job);
+      
+      // Purge oldest jobs and keep at-most MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs of a given user
+      // in memory; information about the purged jobs is available via
+      // JobHistory.
+      synchronized (jobs) {
+        synchronized (jobsByArrival) {
+          synchronized (jobInitQueue) {
+            String jobUser = job.getProfile().getUser();
+            synchronized (userToJobsMap) {
+              ArrayList<JobInProgress> userJobs = 
+                userToJobsMap.get(jobUser);
+              synchronized (userJobs) {
+                while (userJobs.size() > 
+                MAX_COMPLETE_USER_JOBS_IN_MEMORY) {
+                  JobInProgress rjob = userJobs.get(0);
+                  
+                  // Do not delete 'current'
+                  // finished job just yet.
+                  if (rjob == job) {
+                    break;
+                  }
+                  
+                  // Cleanup all datastructures
+                  int rjobRunState = 
+                    rjob.getStatus().getRunState();
+                  if (rjobRunState == JobStatus.SUCCEEDED || 
+                          rjobRunState == JobStatus.FAILED) {
+                    // Ok, this call to removeJobTasks
+                    // is dangerous in some very very obscure
+                    // cases; e.g. when rjob completed, hit
+                    // MAX_COMPLETE_USER_JOBS_IN_MEMORY job
+                    // limit and yet some task (taskid)
+                    // wasn't complete!
+                    removeJobTasks(rjob);
+                    
+                    userJobs.remove(0);
+                    jobs.remove(rjob.getProfile().getJobId());
+                    jobInitQueue.remove(rjob);
+                    jobsByArrival.remove(rjob);
+                    
+                    LOG.info("Retired job with id: '" + 
+                            rjob.getProfile().getJobId() + "'");
+                  } else {
+                    // Do not remove jobs that aren't complete.
+                    // Stop here, and let the next pass take
+                    // care of purging jobs.
+                    break;
+                  }
+                }
+              }
+            }
+          }
+        }
+      }
     }
 
     ///////////////////////////////////////////////////////
@@ -736,26 +928,46 @@
     public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status, 
             boolean initialContact, boolean acceptNewTasks, short responseId) 
     throws IOException {
-      LOG.debug("Got heartbeat from: " + status.getTrackerName() + 
+        LOG.debug("Got heartbeat from: " + status.getTrackerName() + 
               " (initialContact: " + initialContact + 
               " acceptNewTasks: " + acceptNewTasks + ")" +
               " with responseId: " + responseId);
       
         // First check if the last heartbeat response got through 
         String trackerName = status.getTrackerName();
-        Short oldResponseId = trackerToHeartbeatResponseIDMap.get(trackerName);
-      
-        short newResponseId = (short)(responseId + 1);
-        if (!initialContact && oldResponseId != null && 
-                oldResponseId.shortValue() != responseId) {
-            newResponseId = oldResponseId.shortValue();
+        HeartbeatResponse prevHeartbeatResponse =
+            trackerToHeartbeatResponseMap.get(trackerName);
+
+        if (initialContact != true) {
+            // If this isn't the 'initial contact' from the tasktracker,
+            // there is something seriously wrong if the JobTracker has
+            // no record of the 'previous heartbeat'; if so, ask the 
+            // tasktracker to re-initialize itself.
+            if (prevHeartbeatResponse == null) {
+                LOG.warn("Serious problem, cannot find record of 'previous' " +
+                    "heartbeat for '" + trackerName + 
+                    "'; reinitializing the tasktracker");
+                return new HeartbeatResponse(responseId, 
+                        new TaskTrackerAction[] {new ReinitTrackerAction()});
+
+            }
+                
+            // It is completely safe to ignore a 'duplicate' from a tracker
+            // since we are guaranteed that the tracker sends the same 
+            // 'heartbeat' when rpcs are lost. 
+            // {@see TaskTracker.transmitHeartbeat()}
+            if (prevHeartbeatResponse.getResponseId() != responseId) {
+                LOG.info("Ignoring 'duplicate' heartbeat from '" + 
+                        trackerName + "'");
+                return prevHeartbeatResponse;
+            }
         }
       
         // Process this heartbeat 
-        if (!processHeartbeat(status, initialContact, 
-                (newResponseId != responseId))) {
-            if (oldResponseId != null) {
-                trackerToHeartbeatResponseIDMap.remove(trackerName);
+        short newResponseId = (short)(responseId + 1);
+        if (!processHeartbeat(status, initialContact)) {
+            if (prevHeartbeatResponse != null) {
+                trackerToHeartbeatResponseMap.remove(trackerName);
             }
 
             return new HeartbeatResponse(newResponseId, 
@@ -784,12 +996,12 @@
         response.setActions(
                 actions.toArray(new TaskTrackerAction[actions.size()]));
         
-        // Update the trackerToHeartbeatResponseIDMap
-        if (newResponseId != responseId) {
-            trackerToHeartbeatResponseIDMap.put(trackerName, 
-                    new Short(newResponseId));
-        }
+        // Update the trackerToHeartbeatResponseMap
+        trackerToHeartbeatResponseMap.put(trackerName, response);
 
+        // Done processing the heartbeat, now remove 'marked' tasks
+        removeMarkedTasks(trackerName);
+        
         return response;
     }
     
@@ -824,12 +1036,9 @@
      * Process incoming heartbeat messages from the task trackers.
      */
     private synchronized boolean processHeartbeat(
-            TaskTrackerStatus trackerStatus, 
-            boolean initialContact, boolean updateStatusTimestamp) {
+            TaskTrackerStatus trackerStatus, boolean initialContact) {
         String trackerName = trackerStatus.getTrackerName();
-        if (initialContact || updateStatusTimestamp) {
-          trackerStatus.setLastSeen(System.currentTimeMillis());
-        }
+        trackerStatus.setLastSeen(System.currentTimeMillis());
 
         synchronized (taskTrackers) {
             synchronized (trackerExpiryQueue) {
@@ -857,7 +1066,7 @@
         }
 
         updateTaskStatuses(trackerStatus);
-        //LOG.info("Got heartbeat from "+trackerName);
+
         return true;
     }
 
@@ -1028,7 +1237,6 @@
                         killList.add(new KillTaskAction(killTaskId));
                         LOG.debug(taskTracker + " -> KillTaskAction: " + killTaskId);
                     } else {
-                      //killTasksList.add(new KillJobAction(taskId));
                         String killJobId = tip.getJob().getStatus().getJobId(); 
                         killJobIds.add(killJobId);
                     }
@@ -1051,14 +1259,28 @@
      * map task outputs.
      */
     public synchronized MapOutputLocation[] 
-             locateMapOutputs(String jobId, int[] mapTasksNeeded, int reduce) {
-        ArrayList result = new ArrayList(mapTasksNeeded.length);
+             locateMapOutputs(String jobId, int[] mapTasksNeeded, int reduce) 
+    throws IOException {
+        // Check to make sure that the job hasn't 'completed'.
         JobInProgress job = getJob(jobId);
+        if (job.status.getRunState() != JobStatus.RUNNING) {
+          return new MapOutputLocation[0];
+        }
+        
+        ArrayList result = new ArrayList(mapTasksNeeded.length);
         for (int i = 0; i < mapTasksNeeded.length; i++) {
           TaskStatus status = job.findFinishedMap(mapTasksNeeded[i]);
           if (status != null) {
              String trackerId = 
                (String) taskidToTrackerMap.get(status.getTaskId());
+             // Safety check, if we can't find the taskid in 
+             // taskidToTrackerMap and job isn't 'running', then just
+             // return an empty array
+             if (trackerId == null && 
+                     job.status.getRunState() != JobStatus.RUNNING) {
+               return new MapOutputLocation[0];
+             }
+             
              TaskTrackerStatus tracker;
              synchronized (taskTrackers) {
                tracker = (TaskTrackerStatus) taskTrackers.get(trackerId);
@@ -1108,10 +1330,22 @@
         synchronized (jobs) {
             synchronized (jobsByArrival) {
                 synchronized (jobInitQueue) {
-                    jobs.put(job.getProfile().getJobId(), job);
-                    jobsByArrival.add(job);
-                    jobInitQueue.add(job);
-                    jobInitQueue.notifyAll();
+                    synchronized (userToJobsMap) {
+                        jobs.put(job.getProfile().getJobId(), job);
+                        String jobUser = job.getProfile().getUser();
+                        if (!userToJobsMap.containsKey(jobUser)) {
+                            userToJobsMap.put(jobUser, 
+                                    new ArrayList<JobInProgress>());
+                        }
+                        ArrayList<JobInProgress> userJobs = 
+                            userToJobsMap.get(jobUser);
+                        synchronized (userJobs) {
+                            userJobs.add(job);
+                        }
+                        jobsByArrival.add(job);
+                        jobInitQueue.add(job);
+                        jobInitQueue.notifyAll();
+                    }
                 }
             }
         }
@@ -1271,8 +1505,7 @@
      * jobs that might be affected.
      */
     void updateTaskStatuses(TaskTrackerStatus status) {
-        for (Iterator it = status.taskReports(); it.hasNext(); ) {
-            TaskStatus report = (TaskStatus) it.next();
+        for (TaskStatus report : status.getTaskReports()) {
             report.setTaskTracker(status.getTrackerName());
             String taskId = report.getTaskId();
             TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId);
@@ -1310,8 +1543,16 @@
                                    TaskStatus.Phase.MAP, hostname, trackerName, 
                                    myMetrics);
                   }
+                } else if (!tip.isMapTask() && tip.isComplete()) {
+                  // Completed 'reduce' task, not failed;
+                  // only removed from data-structures.
+                  markCompletedTaskAttempt(trackerName, taskId);
                 }
             }
+            
+            // Purge 'marked' tasks, needs to be done  
+            // here to prevent hanging references!
+            removeMarkedTasks(trackerName);
         }
     }
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?view=diff&rev=494158&r1=494157&r2=494158
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Mon Jan  8 11:24:08 2007
@@ -57,7 +57,6 @@
     private int partition;
     private JobTracker jobtracker;
     private String id;
-    private String totalTaskIds[];
     private JobInProgress job;
 
     // Status of the TIP
@@ -70,7 +69,13 @@
     private int completes = 0;
     private boolean failed = false;
     private boolean killed = false;
-    private TreeSet usableTaskIds = new TreeSet();
+
+    // The 'unique' prefix for taskids of this tip
+    String taskIdPrefix;
+    
+    // The 'next' usable taskid of this tip
+    int nextTaskId = 0;
+    
     // Map from task Id -> TaskTracker Id, contains tasks that are
     // currently runnings
     private TreeMap<String, String> activeTasks = new TreeMap();
@@ -139,13 +144,8 @@
     void init(String jobUniqueString) {
         this.startTime = System.currentTimeMillis();
         this.runSpeculative = conf.getSpeculativeExecution();
-        String uniqueString = makeUniqueString(jobUniqueString);
-        this.id = "tip_" + uniqueString;
-        this.totalTaskIds = new String[MAX_TASK_EXECS + MAX_TASK_FAILURES];
-        for (int i = 0; i < totalTaskIds.length; i++) {
-          totalTaskIds[i] = "task_" + uniqueString + "_" + i;
-          usableTaskIds.add(totalTaskIds[i]);
-        }
+        this.taskIdPrefix = makeUniqueString(jobUniqueString);
+        this.id = "tip_" + this.taskIdPrefix;
     }
 
     ////////////////////////////////////
@@ -180,11 +180,19 @@
     }
     
     /**
+     * Is this tip complete?
+     * 
+     * @return <code>true</code> if the tip is complete, else <code>false</code>
      */
     public boolean isComplete() {
         return (completes > 0);
     }
+
     /**
+     * Is the given taskid in this tip complete?
+     * 
+     * @param taskid taskid of attempt to check for completion
+     * @return <code>true</code> if taskid is complete, else <code>false</code>
      */
     public boolean isComplete(String taskid) {
         TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
@@ -194,7 +202,11 @@
         return ((completes > 0) && 
                 (status.getRunState() == TaskStatus.State.SUCCEEDED));
     }
+
     /**
+     * Is the tip a failure?
+     * 
+     * @return <code>true</code> if tip has failed, else <code>false</code>
      */
     public boolean isFailed() {
         return failed;
@@ -293,6 +305,17 @@
           TaskStatus.State oldState = oldStatus.getRunState();
           TaskStatus.State newState = status.getRunState();
           
+          // We should never receive a duplicate success/failure/killed
+          // status update for the same taskid! This is a safety check, 
+          // and is addressed better at the TaskTracker to ensure this.
+          // @see {@link TaskTracker.transmitHeartbeat()}
+          if ((newState != TaskStatus.State.RUNNING) && 
+                  (oldState == newState)) {
+              LOG.warn("Recieved duplicate status update of '" + newState + 
+                      "' for '" + taskid + "' of TIP '" + getTIPId() + "'");
+              return false;
+          }
+
           // The task is not allowed to move from completed back to running.
           // We have seen out of order status messagesmoving tasks from complete
           // to running. This is a spot fix, but it should be addressed more
@@ -346,14 +369,29 @@
 
     /**
      * Indicate that one of the taskids in this TaskInProgress
-     * has successfully completed!
+     * has successfully completed. 
+     * 
+     * However this may not be the first subtask in this 
+     * TaskInProgress to be completed and hence we might not want to 
+     * manipulate the TaskInProgress to note that it is 'complete' just-as-yet.
      */
-    public void completed(String taskid) {
+    void completedTask(String taskid) {
         LOG.info("Task '" + taskid + "' has completed.");
         TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
         status.setRunState(TaskStatus.State.SUCCEEDED);
         activeTasks.remove(taskid);
-
+    }
+    
+    /**
+     * Indicate that one of the taskids in this TaskInProgress
+     * has successfully completed!
+     */
+    public void completed(String taskid) {
+        //
+        // Record that this taskid is complete
+        //
+        completedTask(taskid);
+        
         //
         // Now that the TIP is complete, the other speculative 
         // subtasks will be closed when the owning tasktracker 
@@ -470,8 +508,17 @@
           execStartTime = System.currentTimeMillis();
         }
 
-        String taskid = (String) usableTaskIds.first();
-        usableTaskIds.remove(taskid);
+        // Create the 'taskid'
+        String taskid = null;
+        if (nextTaskId < (MAX_TASK_EXECS + MAX_TASK_FAILURES)) {
+          taskid = new String("task_" + taskIdPrefix + "_" + nextTaskId);
+          ++nextTaskId;
+        } else {
+          LOG.warn("Exceeded limit of " + (MAX_TASK_EXECS + MAX_TASK_FAILURES) + 
+                  " attempts for the tip '" + getTIPId() + "'");
+          return null;
+        }
+        
         String jobId = job.getProfile().getJobId();
 
         if (isMapTask()) {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=494158&r1=494157&r2=494158
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon Jan  8 11:24:08 2007
@@ -74,6 +74,16 @@
     // last heartbeat response recieved
     short heartbeatResponseId = -1;
 
+    /*
+     * This is the last 'status' report sent by this tracker to the JobTracker.
+     * 
+     * If the rpc call succeeds, this 'status' is cleared-out by this tracker;
+     * indicating that a 'fresh' status report be generated; in the event the
+     * rpc call fails for whatever reason, the previous status report is sent
+     * again.
+     */
+    TaskTrackerStatus status = null;
+    
     StatusHttpServer server = null;
     
     boolean shuttingDown = false;
@@ -249,6 +259,7 @@
         this.mapTotal = 0;
         this.reduceTotal = 0;
         this.acceptNewTasks = true;
+        this.status = null;
         
         this.minSpaceStart = this.fConf.getLong("mapred.local.dir.minspacestart", 0L);
         this.minSpaceKill = this.fConf.getLong("mapred.local.dir.minspacekill", 0L);
@@ -535,20 +546,27 @@
      * @throws IOException
      */
     private HeartbeatResponse transmitHeartBeat() throws IOException {
+      // 
+      // Check if the last heartbeat got through... 
+      // if so then build the heartbeat information for the JobTracker;
+      // else resend the previous status information.
       //
-      // Build the heartbeat information for the JobTracker
-      //
-      List<TaskStatus> taskReports = 
-        new ArrayList<TaskStatus>(runningTasks.size());
-      synchronized (this) {
-        for (TaskInProgress tip: runningTasks.values()) {
-          taskReports.add(tip.createStatus());
+      if (status == null) {
+        List<TaskStatus> taskReports = 
+          new ArrayList<TaskStatus>(runningTasks.size());
+        synchronized (this) {
+          for (TaskInProgress tip: runningTasks.values()) {
+            taskReports.add(tip.createStatus());
+          }
         }
+        status = 
+          new TaskTrackerStatus(taskTrackerName, localHostname, 
+                  httpPort, taskReports, 
+                  failures); 
+      } else {
+        LOG.info("Resending 'status' to '" + jobTrackAddr.getHostName() +
+                "' with reponseId '" + heartbeatResponseId);
       }
-      TaskTrackerStatus status = 
-        new TaskTrackerStatus(taskTrackerName, localHostname, 
-                httpPort, taskReports, 
-                failures); 
       
       //
       // Check if we should ask for a new Task
@@ -569,10 +587,14 @@
       HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status, 
               justStarted, askForNewTask, 
               heartbeatResponseId);
+      
+      //
+      // The heartbeat got through successfully!
+      //
       heartbeatResponseId = heartbeatResponse.getResponseId();
       
       synchronized (this) {
-        for (TaskStatus taskStatus : taskReports) {
+        for (TaskStatus taskStatus : status.getTaskReports()) {
           if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
             if (taskStatus.getIsMap()) {
               mapTotal--;
@@ -584,6 +606,10 @@
           }
         }
       }
+
+      // Force a rebuild of 'status' on the next iteration
+      status = null;                                
+
       return heartbeatResponse;
     }
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java?view=diff&rev=494158&r1=494157&r2=494158
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java Mon Jan  8 11:24:08 2007
@@ -98,11 +98,24 @@
      * All current tasks at the TaskTracker.  
      *
      * Tasks are tracked by a TaskStatus object.
+     * 
+     * @deprecated use {@link #getTaskReports()} instead
      */
     public Iterator taskReports() {
         return taskReports.iterator();
     }
 
+    /**
+     * Get the current tasks at the TaskTracker.
+     * Tasks are tracked by a {@link TaskStatus} object.
+     * 
+     * @return a list of {@link TaskStatus} representing 
+     *         the current tasks at the TaskTracker.
+     */
+    public List<TaskStatus> getTaskReports() {
+      return taskReports;
+    }
+    
     /**
      * Return the current MapTask count
      */