You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by mc...@apache.org on 2005/08/02 20:43:16 UTC
svn commit: r227062 -
/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java
Author: mc
Date: Tue Aug 2 11:43:14 2005
New Revision: 227062
URL: http://svn.apache.org/viewcvs?rev=227062&view=rev
Log:
The TaskTracker will now kill child tasks that fail to
report progress for a 60-second period. This will take care
of hung tasks.
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java?rev=227062&r1=227061&r2=227062&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java Tue Aug 2 11:43:14 2005
@@ -35,7 +35,10 @@
public class TaskTracker implements MRConstants, TaskUmbilicalProtocol, MapOutputProtocol, Runnable {
private static final int MAX_CURRENT_TASKS =
NutchConf.get().getInt("mapred.tasktracker.tasks.maximum", 2);
+
static final long WAIT_FOR_DONE = 3 * 1000;
+ static final long TASK_MIN_PROGRESS_INTERVAL = 60 * 1000;
+
static final int STALE_STATE = 1;
public static final Logger LOG =
@@ -221,6 +224,18 @@
}
//
+ // Kill any tasks that have not reported progress in the last X seconds.
+ //
+ for (Iterator it = runningTasks.values().iterator(); it.hasNext(); ) {
+ TaskInProgress tip = (TaskInProgress) it.next();
+ if ((tip.getRunState() == TaskStatus.RUNNING) &&
+ (System.currentTimeMillis() - tip.getLastProgressReport() > TASK_MIN_PROGRESS_INTERVAL)) {
+
+ tip.cleanup();
+ }
+ }
+
+ //
// Check for any Tasks whose job may have ended
//
String toCloseId = jobClient.pollForClosedTask(taskTrackerName);
@@ -280,6 +295,7 @@
File localTaskDir;
float progress;
int runstate;
+ long lastProgressReport;
StringBuffer diagnosticInfo = new StringBuffer();
TaskRunner runner;
boolean done = false;
@@ -289,6 +305,7 @@
*/
public TaskInProgress(Task task) throws IOException {
this.task = task;
+ this.lastProgressReport = System.currentTimeMillis();
this.localTaskDir = new File(localDir, task.getTaskId());
if (localTaskDir.exists()) {
FileUtil.fullyDelete(localTaskDir);
@@ -359,6 +376,19 @@
LOG.info(task.getTaskId()+" "+p+"% "+state);
this.progress = p;
this.runstate = TaskStatus.RUNNING;
+ this.lastProgressReport = System.currentTimeMillis();
+ }
+
+ /**
+ */
+ public long getLastProgressReport() {
+ return lastProgressReport;
+ }
+
+ /**
+ */
+ public int getRunState() {
+ return runstate;
}
/**