You are viewing a plain text version of this content. (The link to the canonical version was lost when this page was converted to plain text.)
Posted to commits@nutch.apache.org by cu...@apache.org on 2005/07/20 20:23:33 UTC

svn commit: r219958 - in /lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred: LocalJobRunner.java ReduceTaskRunner.java TaskTracker.java TaskUmbilicalProtocol.java

Author: cutting
Date: Wed Jul 20 11:23:25 2005
New Revision: 219958

URL: http://svn.apache.org/viewcvs?rev=219958&view=rev
Log:
Make task child exit promptly when parent is gone.

Modified:
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java
    lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskUmbilicalProtocol.java

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java?rev=219958&r1=219957&r2=219958&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/LocalJobRunner.java Wed Jul 20 11:23:25 2005
@@ -115,6 +115,8 @@
       // Ignore for now
     }
 
+    public void ping(String taskid) throws IOException {}
+
     public void done(String taskId) throws IOException {
       int taskIndex = mapIds.indexOf(taskId);
       if (taskIndex >= 0) {                       // mapping

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java?rev=219958&r1=219957&r2=219958&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/ReduceTaskRunner.java Wed Jul 20 11:23:25 2005
@@ -56,6 +56,14 @@
       MapOutputLocation[] locs =
         jobClient.locateMapOutputs(task.getTaskId(), neededStrings);
 
+      if (locs.length == 0) {
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+        }
+        continue;
+      }
+
       LOG.info("Got "+locs.length+" map output locations.");
 
       // try each of these locations

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java?rev=219958&r1=219957&r2=219958&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskTracker.java Wed Jul 20 11:23:25 2005
@@ -489,6 +489,13 @@
         tip.reportDone();
     }
 
+    /** Child checking to see if we're alive.  Normally does nothing.*/
+    public void ping(String taskid) throws IOException {
+      if (tasks.get(taskid) == null) {
+        throw new IOException("No such task id."); // force child exit
+      }
+    }
+
     /////////////////////////////////////////////////////
     //  Called by TaskTracker thread after task process ends
     /////////////////////////////////////////////////////
@@ -516,8 +523,11 @@
             
           Task task = umbilical.getTask(taskid);
           JobConf job = new JobConf(task.getJobFile());
+
+          startPinging(umbilical, taskid);        // start pinging parent
+
           try {
-              task.run(job, umbilical);                   // run the task
+              task.run(job, umbilical);           // run the task
           } catch (Throwable throwable) {
               LOG.log(Level.WARNING, "Failed to spawn child", throwable);
               // Report back any failures, for diagnostic purposes
@@ -526,6 +536,29 @@
               umbilical.reportDiagnosticInfo(taskid, baos.toString());
           }
           umbilical.done(taskid);
+        }
+
+        /** Periodically ping parent and exit when this fails.*/
+        private static void startPinging(final TaskUmbilicalProtocol umbilical,
+                                         final String taskid) {
+          Thread thread = new Thread(new Runnable() {
+              public void run() {
+                while (true) {
+                  try {
+                    umbilical.ping(taskid);
+                  } catch (Throwable t) {
+                    LOG.warning("Parent died.  Exiting "+taskid);
+                    System.exit(1);
+                  }
+                  try {
+                    Thread.sleep(1000);
+                  } catch (InterruptedException e) {
+                  }
+                }
+              }
+            }, "Pinger for "+taskid);
+          thread.setDaemon(true);
+          thread.start();
         }
     }
 

Modified: lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewcvs/lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskUmbilicalProtocol.java?rev=219958&r1=219957&r2=219958&view=diff
==============================================================================
--- lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskUmbilicalProtocol.java (original)
+++ lucene/nutch/branches/mapred/src/java/org/apache/nutch/mapred/TaskUmbilicalProtocol.java Wed Jul 20 11:23:25 2005
@@ -39,6 +39,9 @@
    */
   void reportDiagnosticInfo(String taskid, String trace) throws IOException;
 
+  /** Periodically called by child to check if parent is still alive. */
+  void ping(String taskid) throws IOException;
+
   /** Report that the task is successfully completed.  Failure is assumed if
    * the task process exits without calling this. */
   void done(String taskid) throws IOException;