You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/10/24 23:18:35 UTC

svn commit: r467488 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/ipc/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/util/

Author: cutting
Date: Tue Oct 24 14:18:34 2006
New Revision: 467488

URL: http://svn.apache.org/viewvc?view=rev&rev=467488
Log:
HADOOP-610.  Fix TaskTracker to survive more exceptions, keeping tasks from becoming lost.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/util/ReflectionUtils.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=467488&r1=467487&r2=467488
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Oct 24 14:18:34 2006
@@ -31,6 +31,9 @@
     locally, if possible.  This was the intent, but there as a bug.
     (Dhruba Borthakur via cutting) 
 
+ 9. HADOOP-610.  Fix TaskTracker to survive more exceptions, keeping
+    tasks from becoming lost.  (omalley via cutting)
+
 
 Release 0.7.2 - 2006-10-18
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java?view=diff&rev=467488&r1=467487&r2=467488
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/ipc/Server.java Tue Oct 24 14:18:34 2006
@@ -47,6 +47,7 @@
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.io.WritableUtils;
 import org.apache.hadoop.ipc.SocketChannelOutputStream;
+import org.apache.hadoop.util.*;
 
 /** An abstract IPC service.  IPC calls take a single {@link Writable} as a
  * parameter, and return a {@link Writable} as their value.  A service runs on
@@ -497,6 +498,7 @@
           // throw the message away if it is too old
           if (System.currentTimeMillis() - call.receivedTime > 
               maxCallStartAge) {
+            ReflectionUtils.logThreadInfo(LOG, "Discarding call " + call, 30);
             LOG.warn("Call " + call.toString() + 
                      " discarded for being too old (" +
                      (System.currentTimeMillis() - call.receivedTime) + ")");

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java?view=diff&rev=467488&r1=467487&r2=467488
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/IsolationRunner.java Tue Oct 24 14:18:34 2006
@@ -53,7 +53,7 @@
     }
 
     public void progress(String taskid, float progress, String state,
-                         Phase phase) throws IOException {
+                         TaskStatus.Phase phase) throws IOException {
       StringBuffer buf = new StringBuffer("Task ");
       buf.append(taskid);
       buf.append(" making progress to ");

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=467488&r1=467487&r2=467488
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue Oct 24 14:18:34 2006
@@ -657,7 +657,7 @@
      * @param trackerName The task tracker the task failed on
      */
     public void failedTask(TaskInProgress tip, String taskid, 
-                           String reason, Phase phase, 
+                           String reason, TaskStatus.Phase phase, 
                            String hostname, String trackerName,
                            JobTrackerMetrics metrics) {
        TaskStatus status = new TaskStatus(taskid,

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=467488&r1=467487&r2=467488
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Tue Oct 24 14:18:34 2006
@@ -69,7 +69,8 @@
           tracker = new JobTracker(conf);
           break;
         } catch (IOException e) {
-          LOG.warn("Starting tracker", e);
+          LOG.warn("Error starting tracker: " + 
+                   StringUtils.stringifyException(e));
         }
         try {
           Thread.sleep(1000);
@@ -139,7 +140,8 @@
                       TaskTrackerStatus trackerStatus = 
                         getTaskTracker(trackerName);
                       job.failedTask(tip, taskId, "Error launching task", 
-                                     tip.isMapTask()?Phase.MAP:Phase.STARTING,
+                                     tip.isMapTask()? TaskStatus.Phase.MAP:
+                                       TaskStatus.Phase.STARTING,
                                      trackerStatus.getHost(), trackerName,
                                      myMetrics);
                     }
@@ -1209,7 +1211,8 @@
                   // if the job is done, we don't want to change anything
                   if (job.getStatus().getRunState() == JobStatus.RUNNING) {
                     job.failedTask(tip, taskId, "Lost task tracker", 
-                                   Phase.MAP, hostname, trackerName, myMetrics);
+                                   TaskStatus.Phase.MAP, hostname, trackerName, 
+                                   myMetrics);
                   }
                 }
             }

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java?view=diff&rev=467488&r1=467487&r2=467488
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/LocalJobRunner.java Tue Oct 24 14:18:34 2006
@@ -157,7 +157,8 @@
 
     public Task getTask(String taskid) { return null; }
 
