You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by mc...@apache.org on 2005/08/02 20:43:16 UTC

svn commit: r227062 - /lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java

Author: mc
Date: Tue Aug  2 11:43:14 2005
New Revision: 227062

URL: http://svn.apache.org/viewcvs?rev=227062&view=rev
Log:

  The TaskTracker will now kill child tasks that fail to
report progress for a 60-second period.  This will take care
of hung tasks.


Modified:
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java?rev=227062&r1=227061&r2=227062&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java Tue Aug  2 11:43:14 2005
@@ -35,7 +35,10 @@
 public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutputProtocol, Runnable {
     private static final int MAX_CURRENT_TASKS = 
     NutchConf.get().getInt("mapred.tasktracker.tasks.maximum", 2);
+
     static final long WAIT_FOR_DONE = 3 * 1000;
+    static final long TASK_MIN_PROGRESS_INTERVAL = 60 * 1000;
+
     static final int STALE_STATE = 1;
 
     public static final Logger LOG =
@@ -221,6 +224,18 @@
             }
 
             //
+            // Kill any tasks that have not reported progress in the last X seconds.
+            //
+            for (Iterator it = runningTasks.values().iterator(); it.hasNext(); ) {
+                TaskInProgress tip = (TaskInProgress) it.next();
+                if ((tip.getRunState() == TaskStatus.RUNNING) &&
+                    (System.currentTimeMillis() - tip.getLastProgressReport() > TASK_MIN_PROGRESS_INTERVAL)) {
+
+                    tip.cleanup();
+                }
+            }
+
+            //
             // Check for any Tasks whose job may have ended
             //
             String toCloseId = jobClient.pollForClosedTask(taskTrackerName);
@@ -280,6 +295,7 @@
         File localTaskDir;
         float progress;
         int runstate;
+        long lastProgressReport;
         StringBuffer diagnosticInfo = new StringBuffer();
         TaskRunner runner;
         boolean done = false;
@@ -289,6 +305,7 @@
          */
         public TaskInProgress(Task task) throws IOException {
             this.task = task;
+            this.lastProgressReport = System.currentTimeMillis();
             this.localTaskDir = new File(localDir, task.getTaskId());
             if (localTaskDir.exists()) {
                 FileUtil.fullyDelete(localTaskDir);
@@ -359,6 +376,19 @@
             LOG.info(task.getTaskId()+" "+p+"% "+state);
             this.progress = p;
             this.runstate = TaskStatus.RUNNING;
+            this.lastProgressReport = System.currentTimeMillis();
+        }
+
+        /**
+         */
+        public long getLastProgressReport() {
+            return lastProgressReport;
+        }
+
+        /**
+         */
+        public int getRunState() {
+            return runstate;
         }
 
         /**