You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nutch.apache.org by cu...@apache.org on 2005/07/20 20:23:33 UTC
svn commit: r219958 - in /lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred: LocalJobRunner.java ReduceTaskRunner.java TaskTracker.java TaskUmbilicalProtocol.java
Author: cutting
Date: Wed Jul 20 11:23:25 2005
New Revision: 219958
URL: http://svn.apache.org/viewcvs?rev=219958&view=rev
Log:
Make task child exit promptly when parent is gone.
Modified:
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java
lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskUmbilicalProtocol.java
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java?rev=219958&r1=219957&r2=219958&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java Wed Jul 20 11:23:25 2005
@@ -115,6 +115,8 @@
// Ignore for now
}
+ public void ping(String taskid) throws IOException {}
+
public void done(String taskId) throws IOException {
int taskIndex = mapIds.indexOf(taskId);
if (taskIndex >= 0) { // mapping
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java?rev=219958&r1=219957&r2=219958&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java Wed Jul 20 11:23:25 2005
@@ -56,6 +56,14 @@
MapOutputLocation[] locs =
jobClient.locateMapOutputs(task.getTaskId(), neededStrings);
+ if (locs.length == 0) {
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+ continue;
+ }
+
LOG.info("Got "+locs.length+" map output locations.");
// try each of these locations
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java?rev=219958&r1=219957&r2=219958&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java Wed Jul 20 11:23:25 2005
@@ -489,6 +489,13 @@
tip.reportDone();
}
+ /** Child checking to see if we're alive. Normally does nothing.*/
+ public void ping(String taskid) throws IOException {
+ if (tasks.get(taskid) == null) {
+ throw new IOException("No such task id."); // force child exit
+ }
+ }
+
/////////////////////////////////////////////////////
// Called by TaskTracker thread after task process ends
/////////////////////////////////////////////////////
@@ -516,8 +523,11 @@
Task task = umbilical.getTask(taskid);
JobConf job = new JobConf(task.getJobFile());
+
+ startPinging(umbilical, taskid); // start pinging parent
+
try {
- task.run(job, umbilical); // run the task
+ task.run(job, umbilical); // run the task
} catch (Throwable throwable) {
LOG.log(Level.WARNING, "Failed to spawn child", throwable);
// Report back any failures, for diagnostic purposes
@@ -526,6 +536,29 @@
umbilical.reportDiagnosticInfo(taskid, baos.toString());
}
umbilical.done(taskid);
+ }
+
+ /** Periodically ping parent and exit when this fails.*/
+ private static void startPinging(final TaskUmbilicalProtocol umbilical,
+ final String taskid) {
+ Thread thread = new Thread(new Runnable() {
+ public void run() {
+ while (true) {
+ try {
+ umbilical.ping(taskid);
+ } catch (Throwable t) {
+ LOG.warning("Parent died. Exiting "+taskid);
+ System.exit(1);
+ }
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e) {
+ }
+ }
+ }
+ }, "Pinger for "+taskid);
+ thread.setDaemon(true);
+ thread.start();
}
}
Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskUmbilicalProtocol.java?rev=219958&r1=219957&r2=219958&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskUmbilicalProtocol.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskUmbilicalProtocol.java Wed Jul 20 11:23:25 2005
@@ -39,6 +39,9 @@
*/
void reportDiagnosticInfo(String taskid, String trace) throws IOException;
+ /** Periodically called by child to check if parent is still alive. */
+ void ping(String taskid) throws IOException;
+
/** Report that the task is successfully completed. Failure is assumed if
* the task process exits without calling this. */
void done(String taskid) throws IOException;