-    public void progress(String taskId, float progress, String state, Phase phase) {
+    public void progress(String taskId, float progress, String state, 
+                         TaskStatus.Phase phase) {
       LOG.info(state);
       float taskIndex = mapIds.indexOf(taskId);
       if (taskIndex >= 0) {                       // mapping

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=467488&r1=467487&r2=467488
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/MapTask.java Tue Oct 24 14:18:34 2006
@@ -46,7 +46,7 @@
   }
 
   {   // set phase for this task
-    setPhase(Phase.MAP); 
+    setPhase(TaskStatus.Phase.MAP); 
   }
   
   private class MapTaskMetrics {

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=467488&r1=467487&r2=467488
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Tue Oct 24 14:18:34 2006
@@ -67,7 +67,7 @@
 
   { 
     getProgress().setStatus("reduce"); 
-    setPhase(Phase.SHUFFLE);        // phase to start with 
+    setPhase(TaskStatus.Phase.SHUFFLE);        // phase to start with 
  }
 
   private Progress copyPhase = getProgress().addPhase("copy");
@@ -236,7 +236,7 @@
     WritableComparator comparator = job.getOutputKeyComparator();
     
     try {
-      setPhase(Phase.SORT) ; 
+      setPhase(TaskStatus.Phase.SORT) ; 
       sortProgress.start();
 
       // sort the input file
@@ -249,7 +249,7 @@
     }
 
     sortPhase.complete();                         // sort is complete
-    setPhase(Phase.REDUCE); 
+    setPhase(TaskStatus.Phase.REDUCE); 
 
     Reporter reporter = getReporter(umbilical, getProgress());
     

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?view=diff&rev=467488&r1=467487&r2=467488
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Tue Oct 24 14:18:34 2006
@@ -37,7 +37,7 @@
   private String taskId;                          // unique, includes job id
   private String jobId;                           // unique jobid
   private int partition;                          // id within job
-  private Phase phase ;                         // current phase of the task 
+  private TaskStatus.Phase phase ;                         // current phase of the task 
 
   ////////////////////////////////////////////
   // Constructors
@@ -78,14 +78,14 @@
    * Return current phase of the task. 
    * @return
    */
-  public Phase getPhase(){
+  public TaskStatus.Phase getPhase(){
     return this.phase ; 
   }
   /**
    * Set current phase of the task. 
    * @param p
    */
-  protected void setPhase(Phase p){
+  protected void setPhase(TaskStatus.Phase p){
     this.phase = p ; 
   }
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java?view=diff&rev=467488&r1=467487&r2=467488
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java Tue Oct 24 14:18:34 2006
@@ -18,9 +18,6 @@
 import org.apache.hadoop.io.*;
 
 import java.io.*;
-// enumeration for reporting current phase of a task. 
-enum Phase{STARTING, MAP, SHUFFLE, SORT, REDUCE}
-
 /**************************************************
  * Describes the current status of a task.  This is
  * not intended to be a comprehensive piece of data.
@@ -28,7 +25,11 @@
  * @author Mike Cafarella
  **************************************************/
 class TaskStatus implements Writable {
-    public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED};
+    //enumeration for reporting current phase of a task. 
+    public static enum Phase{STARTING, MAP, SHUFFLE, SORT, REDUCE}
+
+    // what state is the task in?
+    public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED}
     
     private String taskid;
     private boolean isMap;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=467488&r1=467487&r2=467488
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Tue Oct 24 14:18:34 2006
@@ -50,7 +50,7 @@
     private long taskTimeout; 
     private int httpPort;
 
-    static final int STALE_STATE = 1;
+    static enum State {NORMAL, STALE, INTERRUPTED}
 
     public static final Log LOG =
     LogFactory.getLog("org.apache.hadoop.mapred.TaskTracker");
@@ -71,12 +71,12 @@
     
     boolean shuttingDown = false;
     
-    TreeMap tasks = null;
+    Map<String, TaskInProgress> tasks = null;
     /**
      * Map from taskId -> TaskInProgress.
      */
-    TreeMap runningTasks = null;
-    Map runningJobs = null;
+    Map<String, TaskInProgress> runningTasks = null;
+    Map<String, RunningJob> runningJobs = null;
     int mapTotal = 0;
     int reduceTotal = 0;
     boolean justStarted = true;
@@ -151,6 +151,41 @@
       taskCleanupThread.start();
     }
     
+    private RunningJob addTaskToJob(String jobId, 
+                                    Path localJobFile,
+                                    TaskInProgress tip) {
+      synchronized (runningJobs) {
+        RunningJob rJob = null;
+        if (!runningJobs.containsKey(jobId)) {
+          rJob = new RunningJob(localJobFile);
+          rJob.localized = false;
+          rJob.tasks = new HashSet();
+          rJob.jobFile = localJobFile;
+          runningJobs.put(jobId, rJob);
+        } else {
+          rJob = runningJobs.get(jobId);
+        }
+        rJob.tasks.add(tip);
+        return rJob;
+      }
+    }
+
+    private void removeTaskFromJob(String jobId, TaskInProgress tip) {
+      synchronized (runningJobs) {
+        RunningJob rjob = runningJobs.get(jobId);
+        if (rjob == null) {
+          LOG.warn("Unknown job " + jobId + " being deleted.");
+        } else {
+          synchronized (rjob) {
+            rjob.tasks.remove(tip);
+            if (rjob.tasks.isEmpty()) {
+              runningJobs.remove(jobId);
+            }
+          }
+        }
+      }
+    }
+    
     static String getCacheSubdir() {
       return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.CACHEDIR;
     }
@@ -229,30 +264,14 @@
     private void localizeJob(TaskInProgress tip) throws IOException {
       Path localJarFile = null;
       Task t = tip.getTask();
-      Path localJobFile = new Path(fConf.getLocalPath(getJobCacheSubdir()), (t
-          .getJobId()
-          + Path.SEPARATOR + "job.xml"));
-      RunningJob rjob = null;
-      synchronized (runningJobs) {
-        if (!runningJobs.containsKey(t.getJobId())) {
-          rjob = new RunningJob();
-          rjob.localized = false;
-          rjob.tasks = new ArrayList();
-          rjob.jobFile = localJobFile;
-          rjob.tasks.add(tip);
-          runningJobs.put(t.getJobId(), rjob);
-        } else {
-          rjob = (RunningJob) runningJobs.get(t.getJobId());
-          // keep this for later use when we just get a jobid to delete
-          // the data for
-          rjob.tasks.add(tip);
-        }
-      }
+      String jobId = t.getJobId();
+      Path localJobFile = new Path(fConf.getLocalPath(getJobCacheSubdir()), 
+                                   jobId + Path.SEPARATOR + "job.xml");
+      RunningJob rjob = addTaskToJob(jobId, localJobFile, tip);
       synchronized (rjob) {
         if (!rjob.localized) {
-          localJarFile = new Path(fConf.getLocalPath(getJobCacheSubdir()), (t
-              .getJobId())
-              + Path.SEPARATOR + "job.jar");
+          localJarFile = new Path(fConf.getLocalPath(getJobCacheSubdir()), 
+                                  jobId + Path.SEPARATOR + "job.jar");
   
           String jobFile = t.getJobFile();
           fs.copyToLocalFile(new Path(jobFile), localJobFile);
@@ -385,28 +404,7 @@
     public InterTrackerProtocol getJobClient() {
       return jobClient;
     }
-
-    /**
-     * Are we running under killall-less operating system.
-     */
-    private static boolean isWindows = 
-      System.getProperty("os.name").startsWith("Windows");
-    
-    /**
-     * Get the call stacks for all java processes on this system.
-     * Obviously, this is only useful for debugging.
-     */
-    private static void getCallStacks() {
-      if (LOG.isDebugEnabled() && !isWindows) {
-         try {
-          Process proc = 
-            Runtime.getRuntime().exec("killall -QUIT java");
-          proc.waitFor();
-        } catch (IOException ie) {
-          LOG.warn(StringUtils.stringifyException(ie));
-        } catch (InterruptedException ie) {}
-      }
-    }
+        
     /**Return the DFS filesystem
      * @return
      */
@@ -417,220 +415,227 @@
     /**
      * Main service loop.  Will stay in this loop forever.
      */
-    int offerService() throws Exception {
+    State offerService() throws Exception {
         long lastHeartbeat = 0;
         this.fs = FileSystem.getNamed(jobClient.getFilesystemName(), this.fConf);
 
         while (running && !shuttingDown) {
+          try {
             long now = System.currentTimeMillis();
 
             long waitTime = HEARTBEAT_INTERVAL - (now - lastHeartbeat);
             if (waitTime > 0) {
-                try {
-                    // sleeps for the wait time, wakes up if a task is finished.
-		    synchronized(finishedCount) {
-                        if (finishedCount[0] == 0) {
-			    finishedCount.wait(waitTime);
-                        }
-                        finishedCount[0] = 0;
-                    }
-                } catch (InterruptedException ie) {
-               }
-	    }
-            lastHeartbeat = now;
-
-            //
-            // Emit standard hearbeat message to check in with JobTracker
-            //
-            Vector taskReports = new Vector();
-            synchronized (this) {
-                for (Iterator it = runningTasks.values().iterator(); 
-                     it.hasNext(); ) {
-                    TaskInProgress tip = (TaskInProgress) it.next();
-                    TaskStatus status = tip.createStatus();
-                    taskReports.add(status);
+              // sleeps for the wait time, wakes up if a task is finished.
+              synchronized(finishedCount) {
+                if (finishedCount[0] == 0) {
+                  finishedCount.wait(waitTime);
                 }
-            }
-
-            //
-            // Xmit the heartbeat
-            //
-            
-            TaskTrackerStatus status = 
-              new TaskTrackerStatus(taskTrackerName, localHostname, 
-                                    httpPort, taskReports, 
-                                    failures); 
-            int resultCode = jobClient.emitHeartbeat(status, justStarted);
-            synchronized (this) {
-              for (Iterator it = taskReports.iterator();
-                   it.hasNext(); ) {
-                  TaskStatus taskStatus = (TaskStatus) it.next();
-                  if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
-                      if (taskStatus.getIsMap()) {
-                          mapTotal--;
-                      } else {
-                          reduceTotal--;
-                      }
-                      myMetrics.completeTask();
-                      runningTasks.remove(taskStatus.getTaskId());
-                  }
+                finishedCount[0] = 0;
               }
             }
-            justStarted = false;
-              
-            if (resultCode == InterTrackerProtocol.UNKNOWN_TASKTRACKER) {
-                return STALE_STATE;
-            }
 
-            //
-            // Check if we should createRecord a new Task
-            //
-            try {
-              if ((mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) && acceptNewTasks) {
-                  checkLocalDirs(fConf.getLocalDirs());
-                  
-                  if (enoughFreeSpace(minSpaceStart)) {
-                    Task t = jobClient.pollForNewTask(taskTrackerName);
-                    if (t != null) {
-                      startNewTask(t);
-                    }
-                  }
-              }
-            } catch (DiskErrorException de ) {
-                LOG.warn("Exiting task tracker because "+de.getMessage());
-                jobClient.reportTaskTrackerError(taskTrackerName, 
-                        "DiskErrorException", de.getMessage());
-                return STALE_STATE;
-            } catch (IOException ie) {
-              LOG.info("Problem launching task: " + 
-                       StringUtils.stringifyException(ie));
+            if (!transmitHeartBeat()) {
+              return State.STALE;
             }
+            lastHeartbeat = now;
+            justStarted = false;
 
-            //
-            // Kill any tasks that have not reported progress in the last X seconds.
-            //
-            synchronized (this) {
-                for (Iterator it = runningTasks.values().iterator(); it.hasNext(); ) {
-                    TaskInProgress tip = (TaskInProgress) it.next();
-                    long timeSinceLastReport = System.currentTimeMillis() - 
-                                               tip.getLastProgressReport();
-                    if ((tip.getRunState() == TaskStatus.State.RUNNING) &&
-                        (timeSinceLastReport > this.taskTimeout) &&
-                        !tip.wasKilled) {
-                        String msg = "Task failed to report status for " +
-                                     (timeSinceLastReport / 1000) + 
-                                     " seconds. Killing.";
-                        LOG.info(tip.getTask().getTaskId() + ": " + msg);
-                        getCallStacks();
-                        tip.reportDiagnosticInfo(msg);
-                        try {
-                          tip.killAndCleanup(true);
-                        } catch (IOException ie) {
-                          LOG.info("Problem cleaning task up: " +
-                                   StringUtils.stringifyException(ie));
-                        }
-                    }
-                }
+            checkForNewTasks();
+            markUnresponsiveTasks();
+            closeCompletedTasks();
+            killOverflowingTasks();
+            
+            //we've cleaned up, resume normal operation
+            if (!acceptNewTasks && tasks.isEmpty()) {
+                acceptNewTasks=true;
             }
+          } catch (InterruptedException ie) {
+            LOG.info("Interrupted. Closing down.");
+            return State.INTERRUPTED;
+          } catch (DiskErrorException de) {
+            String msg = "Exiting task tracker for disk error:\n" +
+                         StringUtils.stringifyException(de);
+            LOG.error(msg);
+            jobClient.reportTaskTrackerError(taskTrackerName, 
+                    "DiskErrorException", msg);
+            return State.STALE;
+          } catch (Exception except) {
+            String msg = "Caught exception: " + 
+                         StringUtils.stringifyException(except);
+            LOG.error(msg);
+          }
+        }
 
-            //
-            // Check for any Tasks that should be killed, even if
-            // the containing Job is still ongoing.  (This happens
-            // with speculative execution, when one version of the
-            // task finished before another
-            //
+        return State.NORMAL;
+    }
 
-            //
-            // Check for any Tasks whose job may have ended
-            //
-            try {
-            String[] toCloseIds = jobClient.pollForTaskWithClosedJob(taskTrackerName);
-            if (toCloseIds != null) {
-              synchronized (this) {
-                for (int i = 0; i < toCloseIds.length; i++) {
-                  Object tip = tasks.get(toCloseIds[i]);
-                  synchronized(runningJobs){
-                    runningJobs.remove(((TaskInProgress)
-                	 	  tasks.get(toCloseIds[i])).getTask().getJobId());
-                  }
-                  if (tip != null) {
-                    tasksToCleanup.put(tip);
-                  } else {
-                    LOG.info("Attempt to cleanup unknown tip " + toCloseIds[i]);
-                  }
+    /**
+     * Build and transmit the heart beat to the JobTracker
+     * @return false if the tracker was unknown
+     * @throws IOException
+     */
+    private boolean transmitHeartBeat() throws IOException {
+      //
+      // Build the heartbeat information for the JobTracker
+      //
+      List<TaskStatus> taskReports = new ArrayList(runningTasks.size());
+      synchronized (this) {
+          for (TaskInProgress tip: runningTasks.values()) {
+              taskReports.add(tip.createStatus());
+          }
+      }
+      TaskTrackerStatus status = 
+        new TaskTrackerStatus(taskTrackerName, localHostname, 
+                              httpPort, taskReports, 
+                              failures); 
+
+      //
+      // Xmit the heartbeat
+      //
+      
+      int resultCode = jobClient.emitHeartbeat(status, justStarted);
+      synchronized (this) {
+        for (TaskStatus taskStatus: taskReports) {
+            if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
+                if (taskStatus.getIsMap()) {
+                    mapTotal--;
+                } else {
+                    reduceTotal--;
                 }
-              }
-            }
-            } catch (IOException ie) {
-              LOG.info("Problem getting closed tasks: " +
-                       StringUtils.stringifyException(ie));
+                myMetrics.completeTask();
+                runningTasks.remove(taskStatus.getTaskId());
             }
-            
-            //Check if we're dangerously low on disk space
-            // If so, kill jobs to free up space and make sure
-            // we don't accept any new tasks
-            // Try killing the reduce jobs first, since I believe they
-            // use up most space
-            // Then pick the one with least progress
-            
-            if (!enoughFreeSpace(minSpaceKill)) {
-              acceptNewTasks=false; 
-              //we give up! do not accept new tasks until
-              //all the ones running have finished and they're all cleared up
-              synchronized (this) {
-                TaskInProgress killMe = null;
-
-                for (Iterator it = runningTasks.values().iterator(); it.hasNext(); ) {
-                  TaskInProgress tip = (TaskInProgress) it.next();
-                  if ((tip.getRunState() == TaskStatus.State.RUNNING) &&
-                      !tip.wasKilled) {
-                        	
-                    if (killMe == null) {
-                      killMe = tip;
-
-                    } else if (!tip.getTask().isMapTask()) {
-                      //reduce task, give priority
-                      if (killMe.getTask().isMapTask() || 
-                          (tip.getTask().getProgress().get() < 
-                           killMe.getTask().getProgress().get())) {
-
-                        killMe = tip;
-                      }
-
-                    } else if (killMe.getTask().isMapTask() &&
-                               tip.getTask().getProgress().get() < 
-                               killMe.getTask().getProgress().get()) {
-                      //map task, only add if the progress is lower
-
-                      killMe = tip;
-                    }
-                  }
-                }
+        }
+      }
+      return resultCode != InterTrackerProtocol.UNKNOWN_TASKTRACKER;
+    }
 
-                if (killMe!=null) {
-                  String msg = "Tasktracker running out of space. Killing task.";
-                  LOG.info(killMe.getTask().getTaskId() + ": " + msg);
-                  killMe.reportDiagnosticInfo(msg);
-                  try {
-                    killMe.killAndCleanup(true);
-                  } catch (IOException ie) {
-                    LOG.info("Problem cleaning task up: " +
-                             StringUtils.stringifyException(ie));
-                  }
-                }
-              }
+    /**
+     * Check to see if there are any new tasks that we should run.
+     * @throws IOException
+     */
+    private void checkForNewTasks() throws IOException {
+      //
+      // Check if we should ask for a new Task
+      //
+      if ((mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) &&
+          acceptNewTasks) {
+        checkLocalDirs(fConf.getLocalDirs());
+        
+        if (enoughFreeSpace(minSpaceStart)) {
+          Task t = jobClient.pollForNewTask(taskTrackerName);
+          if (t != null) {
+            startNewTask(t);
+          }
+        }
+      }
+    }
+    
+    /**
+     * Kill any tasks that have not reported progress in the last X seconds.
+     */
+    private synchronized void markUnresponsiveTasks() throws IOException {
+      long now = System.currentTimeMillis();
+        for (TaskInProgress tip: runningTasks.values()) {
+            long timeSinceLastReport = now - tip.getLastProgressReport();
+            if ((tip.getRunState() == TaskStatus.State.RUNNING) &&
+                (timeSinceLastReport > this.taskTimeout) &&
+                !tip.wasKilled) {
+                String msg = "Task failed to report status for " +
+                             (timeSinceLastReport / 1000) + 
+                             " seconds. Killing.";
+                LOG.info(tip.getTask().getTaskId() + ": " + msg);
+                ReflectionUtils.logThreadInfo(LOG, "lost task", 30);
+                tip.reportDiagnosticInfo(msg);
+                tasksToCleanup.put(tip);
             }
+        }
+    }
 
-
-            //we've cleaned up, resume normal operation
-            if (!acceptNewTasks && tasks.isEmpty()) {
-                acceptNewTasks=true;
+    /**
+     * Ask the JobTracker if there are any tasks that we should clean up,
+     * either because we don't need them any more or because the job is done.
+     */
+    private void closeCompletedTasks() throws IOException {
+      String[] toCloseIds = jobClient.pollForTaskWithClosedJob(taskTrackerName);
+      if (toCloseIds != null) {
+        synchronized (this) {
+          for (int i = 0; i < toCloseIds.length; i++) {
+            TaskInProgress tip = tasks.get(toCloseIds[i]);
+            if (tip != null) {
+              // remove the task from running jobs, removing the job if 
+              // it is the last task
+              removeTaskFromJob(tip.getTask().getJobId(), tip);
+              tasksToCleanup.put(tip);
+            } else {
+              LOG.info("Attempt to cleanup unknown tip " + toCloseIds[i]);
             }
+          }
         }
+      }
+    }
 
-        return 0;
+    /** Check if we're dangerously low on disk space
+     * If so, kill jobs to free up space and make sure
+     * we don't accept any new tasks
+     * Try killing the reduce jobs first, since I believe they
+     * use up most space
+     * Then pick the one with least progress
+     */
+    private void killOverflowingTasks() throws IOException {
+      if (!enoughFreeSpace(minSpaceKill)) {
+        acceptNewTasks=false; 
+        //we give up! do not accept new tasks until
+        //all the ones running have finished and they're all cleared up
+        synchronized (this) {
+          TaskInProgress killMe = findTaskToKill();
+
+          if (killMe!=null) {
+            String msg = "Tasktracker running out of space." +
+                         " Killing task.";
+            LOG.info(killMe.getTask().getTaskId() + ": " + msg);
+            killMe.reportDiagnosticInfo(msg);
+            tasksToCleanup.put(killMe);
+          }
+        }
+      }
     }
+    
+    /**
+     * Pick a task to kill to free up space
+     * @return the task to kill or null, if one wasn't found
+     */
+    private TaskInProgress findTaskToKill() {
+      TaskInProgress killMe = null;
+      for (Iterator it = runningTasks.values().iterator(); it.hasNext(); ) {
+        TaskInProgress tip = (TaskInProgress) it.next();
+        if ((tip.getRunState() == TaskStatus.State.RUNNING) &&
+            !tip.wasKilled) {
+                
+          if (killMe == null) {
+            killMe = tip;
+
+          } else if (!tip.getTask().isMapTask()) {
+            //reduce task, give priority
+            if (killMe.getTask().isMapTask() || 
+                (tip.getTask().getProgress().get() < 
+                 killMe.getTask().getProgress().get())) {
+
+              killMe = tip;
+            }
+
+          } else if (killMe.getTask().isMapTask() &&
+                     tip.getTask().getProgress().get() < 
+                     killMe.getTask().getProgress().get()) {
+            //map task, only add if the progress is lower
 
+            killMe = tip;
+          }
+        }
+      }
+      return killMe;
+    }
+    
     /**
      * Check if all of the local directories have enough
      * free space
@@ -640,6 +645,9 @@
      * @throws IOException 
      */
     private boolean enoughFreeSpace(long minSpace) throws IOException {
+      if (minSpace == 0) {
+        return true;
+      }
       String[] localDirs = fConf.getLocalDirs();
       for (int i = 0; i < localDirs.length; i++) {
         DF df = null;
@@ -704,7 +712,7 @@
                     // This while-loop attempts reconnects if we get network errors
                     while (running && ! staleState && !shuttingDown ) {
                         try {
-                            if (offerService() == STALE_STATE) {
+                            if (offerService() == State.STALE) {
                                 staleState = true;
                             }
                         } catch (Exception ex) {
@@ -722,11 +730,12 @@
                     close();
                 }
                 if (shuttingDown) { return; }
-                LOG.info("Reinitializing local state");
+                LOG.warn("Reinitializing local state");
                 initialize();
             }
         } catch (IOException iex) {
-            LOG.info("Got fatal exception while reinitializing TaskTracker: " + iex.toString());
+            LOG.error("Got fatal exception while reinitializing TaskTracker: " +
+                      StringUtils.stringifyException(iex));
             return;
         }
     }
@@ -811,7 +820,8 @@
                 progress, runstate, 
                 diagnosticInfo.toString(), 
                 "initializing",  
-                 getName(), task.isMapTask()?Phase.MAP:Phase.SHUFFLE); 
+                 getName(), task.isMapTask()? TaskStatus.Phase.MAP:
+                   TaskStatus.Phase.SHUFFLE); 
             keepJobFiles = false;
         }
         
@@ -884,17 +894,18 @@
         /**
          * The task is reporting its progress
          */
-        public synchronized void reportProgress(float p, String state, Phase newPhase) {
+        public synchronized void reportProgress(float p, String state, 
+                                                TaskStatus.Phase newPhase) {
             LOG.info(task.getTaskId()+" "+p+"% "+state);
             this.progress = p;
             this.runstate = TaskStatus.State.RUNNING;
             this.lastProgressReport = System.currentTimeMillis();
-            Phase oldPhase = taskStatus.getPhase() ;
+            TaskStatus.Phase oldPhase = taskStatus.getPhase() ;
             if( oldPhase != newPhase ){
               // sort phase started
-              if( newPhase == Phase.SORT ){
+              if( newPhase == TaskStatus.Phase.SORT ){
                 this.taskStatus.setShuffleFinishTime(System.currentTimeMillis());
-              }else if( newPhase == Phase.REDUCE){
+              }else if( newPhase == TaskStatus.Phase.REDUCE){
                 this.taskStatus.setSortFinishTime(System.currentTimeMillis());
               }
             }
@@ -1068,6 +1079,16 @@
                     JOBCACHE + Path.SEPARATOR + task.getJobId() + Path.SEPARATOR +
                     taskId);
             }
+        
+        public boolean equals(Object obj) {
+          return (obj instanceof TaskInProgress) &&
+                 task.getTaskId().equals
+                   (((TaskInProgress) obj).getTask().getTaskId());
+        }
+        
+        public int hashCode() {
+          return task.getTaskId().hashCode();
+        }
     }
 
     
@@ -1089,7 +1110,10 @@
     /**
      * Called periodically to report Task progress, from 0.0 to 1.0.
      */
-    public synchronized void progress(String taskid, float progress, String state, Phase phase) throws IOException {
+    public synchronized void progress(String taskid, float progress, 
+                                      String state, 
+                                      TaskStatus.Phase phase
+                                      ) throws IOException {
         TaskInProgress tip = (TaskInProgress) tasks.get(taskid);
         if (tip != null) {
           tip.reportProgress(progress, state, phase);
@@ -1150,7 +1174,7 @@
           tip.taskFinished();
           synchronized(finishedCount) {
               finishedCount[0]++;
-              finishedCount.notifyAll();
+              finishedCount.notify();
           }
         } else {
           LOG.warn("Unknown child task finshed: "+taskid+". Ignored.");
@@ -1176,8 +1200,14 @@
     static class RunningJob{
       Path jobFile;
       // keep this for later use
-      ArrayList tasks;
+      Set<TaskInProgress> tasks;
       boolean localized;
+      
+      RunningJob(Path jobFile) {
+        localized = false;
+        tasks = new HashSet();
+        this.jobFile = jobFile;
+      }
     }
 
     /** 
@@ -1239,7 +1269,7 @@
                     LOG.info("Ping exception: " + msg);
                     remainingRetries -=1;
                     if (remainingRetries == 0) {
-                      getCallStacks();
+                      ReflectionUtils.logThreadInfo(LOG, "ping exception", 0);
                       LOG.warn("Last retry, killing "+taskid);
                       System.exit(65);
                     }
@@ -1329,9 +1359,11 @@
             System.out.println("usage: TaskTracker");
             System.exit(-1);
         }
-
         try {
           JobConf conf=new JobConf();
+          // enable the server to track time spent waiting on locks
+          ReflectionUtils.setContentionTracing
+              (conf.getBoolean("tasktracker.contention.tracking", false));
           new TaskTracker(conf).run();
         } catch (IOException e) {
             LOG.warn( "Can not start task tracker because "+

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java?view=diff&rev=467488&r1=467487&r2=467488
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java Tue Oct 24 14:18:34 2006
@@ -42,25 +42,26 @@
     String host;
     int httpPort;
     int failures;
-    Vector taskReports;
+    List<TaskStatus> taskReports;
     
     volatile long lastSeen;
     
     /**
      */
     public TaskTrackerStatus() {
+      taskReports = new ArrayList();
     }
 
     /**
      */
     public TaskTrackerStatus(String trackerName, String host, 
-                             int httpPort, Vector taskReports, int failures) {
+                             int httpPort, List<TaskStatus> taskReports, 
+                             int failures) {
         this.trackerName = trackerName;
         this.host = host;
         this.httpPort = httpPort;
 
-        this.taskReports = new Vector();
-        this.taskReports.addAll(taskReports);
+        this.taskReports = new ArrayList(taskReports);
         this.failures = failures;
     }
 
@@ -167,7 +168,6 @@
         this.host = UTF8.readString(in);
         this.httpPort = in.readInt();
 
-        taskReports = new Vector();
         taskReports.clear();
 
         int numTasks = in.readInt();

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?view=diff&rev=467488&r1=467487&r2=467488
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Tue Oct 24 14:18:34 2006
@@ -37,7 +37,8 @@
    * @param state description of task's current state
    * @param phase current phase of the task.
    */
-  void progress(String taskid, float progress, String state, Phase phase)
+  void progress(String taskid, float progress, String state, 
+                TaskStatus.Phase phase)
     throws IOException;
 
   /** Report error messages back to parent.  Calls should be sparing, since all

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/util/ReflectionUtils.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/util/ReflectionUtils.java?view=diff&rev=467488&r1=467487&r2=467488
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/util/ReflectionUtils.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/util/ReflectionUtils.java Tue Oct 24 14:18:34 2006
@@ -17,7 +17,11 @@
 package org.apache.hadoop.util;
 
 import java.lang.reflect.Constructor;
+import java.io.*;
+import java.lang.management.*;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.*;
 import org.apache.hadoop.mapred.*;
 
@@ -54,5 +58,87 @@
             }
         }
         return result;
+    }
+    
+    static private ThreadMXBean threadBean = 
+      ManagementFactory.getThreadMXBean();
+    
+    public static void setContentionTracing(boolean val) {
+      threadBean.setThreadContentionMonitoringEnabled(val);
+    }
+    
+    private static String getTaskName(long id, String name) {
+      if (name == null) {
+        return Long.toString(id);
+      }
+      return id + " (" + name + ")";
+    }
+    
+    /**
+     * Print all of the thread's information and stack traces
+     * @author Owen O'Malley
+     * @param stream the stream to
+     * @param title a string title for the stack trace
+     */
+    public static void printThreadInfo(PrintWriter stream,
+                                        String title) {
+      final int STACK_DEPTH = 20;
+      boolean contention = threadBean.isThreadContentionMonitoringEnabled();
+      long[] threadIds = threadBean.getAllThreadIds();
+      stream.println("Process Thread Dump: " + title);
+      stream.println(threadIds.length + " active threads");
+      for (long tid: threadIds) {
+        ThreadInfo info = threadBean.getThreadInfo(tid, STACK_DEPTH);
+        if (info == null) {
+          stream.println("  Inactive");
+          continue;
+        }
+        stream.println("Thread " + 
+                       getTaskName(info.getThreadId(),
+                                   info.getThreadName()) + ":");
+        Thread.State state = info.getThreadState();
+        stream.println("  State: " + state);
+        stream.println("  Blocked count: " + info.getBlockedCount());
+        stream.println("  Waited count: " + info.getWaitedCount());
+        if (contention) {
+          stream.println("  Blocked time: " + info.getBlockedTime());
+          stream.println("  Waited time: " + info.getWaitedTime());
+        }
+        if (state == Thread.State.WAITING) {
+          stream.println("  Waiting on " + info.getLockName());
+        } else  if (state == Thread.State.BLOCKED) {
+          stream.println("  Blocked on " + info.getLockName());
+          stream.println("  Blocked by " + 
+                         getTaskName(info.getLockOwnerId(),
+                                     info.getLockOwnerName()));
+        }
+        stream.println("  Stack:");
+        for (StackTraceElement frame: info.getStackTrace()) {
+          stream.println("    " + frame.toString());
+        }
+      }
+      stream.flush();
+    }
+    
+    private static long previousLogTime = 0;
+    
+    /**
+     * Log the current thread stacks at INFO level.
+     * @param log the logger that logs the stack trace
+     * @param title a descriptive title for the call stacks
+     * @param minInterval the minimum time from the last 
+     */
+    public static synchronized void logThreadInfo(Log log,
+                                                  String title,
+                                                  long minInterval) {
+      if (log.isInfoEnabled()) {
+        long now = System.currentTimeMillis();
+        if (now - previousLogTime >= minInterval * 1000) {
+          previousLogTime = now;
+          ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+          printThreadInfo(new PrintWriter(buffer), title);
+          log.info(buffer.toString());
+        }
+      }
     }
 }