You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by mc...@apache.org on 2005/08/30 18:58:39 UTC
svn commit: r264812 -
/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java
Author: mc
Date: Tue Aug 30 09:58:37 2005
New Revision: 264812
URL: http://svn.apache.org/viewcvs?rev=264812&view=rev
Log:
Fix task killing and cleanup; there was a code path where
the tasktracker could silently drop a task.
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java?rev=264812&r1=264811&r2=264812&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java Tue Aug 30 09:58:37 2005
@@ -126,14 +126,9 @@
*/
public synchronized void close() throws IOException {
// Kill running tasks
- Vector v = new Vector();
for (Iterator it = tasks.values().iterator(); it.hasNext(); ) {
TaskInProgress tip = (TaskInProgress) it.next();
- v.add(tip);
- }
- for (Iterator it = v.iterator(); it.hasNext(); ) {
- TaskInProgress tip = (TaskInProgress) it.next();
- tip.cleanup();
+ tip.jobHasFinished();
}
// Wait for them to die and report in
@@ -237,7 +232,7 @@
(System.currentTimeMillis() - tip.getLastProgressReport() > TASK_TIMEOUT)) {
LOG.info("Task " + tip.getTask().getTaskId() + " timed out. Killing.");
tip.reportDiagnosticInfo("Timed out.");
- tip.cleanup();
+ tip.killAndCleanup();
}
}
}
@@ -249,7 +244,7 @@
if (toCloseId != null) {
synchronized (this) {
TaskInProgress tip = (TaskInProgress) tasks.get(toCloseId);
- tip.cleanup();
+ tip.jobHasFinished();
}
}
lastHeartbeat = now;
@@ -308,7 +303,7 @@
StringBuffer diagnosticInfo = new StringBuffer();
TaskRunner runner;
boolean done = false;
- boolean closeRunnerUponEnd = false;
+ boolean wasKilled = false;
/**
*/
@@ -444,40 +439,53 @@
}
//
- // We've already tried to 'cleanup' this task. So once
- // the process actually finishes, finish the cleanup work.
+ // If the task has failed, or if the task was killAndCleanup()'ed,
+ // we should clean up right away. We only wait to cleanup
+ // if the task succeeded, and its results might be useful
+ // later on to downstream job processing.
//
- if (closeRunnerUponEnd) {
- runningTasks.remove(task.getTaskId());
- tasks.remove(task.getTaskId());
+ if (wasKilled || runstate == TaskStatus.FAILED) {
try {
- runner.close();
+ cleanup();
} catch (IOException ie) {
}
- try {
- FileUtil.fullyDelete(localTaskDir);
- } catch (IOException ie) {
- }
}
}
/**
- * The owning job is done, and this task is no longer needed.
- * This method cleans up the task, first killing it if necessary.
+ * We no longer need anything from this task, as the job has
+ * finished. If the task is still running, kill it (and clean up); otherwise just clean up.
+ */
+ public synchronized void jobHasFinished() throws IOException {
+ if (getRunState() == TaskStatus.RUNNING) {
+ killAndCleanup();
+ } else {
+ cleanup();
+ }
+ }
+
+ /**
+ * Kill this task if it is still running; cleanup happens once it exits.
*/
- public synchronized void cleanup() throws IOException {
+ public synchronized void killAndCleanup() throws IOException {
if (runstate == TaskStatus.RUNNING) {
- closeRunnerUponEnd = true;
+ wasKilled = true;
runner.kill();
- } else {
- runningTasks.remove(task.getTaskId());
- tasks.remove(task.getTaskId());
- try {
- runner.close();
- } catch (IOException ie) {
- }
- FileUtil.fullyDelete(localTaskDir);
}
+ }
+
+ /**
+ * We no longer need anything from this task. Either the
+ * controlling job is all done and the files have been copied
+ * away, or the task failed and we don't need the remains.
+ */
+ synchronized void cleanup() throws IOException {
+ tasks.remove(task.getTaskId());
+ try {
+ runner.close();
+ } catch (IOException ie) {
+ }
+ FileUtil.fullyDelete(localTaskDir);
}
}