You are viewing a plain-text version of this content. The canonical link for it is here (the hyperlink was not preserved in this text version).
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/05/03 00:05:09 UTC

svn commit: r399065 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/JobTracker.java

Author: cutting
Date: Tue May  2 15:05:08 2006
New Revision: 399065

URL: http://svn.apache.org/viewcvs?rev=399065&view=rev
Log:
HADOOP-185.  Fix so that, if a task tracker times out while making the RPC that asks for a new task to run, the job tracker no longer assumes the tracker is running the returned task (which it never received).  Contributed by Owen.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/CHANGES.txt?rev=399065&r1=399064&r2=399065&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue May  2 15:05:08 2006
@@ -151,6 +151,10 @@
     files containing random data.  The second sorts the output of the
     first.  (omalley via cutting)
 
+40. HADOOP-185.  Fix so that, when a task tracker times out making the
+    RPC asking for a new task to run, the job tracker does not think
+    that it is actually running the task returned.  (omalley via cutting)
+
 Release 0.1.1 - 2006-04-08
 
  1. Added CHANGES.txt, logging all significant changes to Hadoop.  (cutting)

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewcvs/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=399065&r1=399064&r2=399065&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Tue May  2 15:05:08 2006
@@ -76,6 +76,93 @@
         return tracker;
     }
 
+    /**
+     * A thread to timeout tasks that have been assigned to task trackers,
+     * but that haven't reported back yet.
+     * Note that I included a stop() method, even though there is no place
+     * where JobTrackers are cleaned up.
+     * @author Owen O'Malley
+     */
+    private class ExpireLaunchingTasks implements Runnable {
+      private volatile boolean shouldRun = true;
+      /**
+       * This is a map of the tasks that have been assigned to task trackers,
+       * but that have not yet been seen in a status report.
+       * map: task-id (String) -> time-assigned (Long)
+       */
+      private Map launchingTasks = new LinkedHashMap();
+      private static final String errorMsg = "Error launching task";
+      private static final String errorHost = "n/a";
+      
+      public void run() {
+        try {
+          while (shouldRun) {
+            // Every 3 minutes check for any tasks that are overdue
+            Thread.sleep(TASKTRACKER_EXPIRY_INTERVAL/3);
+            long now = System.currentTimeMillis();
+            LOG.fine("Starting launching task sweep");
+            synchronized (launchingTasks) {
+              Iterator itr = launchingTasks.entrySet().iterator();
+              while (itr.hasNext()) {
+                Map.Entry pair = (Map.Entry) itr.next();
+                String taskId = (String) pair.getKey();
+                long age = now - ((Long) pair.getValue()).longValue();
+                LOG.fine(taskId + " is " + age + " ms old.");
+                if (age > TASKTRACKER_EXPIRY_INTERVAL) {
+                  LOG.info("Launching task " + taskId + " timed out.");
+                  TaskInProgress tip = null;
+                  synchronized (JobTracker.this) {
+                    tip = (TaskInProgress) taskidToTIPMap.get(taskId);
+                  }
+                  if (tip != null) {
+                    synchronized (tip) {
+                      JobInProgress job = tip.getJob();
+                      // record why the job failed, so that the user can
+                      // see the problem
+                      TaskStatus status = 
+                        new TaskStatus(taskId,
+                                       tip.isMapTask(),
+                                       0.0f,
+                                       TaskStatus.FAILED,
+                                       errorMsg,
+                                       errorMsg,
+                                       errorHost);
+                      tip.updateStatus(status);
+                      job.failedTask(tip, taskId, errorHost);
+                    }
+                  }
+                  itr.remove();
+                } else {
+                  // the tasks are sorted by start time, so once we find
+                  // one that we want to keep, we are done for this cycle.
+                  break;
+                }
+              }
+            }
+          }
+        } catch (InterruptedException ie) {
+          // all done
+        }
+      }
+      
+      public void addNewTask(String taskName) {
+        synchronized (launchingTasks) {
+          launchingTasks.put(taskName, 
+                             Long.valueOf(System.currentTimeMillis()));
+        }
+      }
+      
+      public void removeTask(String taskName) {
+        synchronized (launchingTasks) {
+          launchingTasks.remove(taskName);
+        }
+      }
+      
+      public void stop() {
+        shouldRun = false;
+      }
+    }
+    
     ///////////////////////////////////////////////////////
     // Used to expire TaskTrackers that have gone down
     ///////////////////////////////////////////////////////
@@ -277,7 +364,9 @@
     ExpireTrackers expireTrackers = new ExpireTrackers();
     RetireJobs retireJobs = new RetireJobs();
     JobInitThread initJobs = new JobInitThread();
-
+    ExpireLaunchingTasks expireLaunchingTasks = new ExpireLaunchingTasks();
+    Thread expireLaunchingTaskThread = new Thread(expireLaunchingTasks);
+    
     /**
      * It might seem like a bug to maintain a TreeSet of status objects,
      * which can be updated at any time.  But that's not what happens!  We
@@ -346,12 +435,12 @@
         this.port = addr.getPort();
         this.interTrackerServer = RPC.getServer(this, addr.getPort(), 10, false, conf);
         this.interTrackerServer.start();
-	Properties p = System.getProperties();
-	for (Iterator it = p.keySet().iterator(); it.hasNext(); ) {
-	    String key = (String) it.next();
-	    String val = (String) p.getProperty(key);
-	    LOG.info("Property '" + key + "' is " + val);
-	}
+        Properties p = System.getProperties();
+        for (Iterator it = p.keySet().iterator(); it.hasNext(); ) {
+          String key = (String) it.next();
+          String val = (String) p.getProperty(key);
+          LOG.info("Property '" + key + "' is " + val);
+        }
 
         this.infoPort = conf.getInt("mapred.job.tracker.info.port", 50030);
         this.infoServer = new JobTrackerInfoServer(this, infoPort);
@@ -362,6 +451,7 @@
         new Thread(this.expireTrackers).start();
         new Thread(this.retireJobs).start();
         new Thread(this.initJobs).start();
+        expireLaunchingTaskThread.start();
     }
 
     public static InetSocketAddress getAddress(Configuration conf) {
@@ -622,7 +712,8 @@
 
                     Task t = job.obtainNewMapTask(taskTracker, tts);
                     if (t != null) {
-                        return t;
+                      expireLaunchingTasks.addNewTask(t.getTaskId());
+                      return t;
                     }
 
                     //
@@ -656,7 +747,8 @@
 
                     Task t = job.obtainNewReduceTask(taskTracker, tts);
                     if (t != null) {
-                        return t;
+                      expireLaunchingTasks.addNewTask(t.getTaskId());
+                      return t;
                     }
 
                     //
@@ -878,10 +970,12 @@
         for (Iterator it = status.taskReports(); it.hasNext(); ) {
             TaskStatus report = (TaskStatus) it.next();
             report.setHostname(status.getHost());
-            TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(report.getTaskId());
+            String taskId = report.getTaskId();
+            TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId);
             if (tip == null) {
                 LOG.info("Serious problem.  While updating status, cannot find taskid " + report.getTaskId());
             } else {
+                expireLaunchingTasks.removeTask(taskId);
                 JobInProgress job = tip.getJob();
                 job.updateTaskStatus(tip, report);