Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:25:17 UTC

svn commit: r1076950 - in /hadoop/common/branches/branch-0.20-security-patches/src: core/org/apache/hadoop/util/ docs/src/documentation/content/xdocs/ mapred/ mapred/org/apache/hadoop/mapred/ test/org/apache/hadoop/mapred/ test/org/apache/hadoop/util/ ...

Author: omalley
Date: Fri Mar  4 03:25:16 2011
New Revision: 1076950

URL: http://svn.apache.org/viewvc?rev=1076950&view=rev
Log:
commit 75b06ac97b21b4b70c720374f84fdd517cf3e1e6
Author: Lee Tucker <lt...@yahoo-inc.com>
Date:   Thu Jul 30 17:40:38 2009 -0700

    Applying patch 2855374.mr211.patch

Added:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/NodeHealthCheckerService.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeHealthService.java
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/Shell.java
    hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/cluster_setup.xml
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/util/TestShell.java
    hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/machines.jsp

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/Shell.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/Shell.java?rev=1076950&r1=1076949&r2=1076950&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/Shell.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/Shell.java Fri Mar  4 03:25:16 2011
@@ -22,6 +22,9 @@ import java.io.File;
 import java.io.IOException;
 import java.io.InputStreamReader;
 import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -55,6 +58,11 @@ abstract public class Shell {
     return new String[] {(WINDOWS ? "ls" : "/bin/ls"), "-ld"};
   }
 
+  /** Time after which the executing script will be timed out. */
+  protected long timeOutInterval = 0L;
+  /** Whether or not the script timed out. */
+  private AtomicBoolean timedOut;
+
   /** 
    * Get the Unix command for setting the maximum virtual memory available
    * to a given child process. This is only relevant when we are forking a
@@ -98,6 +106,9 @@ abstract public class Shell {
   private File dir;
   private Process process; // sub process used to execute the command
   private int exitCode;
+
+  /** Whether or not the script finished executing. */
+  private volatile AtomicBoolean completed;
   
   public Shell() {
     this(0L);
@@ -137,7 +148,10 @@ abstract public class Shell {
   /** Run a command */
   private void runCommand() throws IOException { 
     ProcessBuilder builder = new ProcessBuilder(getExecString());
-    boolean completed = false;
+    Timer timeOutTimer = null;
+    ShellTimeoutTimerTask timeoutTimerTask = null;
+    timedOut = new AtomicBoolean(false);
+    completed = new AtomicBoolean(false);
     
     if (environment != null) {
       builder.environment().putAll(this.environment);
@@ -147,6 +161,13 @@ abstract public class Shell {
     }
     
     process = builder.start();
+    if (timeOutInterval > 0) {
+      timeOutTimer = new Timer();
+      timeoutTimerTask = new ShellTimeoutTimerTask(
+          this);
+      //One time scheduling.
+      timeOutTimer.schedule(timeoutTimerTask, timeOutInterval);
+    }
     final BufferedReader errReader = 
             new BufferedReader(new InputStreamReader(process
                                                      .getErrorStream()));
@@ -183,27 +204,32 @@ abstract public class Shell {
         line = inReader.readLine();
       }
       // wait for the process to finish and check the exit code
-      exitCode = process.waitFor();
+      exitCode  = process.waitFor();
       try {
         // make sure that the error thread exits
         errThread.join();
       } catch (InterruptedException ie) {
         LOG.warn("Interrupted while reading the error stream", ie);
       }
-      completed = true;
+      completed.set(true);
+      // the timeout timer handling is
+      // taken care of in the finally block
       if (exitCode != 0) {
         throw new ExitCodeException(exitCode, errMsg.toString());
       }
     } catch (InterruptedException ie) {
       throw new IOException(ie.toString());
     } finally {
+      if ((timeOutTimer!=null) && !timedOut.get()) {
+        timeOutTimer.cancel();
+      }
       // close the input stream
       try {
         inReader.close();
       } catch (IOException ioe) {
         LOG.warn("Error while closing the input stream", ioe);
       }
-      if (!completed) {
+      if (!completed.get()) {
         errThread.interrupt();
       }
       try {
@@ -266,21 +292,47 @@ abstract public class Shell {
     private String[] command;
     private StringBuffer output;
     
+    
     public ShellCommandExecutor(String[] execString) {
-      command = execString.clone();
+      this(execString, null);
     }
-
+    
     public ShellCommandExecutor(String[] execString, File dir) {
-      this(execString);
-      this.setWorkingDirectory(dir);
+      this(execString, dir, null);
     }
-
+   
     public ShellCommandExecutor(String[] execString, File dir, 
                                  Map<String, String> env) {
-      this(execString, dir);
-      this.setEnvironment(env);
+      this(execString, dir, env , 0L);
     }
-    
+
+    /**
+     * Create a new instance of the ShellCommandExecutor to execute a command.
+     * 
+     * @param execString The command to execute with arguments
+     * @param dir If not-null, specifies the directory which should be set
+     *            as the current working directory for the command.
+     *            If null, the current working directory is not modified.
+     * @param env If not-null, environment of the command will include the
+     *            key-value pairs specified in the map. If null, the current
+     *            environment is not modified.
+     * @param timeout Specifies the time in milliseconds, after which the
+     *                command will be killed and the status marked as timedout.
+     *                If 0, the command will not be timed out. 
+     */
+    public ShellCommandExecutor(String[] execString, File dir, 
+        Map<String, String> env, long timeout) {
+      command = execString.clone();
+      if (dir != null) {
+        setWorkingDirectory(dir);
+      }
+      if (env != null) {
+        setEnvironment(env);
+      }
+      timeOutInterval = timeout;
+    }
+        
+
     /** Execute the shell command. */
     public void execute() throws IOException {
       this.run();    
@@ -326,6 +378,24 @@ abstract public class Shell {
     }
   }
   
+  /**
+   * Checks whether the script passed to the shell command executor timed
+   * out or not.
+   * 
+   * @return true if the script timed out.
+   */
+  public boolean isTimedOut() {
+    return timedOut.get();
+  }
+  
+  /**
+   * Set if the command has timed out.
+   * 
+   */
+  private void setTimedOut() {
+    this.timedOut.set(true);
+  }
+  
   /** 
    * Static method to execute a shell command. 
    * Covers most of the simple cases without requiring the user to implement  
@@ -334,7 +404,7 @@ abstract public class Shell {
    * @return the output of the executed command.
    */
   public static String execCommand(String ... cmd) throws IOException {
-    return execCommand(null, cmd);
+    return execCommand(null, cmd, 0L);
   }
   
   /** 
@@ -343,15 +413,56 @@ abstract public class Shell {
    * the <code>Shell</code> interface.
    * @param env the map of environment key=value
    * @param cmd shell command to execute.
+   * @param timeout time in milliseconds after which the script should be marked as timed out
+   * @return the output of the executed command.
+   */
+  
+  public static String execCommand(Map<String, String> env, String[] cmd,
+      long timeout) throws IOException {
+    ShellCommandExecutor exec = new ShellCommandExecutor(cmd, null, env, 
+                                                          timeout);
+    exec.execute();
+    return exec.getOutput();
+  }
+
+  /** 
+   * Static method to execute a shell command. 
+   * Covers most of the simple cases without requiring the user to implement  
+   * the <code>Shell</code> interface.
+   * @param env the map of environment key=value
+   * @param cmd shell command to execute.
    * @return the output of the executed command.
    */
   public static String execCommand(Map<String,String> env, String ... cmd) 
   throws IOException {
-    ShellCommandExecutor exec = new ShellCommandExecutor(cmd);
-    if (env != null) {
-      exec.setEnvironment(env);
+    return execCommand(env, cmd, 0L);
+  }
+  
+  /**
+   * Timer which is used to timeout scripts spawned off by shell.
+   */
+  private static class ShellTimeoutTimerTask extends TimerTask {
+
+    private Shell shell;
+
+    public ShellTimeoutTimerTask(Shell shell) {
+      this.shell = shell;
+    }
+
+    @Override
+    public void run() {
+      Process p = shell.getProcess();
+      try {
+        p.exitValue();
+      } catch (Exception e) {
+        // Process has not terminated.
+        // So check if it has completed;
+        // if not, just destroy it.
+        if (p != null && !shell.completed.get()) {
+          shell.setTimedOut();
+          p.destroy();
+        }
+      }
     }
-    exec.execute();
-    return exec.getOutput();
   }
 }
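
A rough usage sketch of the timeout support added above: the command, the
5-second timeout, and the class name are illustrative assumptions, not code
taken from the patch.

import java.io.IOException;

import org.apache.hadoop.util.Shell;
import org.apache.hadoop.util.Shell.ShellCommandExecutor;

public class ShellTimeoutSketch {
  public static void main(String[] args) throws IOException {
    // Ask the executor to kill the command if it does not finish in 5 seconds.
    ShellCommandExecutor exec = new ShellCommandExecutor(
        new String[] {"/bin/sleep", "10"}, null, null, 5000L);
    try {
      exec.execute();
    } catch (IOException e) {
      // A destroyed (timed-out) process exits with a non-zero code, which
      // typically surfaces here as a Shell.ExitCodeException.
    }
    System.out.println("timed out: " + exec.isTimedOut());

    // The static helper gained a matching timeout overload as well.
    String out = Shell.execCommand(null, new String[] {"/bin/ls", "-l"}, 0L);
    System.out.println(out);
  }
}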

Modified: hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/cluster_setup.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/cluster_setup.xml?rev=1076950&r1=1076949&r2=1076950&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/cluster_setup.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/cluster_setup.xml Fri Mar  4 03:25:16 2011
@@ -589,6 +589,63 @@
             </section>
             
           </section>
+          <section>
+            <title>Monitoring Health of TaskTracker Nodes</title>
+            <p>Hadoop Map/Reduce provides a mechanism by which administrators 
+            can configure the TaskTracker to run an administrator supplied
+            script periodically to determine if a node is healthy or not.
+            Administrators can determine if the node is in a healthy state
+            by performing any checks of their choice in the script. If the
+            script detects the node to be in an unhealthy state, it must print
+            a line to standard output beginning with the string <em>ERROR</em>.
+            The TaskTracker spawns the script periodically and checks its 
+            output. If the script's output contains a line beginning with
+            <em>ERROR</em>, as described above, the node's status is reported as 'unhealthy'
+            and the node is blacklisted on the JobTracker. No further tasks 
+            will be assigned to this node. However, the
+            TaskTracker continues to run the script, so that if the node
+            becomes healthy again, it will be removed from the blacklisted
+            nodes on the JobTracker automatically. The node's health
+            along with the output of the script, if it is unhealthy, is
+            available to the administrator in the JobTracker's web interface.
+            The time since the node was healthy is also displayed on the 
+            web interface.
+            </p>
+            
+            <section>
+            <title>Configuring the Node Health Check Script</title>
+            <p>The following parameters can be used to control the node health 
+            monitoring script in <em>mapred-site.xml</em>.</p>
+            <table>
+            <tr><th>Name</th><th>Description</th></tr>
+            <tr><td><code>mapred.healthChecker.script.path</code></td>
+            <td>Absolute path to the script which is periodically run by the 
+            TaskTracker to determine if the node is 
+            healthy or not. The file should be executable by the TaskTracker.
+            If the value of this key is empty or the file does 
+            not exist or is not executable, node health monitoring
+            is not started.</td>
+            </tr>
+            <tr>
+            <td><code>mapred.healthChecker.interval</code></td>
+            <td>Frequency at which the node health script is run, 
+            in milliseconds</td>
+            </tr>
+            <tr>
+            <td><code>mapred.healthChecker.script.timeout</code></td>
+            <td>Time after which the node health script will be killed by
+            the TaskTracker if unresponsive.
+            The node is marked unhealthy if the node health script times out.</td>
+            </tr>
+            <tr>
+            <td><code>mapred.healthChecker.script.args</code></td>
+            <td>Extra arguments that can be passed to the node health script 
+            when launched.
+            This should be a comma-separated list of arguments. </td>
+            </tr>
+            </table>
+            </section>
+          </section>
           
         </section>
         <section>
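
A possible end-to-end setup matching the description above, written as a small
Java sketch in the style of the new TestNodeHealthService test: the script
body, its /tmp location, and the interval values are illustrative assumptions,
not shipped defaults.

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;

import org.apache.hadoop.conf.Configuration;

public class HealthScriptSetupSketch {
  public static void main(String[] args) throws IOException {
    // A toy health check: emit an ERROR line when the root disk is over 90% full.
    // Any executable that prints a line starting with "ERROR" when unhealthy
    // satisfies the contract; this particular check is only an example.
    File script = new File("/tmp/node_health.sh");
    PrintWriter pw = new PrintWriter(new FileOutputStream(script));
    pw.println("#!/bin/bash");
    pw.println("used=$(df / | awk 'NR==2 {print $5}' | tr -d '%')");
    pw.println("if [ \"$used\" -gt 90 ]; then echo \"ERROR root disk ${used}% full\"; fi");
    pw.close();
    script.setExecutable(true);

    // On a real cluster these keys would be set in mapred-site.xml.
    Configuration conf = new Configuration();
    conf.set("mapred.healthChecker.script.path", script.getAbsolutePath());
    conf.setLong("mapred.healthChecker.interval", 60 * 1000L);       // run every minute
    conf.setLong("mapred.healthChecker.script.timeout", 10 * 1000L); // kill after 10 seconds
    conf.set("mapred.healthChecker.script.args", "");                // optional extra arguments
  }
}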

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml?rev=1076950&r1=1076949&r2=1076950&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml Fri Mar  4 03:25:16 2011
@@ -864,4 +864,40 @@
   </description>
 </property>
 
+<!--  Node health script variables -->
+
+<property>
+  <name>mapred.healthChecker.script.path</name>
+  <value></value>
+  <description>Absolute path to the script which is
+  periodically run by the node health monitoring service to determine if
+  the node is healthy or not. If the value of this key is empty or the
+  file does not exist in the location configured here, the node health
+  monitoring service is not started.</description>
+</property>
+
+<property>
+  <name>mapred.healthChecker.interval</name>
+  <value>60000</value>
+  <description>Frequency at which the node health script is run,
+  in milliseconds</description>
+</property>
+
+<property>
+  <name>mapred.healthChecker.script.timeout</name>
+  <value>600000</value>
+  <description>Time after which the node health script will be killed if 
+  unresponsive, and the script is considered to have failed.</description>
+</property>
+
+<property>
+  <name>mapred.healthChecker.script.args</name>
+  <value></value>
+  <description>Comma-separated list of arguments which are to be passed 
+  to the node health script when it is launched.
+  </description>
+</property>
+
+<!--  end of node health script variables -->
+
 </configuration>

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=1076950&r1=1076949&r2=1076950&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java Fri Mar  4 03:25:16 2011
@@ -62,8 +62,9 @@ interface InterTrackerProtocol extends V
    * Version 24: Changed format of Task and TaskStatus for HADOOP-4759 
    * Version 25: JobIDs are passed in response to JobTracker restart 
    * Version 26: Added numRequiredSlots to TaskStatus for MAPREDUCE-516
+   * Version 27: Adding node health status to TaskStatus for MAPREDUCE-211
    */
-  public static final long versionID = 26L;
+  public static final long versionID = 27L;
   
   public final static int TRACKERS_OK = 0;
   public final static int UNKNOWN_TASKTRACKER = 1;

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1076950&r1=1076949&r2=1076950&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar  4 03:25:16 2011
@@ -72,6 +72,7 @@ import org.apache.hadoop.mapred.JobHisto
 import org.apache.hadoop.mapred.JobHistory.Values;
 import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
+import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
 import org.apache.hadoop.net.DNSToSwitchMapping;
 import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.net.NetworkTopology;
@@ -484,18 +485,28 @@ public class JobTracker implements MRCon
     }
   }
   
+  enum ReasonForBlackListing {
+    EXCEEDING_FAILURES,
+    NODE_UNHEALTHY
+  }
+  
   // The FaultInfo which indicates the number of faults of a tracker
   // and when the last fault occurred
   // and whether the tracker is blacklisted across all jobs or not
   private static class FaultInfo {
+    static final String FAULT_FORMAT_STRING =  "%d failures on the tracker";
     int numFaults = 0;
     long lastUpdated;
     boolean blacklisted; 
-
+    
+    private boolean isHealthy;
+    private HashMap<ReasonForBlackListing, String>rfbMap;
+    
     FaultInfo() {
       numFaults = 0;
       lastUpdated = System.currentTimeMillis();
       blacklisted = false;
+      rfbMap = new  HashMap<ReasonForBlackListing, String>();
     }
 
     void setFaultCount(int num) {
@@ -518,9 +529,47 @@ public class JobTracker implements MRCon
       return blacklisted;
     }
     
-    void setBlacklist(boolean blacklist) {
-      blacklisted = blacklist;
+    void setBlacklist(ReasonForBlackListing rfb, 
+        String trackerFaultReport) {
+      blacklisted = true;
+      this.rfbMap.put(rfb, trackerFaultReport);
+    }
+
+    public void setHealthy(boolean isHealthy) {
+      this.isHealthy = isHealthy;
+    }
+
+    public boolean isHealthy() {
+      return isHealthy;
+    }
+    
+    public String getTrackerFaultReport() {
+      StringBuffer sb = new StringBuffer();
+      for(String reasons : rfbMap.values()) {
+        sb.append(reasons);
+        sb.append("\n");
+      }
+      return sb.toString();
+    }
+    
+    Set<ReasonForBlackListing> getReasonforblacklisting() {
+      return this.rfbMap.keySet();
+    }
+    
+    public void unBlacklist() {
+      this.blacklisted = false;
+      this.rfbMap.clear();
+    }
+
+    public boolean removeBlackListedReason(ReasonForBlackListing rfb) {
+      String str = rfbMap.remove(rfb);
+      return str!=null;
+    }
+
+    public void addBlackListedReason(ReasonForBlackListing rfb, String reason) {
+      this.rfbMap.put(rfb, reason);
     }
+    
   }
 
   private class FaultyTrackersInfo {
@@ -543,26 +592,82 @@ public class JobTracker implements MRCon
      */
     void incrementFaults(String hostName) {
       synchronized (potentiallyFaultyTrackers) {
-        FaultInfo fi = potentiallyFaultyTrackers.get(hostName);
-        if (fi == null) {
-          fi = new FaultInfo();
-          potentiallyFaultyTrackers.put(hostName, fi);
-        }
+        FaultInfo fi = getFaultInfo(hostName, true);
         int numFaults = fi.getFaultCount();
         ++numFaults;
         fi.setFaultCount(numFaults);
         fi.setLastUpdated(System.currentTimeMillis());
-        if (!fi.isBlacklisted()) {
-          if (shouldBlacklist(hostName, numFaults)) {
-            LOG.info("Adding " + hostName + " to the blacklist" +
-                     " across all jobs");
-            removeHostCapacity(hostName);
-            fi.setBlacklist(true);
-          }
+        if (exceedsFaults(fi)) {
+          LOG.info("Adding " + hostName + " to the blacklist"
+              + " across all jobs");
+          String reason = String.format(FaultInfo.FAULT_FORMAT_STRING,
+              numFaults);
+          blackListTracker(hostName, reason,
+              ReasonForBlackListing.EXCEEDING_FAILURES);
         }
       }        
     }
 
+    private void blackListTracker(String hostName, String reason, ReasonForBlackListing rfb) {
+      FaultInfo fi = getFaultInfo(hostName, true);
+      boolean blackListed = fi.isBlacklisted();
+      if(blackListed) {
+        if(fi.getReasonforblacklisting().contains(rfb)) {
+          return;
+        } else {
+          LOG.info("Adding blacklisted reason for tracker : " + hostName 
+              + " Reason for blacklisting is : " + rfb);
+          fi.addBlackListedReason(rfb, reason);
+        }
+        return;
+      } else {
+        LOG.info("Blacklisting tracker : " + hostName 
+            + " Reason for blacklisting is : " + rfb);
+        removeHostCapacity(hostName);
+        fi.setBlacklist(rfb, reason);
+      }
+    }
+    
+    private boolean canUnBlackListTracker(String hostName,
+        ReasonForBlackListing rfb) {
+      FaultInfo fi = getFaultInfo(hostName, false);
+      if(fi == null) {
+        return false;
+      }
+      
+      Set<ReasonForBlackListing> rfbSet = fi.getReasonforblacklisting();
+      return fi.isBlacklisted() && rfbSet.contains(rfb);
+    }
+
+    private void unBlackListTracker(String hostName,
+        ReasonForBlackListing rfb) {
+      // callers should check canUnBlackListTracker() before calling this method
+      FaultInfo fi = getFaultInfo(hostName, false);
+      if(fi.removeBlackListedReason(rfb)) {
+        if(fi.getReasonforblacklisting().isEmpty()) {
+          addHostCapacity(hostName);
+          LOG.info("Unblacklisting tracker : " + hostName);
+          fi.unBlacklist();
+          // We have un-blacklisted the tracker, so it should
+          // definitely be healthy. Check the fault count; if it
+          // is zero, don't keep the tracker in memory.
+          if(fi.numFaults == 0) {
+            potentiallyFaultyTrackers.remove(hostName);
+          }
+        }
+      }
+    }
+    
+    private FaultInfo getFaultInfo(String hostName, 
+        boolean createIfNeccessary) {
+      FaultInfo fi = potentiallyFaultyTrackers.get(hostName);
+      if (fi == null && createIfNeccessary) {
+        fi = new FaultInfo();
+        potentiallyFaultyTrackers.put(hostName, fi);
+      }
+      return fi;
+    }
+    
     /**
      * Blacklists the tracker across all jobs if
      * <ol>
@@ -570,9 +675,11 @@ public class JobTracker implements MRCon
      *     MAX_BLACKLISTS_PER_TRACKER (configurable) blacklists</li>
      * <li>#faults is 50% (configurable) above the average #faults</li>
      * <li>50% the cluster is not blacklisted yet </li>
+     * </ol>
      */
-    private boolean shouldBlacklist(String hostName, int numFaults) {
-      if (numFaults >= MAX_BLACKLISTS_PER_TRACKER) {
+    private boolean exceedsFaults(FaultInfo fi) {
+      int faultCount = fi.getFaultCount();
+      if (faultCount >= MAX_BLACKLISTS_PER_TRACKER) {
         // calculate avgBlackLists
         long clusterSize = getClusterStatus().getTaskTrackers();
         long sum = 0;
@@ -582,7 +689,7 @@ public class JobTracker implements MRCon
         double avg = (double) sum / clusterSize;
             
         long totalCluster = clusterSize + numBlacklistedTrackers;
-        if ((numFaults - avg) > (AVERAGE_BLACKLIST_THRESHOLD * avg) &&
+        if ((faultCount - avg) > (AVERAGE_BLACKLIST_THRESHOLD * avg) &&
             numBlacklistedTrackers < (totalCluster * MAX_BLACKLIST_PERCENT)) {
           return true;
         }
@@ -625,16 +732,12 @@ public class JobTracker implements MRCon
         if (fi != null &&
             (now - fi.getLastUpdated()) > UPDATE_FAULTY_TRACKER_INTERVAL) {
           int numFaults = fi.getFaultCount() - 1;
-          if (fi.isBlacklisted()) {
-            LOG.info("Removing " + hostName + " from blacklist");
-            addHostCapacity(hostName);
-            fi.setBlacklist(false);
-          }
-          if (numFaults > 0) {
-            fi.setFaultCount(numFaults);
-            fi.setLastUpdated(now);
-          } else {
-            potentiallyFaultyTrackers.remove(hostName);
+          fi.setFaultCount(numFaults);
+          fi.setLastUpdated(now);
+          if (canUnBlackListTracker(hostName, 
+              ReasonForBlackListing.EXCEEDING_FAILURES)) {
+            unBlackListTracker(hostName,
+                ReasonForBlackListing.EXCEEDING_FAILURES);
           }
         }
         return (fi != null && fi.isBlacklisted());
@@ -703,6 +806,41 @@ public class JobTracker implements MRCon
       }
       return 0;
     }
+    
+    Set<ReasonForBlackListing> getReasonForBlackListing(String hostName) {
+      synchronized (potentiallyFaultyTrackers) {
+        FaultInfo fi = null;
+        if ((fi = potentiallyFaultyTrackers.get(hostName)) != null) {
+          return fi.getReasonforblacklisting();
+        }
+      }
+      return null;
+    }
+
+
+    void setNodeHealthStatus(String hostName, boolean isHealthy, String reason) {
+      FaultInfo fi = null;
+      // If the tracker is not healthy, create a fault info object
+      // and blacklist it.
+      if (!isHealthy) {
+        fi = getFaultInfo(hostName, true);
+        fi.setHealthy(isHealthy);
+        synchronized (potentiallyFaultyTrackers) {
+          blackListTracker(hostName, reason,
+              ReasonForBlackListing.NODE_UNHEALTHY);
+        }
+      } else {
+        fi = getFaultInfo(hostName, false);
+        if (fi == null) {
+          return;
+        } else {
+          if (canUnBlackListTracker(hostName,
+              ReasonForBlackListing.NODE_UNHEALTHY)) {
+            unBlackListTracker(hostName, ReasonForBlackListing.NODE_UNHEALTHY);
+          }
+        }
+      }
+    }
   }
   
   /**
@@ -2728,7 +2866,7 @@ public class JobTracker implements MRCon
     // Initialize the response to be sent for the heartbeat
     HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
     List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
-      
+    isBlacklisted = faultyTrackers.isBlacklisted(status.getHost());
     // Check for new tasks to be executed on the tasktracker
     if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
       TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName) ;
@@ -2930,6 +3068,15 @@ public class JobTracker implements MRCon
     getInstrumentation().setReduceSlots(totalReduceTaskCapacity);
     return oldStatus != null;
   }
+  
+  
+  private void updateNodeHealthStatus(TaskTrackerStatus trackerStatus) {
+    TaskTrackerHealthStatus status = trackerStatus.getHealthStatus();
+    synchronized (faultyTrackers) {
+      faultyTrackers.setNodeHealthStatus(trackerStatus.getHost(), 
+          status.isNodeHealthy(), status.getHealthReport());
+    }
+  }
     
   /**
    * Process incoming heartbeat messages from the task trackers.
@@ -2971,6 +3118,7 @@ public class JobTracker implements MRCon
     }
 
     updateTaskStatuses(trackerStatus);
+    updateNodeHealthStatus(trackerStatus);
     
     return true;
   }
@@ -4222,4 +4370,25 @@ public class JobTracker implements MRCon
         UserGroupInformation.getCurrentUGI().getUserName());
     this.queueManager.refreshAcls(new Configuration(this.conf));
   }
+  
+  String getReasonsForBlacklisting(String host) {
+    FaultInfo fi = faultyTrackers.getFaultInfo(host, false);
+    if (fi == null) {
+      return "";
+    }
+    return fi.getTrackerFaultReport();
+  }
+
+  /** Test Methods */
+  Set<ReasonForBlackListing> getReasonForBlackList(String host) {
+    FaultInfo fi = faultyTrackers.getFaultInfo(host, false);
+    if (fi == null) {
+      return new HashSet<ReasonForBlackListing>();
+    }
+    return fi.getReasonforblacklisting();
+  }
+  
+  void incrementFaults(String hostName) {
+    faultyTrackers.incrementFaults(hostName);
+  }
 }
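
To make the exceedsFaults() criteria above concrete, the same arithmetic can be
sketched standalone with made-up numbers; the constants mirror the JobTracker's
configurable MAX_BLACKLISTS_PER_TRACKER, AVERAGE_BLACKLIST_THRESHOLD and
MAX_BLACKLIST_PERCENT, but the values used here are assumptions for
illustration only.

public class BlacklistThresholdSketch {
  // Illustrative stand-ins for the JobTracker's configurable limits.
  static final int MAX_BLACKLISTS_PER_TRACKER = 4;
  static final double AVERAGE_BLACKLIST_THRESHOLD = 0.5; // 50% above the average
  static final double MAX_BLACKLIST_PERCENT = 0.5;       // at most 50% of the cluster

  static boolean exceedsFaults(int faultCount, long clusterSize, long faultSum,
                               int numBlacklisted) {
    if (faultCount < MAX_BLACKLISTS_PER_TRACKER) {
      return false;                       // below the per-tracker minimum
    }
    double avg = (double) faultSum / clusterSize;
    long totalCluster = clusterSize + numBlacklisted;
    return (faultCount - avg) > (AVERAGE_BLACKLIST_THRESHOLD * avg)
        && numBlacklisted < (totalCluster * MAX_BLACKLIST_PERCENT);
  }

  public static void main(String[] args) {
    // 10 live trackers with 20 faults in total (average 2), 1 already blacklisted:
    // a tracker with 6 faults is >= 4 and more than 50% above the average of 2.
    System.out.println(exceedsFaults(6, 10, 20, 1));  // true
    // A tracker with 3 faults never reaches the per-tracker minimum of 4.
    System.out.println(exceedsFaults(3, 10, 20, 1));  // false
  }
}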

Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/NodeHealthCheckerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/NodeHealthCheckerService.java?rev=1076950&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/NodeHealthCheckerService.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/NodeHealthCheckerService.java Fri Mar  4 03:25:16 2011
@@ -0,0 +1,360 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Shell.ExitCodeException;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+
+/**
+ * 
+ * The class which provides the functionality of checking the health of the
+ * node and reporting back to the service for which the health checker has
+ * been asked to report.
+ */
+class NodeHealthCheckerService {
+
+  private static Log LOG = LogFactory.getLog(NodeHealthCheckerService.class);
+
+  /** Absolute path to the health script. */
+  private String nodeHealthScript;
+  /** Interval at which the node health script is to be executed */
+  private long intervalTime;
+  /** Time after which the script should be timed out */
+  private long scriptTimeout;
+  /** Timer used to schedule node health monitoring script execution */
+  private Timer nodeHealthScriptScheduler;
+
+  /** ShellCommandExecutor used to execute monitoring script */
+  ShellCommandExecutor shexec = null;
+
+  /** Configuration used by the checker */
+  private Configuration conf;
+
+  /** Pattern used for searching in the output of the node health script */
+  static private final String ERROR_PATTERN = "ERROR";
+
+  /* Configuration keys */
+  static final String HEALTH_CHECK_SCRIPT_PROPERTY = "mapred.healthChecker.script.path";
+
+  static final String HEALTH_CHECK_INTERVAL_PROPERTY = "mapred.healthChecker.interval";
+
+  static final String HEALTH_CHECK_FAILURE_INTERVAL_PROPERTY = "mapred.healthChecker.script.timeout";
+
+  static final String HEALTH_CHECK_SCRIPT_ARGUMENTS_PROPERTY = "mapred.healthChecker.script.args";
+  /* end of configuration keys */
+
+  /** Default frequency of running node health script */
+  private static final long DEFAULT_HEALTH_CHECK_INTERVAL = 10 * 60 * 1000;
+  /** Default script time out period */
+  private static final long DEFAULT_HEALTH_SCRIPT_FAILURE_INTERVAL = 2 * DEFAULT_HEALTH_CHECK_INTERVAL;
+
+  private boolean isHealthy;
+
+  private String healthReport;
+
+  private long lastReportedTime;
+
+  private TimerTask timer;
+  
+  private enum HealthCheckerExitStatus {
+    SUCCESS,
+    TIMED_OUT,
+    FAILED_WITH_EXIT_CODE,
+    FAILED_WITH_EXCEPTION,
+    FAILED
+  }
+
+
+  /**
+   * Class which is used by the {@link Timer} class to periodically execute the
+   * node health script.
+   * 
+   */
+  private class NodeHealthMonitorExecutor extends TimerTask {
+
+    String exceptionStackTrace = "";
+
+    public NodeHealthMonitorExecutor(String[] args) {
+      ArrayList<String> execScript = new ArrayList<String>();
+      execScript.add(nodeHealthScript);
+      if (args != null) {
+        execScript.addAll(Arrays.asList(args));
+      }
+      shexec = new ShellCommandExecutor((String[]) execScript
+          .toArray(new String[execScript.size()]), null, null, scriptTimeout);
+    }
+
+    @Override
+    public void run() {
+      HealthCheckerExitStatus status = HealthCheckerExitStatus.SUCCESS;
+      try {
+        shexec.execute();
+      } catch (ExitCodeException e) {
+        // ignore the exit code of the script
+        status = HealthCheckerExitStatus.FAILED_WITH_EXIT_CODE;
+      } catch (Exception e) {
+        LOG.warn("Caught exception : " + e.getMessage());
+        status = HealthCheckerExitStatus.FAILED_WITH_EXCEPTION;
+        exceptionStackTrace = StringUtils.stringifyException(e);
+      } finally {
+        if (status == HealthCheckerExitStatus.SUCCESS) {
+          if (hasErrors(shexec.getOutput())) {
+            status = HealthCheckerExitStatus.FAILED;
+          }
+        }
+        reportHealthStatus(status);
+      }
+    }
+
+    /**
+     * Method which is used to parse output from the node health monitor and
+     * send to the report address.
+     * 
+     * The output of a timed-out script, or of a script which causes an
+     * IOException, is ignored.
+     * 
+     * The node is marked unhealthy if
+     * <ol>
+     * <li>The node health script times out</li>
+     * <li>The node health scripts output has a line which begins with ERROR</li>
+     * <li>An exception is thrown while executing the script</li>
+     * </ol>
+     * If the script throws {@link IOException} or {@link ExitCodeException} the
+     * output is ignored and the node is left healthy, as the script might
+     * have a syntax error.
+     * 
+     * @param status
+     */
+    void reportHealthStatus(HealthCheckerExitStatus status) {
+      long now = System.currentTimeMillis();
+      switch (status) {
+      case SUCCESS:
+        setHealthStatus(true, "", now);
+        break;
+      case TIMED_OUT:
+        setHealthStatus(false, "Node health script timed out");
+        break;
+      case FAILED_WITH_EXCEPTION:
+        setHealthStatus(false, exceptionStackTrace);
+        break;
+      case FAILED_WITH_EXIT_CODE:
+        setHealthStatus(true, "", now);
+        break;
+      case FAILED:
+        setHealthStatus(false, shexec.getOutput());
+        break;
+      }
+    }
+
+    /**
+     * Method to check if the output string has line which begins with ERROR.
+     * 
+     * @param output
+     *          string
+     * @return true if output string has error pattern in it.
+     */
+    private boolean hasErrors(String output) {
+      String[] splits = output.split("\n");
+      for (String split : splits) {
+        if (split.startsWith(ERROR_PATTERN)) {
+          return true;
+        }
+      }
+      return false;
+    }
+  }
+
+  public NodeHealthCheckerService(Configuration conf) {
+    this.conf = conf;
+    this.lastReportedTime = System.currentTimeMillis();
+    this.isHealthy = true;
+    this.healthReport = "";    
+    initialize(conf);
+  }
+
+  /*
+   * Method which initializes the values for the script path and interval time.
+   */
+  private void initialize(Configuration conf) {
+    this.nodeHealthScript = conf.get(HEALTH_CHECK_SCRIPT_PROPERTY);
+    this.intervalTime = conf.getLong(HEALTH_CHECK_INTERVAL_PROPERTY,
+        DEFAULT_HEALTH_CHECK_INTERVAL);
+    this.scriptTimeout = conf.getLong(HEALTH_CHECK_FAILURE_INTERVAL_PROPERTY,
+        DEFAULT_HEALTH_SCRIPT_FAILURE_INTERVAL);
+    String[] args = conf.getStrings(HEALTH_CHECK_SCRIPT_ARGUMENTS_PROPERTY,
+        new String[] {});
+    timer = new NodeHealthMonitorExecutor(args);
+  }
+
+  /**
+   * Method used to start the Node health monitoring.
+   * 
+   */
+  void start() {
+    // if health script path is not configured don't start the thread.
+    if (!shouldRun(conf)) {
+      LOG.info("Not starting node health monitor");
+      return;
+    }
+    nodeHealthScriptScheduler = new Timer("NodeHealthMonitor-Timer", true);
+    // Start the timer task immediately and
+    // then periodically at interval time.
+    nodeHealthScriptScheduler.scheduleAtFixedRate(timer, 0, intervalTime);
+  }
+
+  /**
+   * Method used to terminate the node health monitoring service.
+   * 
+   */
+  void stop() {
+    if (!shouldRun(conf)) {
+      return;
+    }
+    nodeHealthScriptScheduler.cancel();
+    if (shexec != null) {
+      Process p = shexec.getProcess();
+      if (p != null) {
+        p.destroy();
+      }
+    }
+  }
+
+  /**
+   * Gets whether the node is healthy or not.
+   * 
+   * @return true if node is healthy
+   */
+  private boolean isHealthy() {
+    return isHealthy;
+  }
+
+  /**
+   * Sets whether the node is healthy or not.
+   * 
+   * @param isHealthy
+   *          whether or not the node is healthy
+   */
+  private synchronized void setHealthy(boolean isHealthy) {
+    this.isHealthy = isHealthy;
+  }
+
+  /**
+   * Returns the output from the health script. If the node is healthy then
+   * an empty string is returned.
+   * 
+   * @return output from health script
+   */
+  private String getHealthReport() {
+    return healthReport;
+  }
+
+  /**
+   * Sets the health report from the node health script.
+   * 
+   * @param healthReport
+   */
+  private synchronized void setHealthReport(String healthReport) {
+    this.healthReport = healthReport;
+  }
+  
+  /**
+   * Returns time stamp when node health script was last run.
+   * 
+   * @return timestamp when node health script was last run
+   */
+  private long getLastReportedTime() {
+    return lastReportedTime;
+  }
+
+  /**
+   * Sets the last run time of the node health script.
+   * 
+   * @param lastReportedTime
+   */
+  private synchronized void setLastReportedTime(long lastReportedTime) {
+    this.lastReportedTime = lastReportedTime;
+  }
+
+  /**
+   * Method used to determine whether the node health monitoring service
+   * should be started. Returns true if the following conditions are met:
+   * 
+   * <ol>
+   * <li>Path to Node health check script is not empty</li>
+   * <li>Node health check script file exists</li>
+   * </ol>
+   * 
+   * @param conf
+   * @return true if node health monitoring service can be started.
+   */
+  static boolean shouldRun(Configuration conf) {
+    String nodeHealthScript = conf.get(HEALTH_CHECK_SCRIPT_PROPERTY);
+    if (nodeHealthScript == null || nodeHealthScript.trim().isEmpty()) {
+      return false;
+    }
+    File f = new File(nodeHealthScript);
+    return f.exists() && f.canExecute();
+  }
+
+  private synchronized void setHealthStatus(boolean isHealthy, String output) {
+    this.setHealthy(isHealthy);
+    this.setHealthReport(output);
+  }
+  
+  private synchronized void setHealthStatus(boolean isHealthy, String output,
+      long time) {
+    this.setHealthStatus(isHealthy, output);
+    this.setLastReportedTime(time);
+  }
+  
+  /**
+   * Method to populate the fields for the {@link TaskTrackerHealthStatus}
+   * 
+   * @param healthStatus
+   */
+  synchronized void setHealthStatus(TaskTrackerHealthStatus healthStatus) {
+    healthStatus.setNodeHealthy(this.isHealthy());
+    healthStatus.setHealthReport(this.getHealthReport());
+    healthStatus.setLastReported(this.getLastReportedTime());
+  }
+  
+  /**
+   * Test method to directly access the timer which node 
+   * health checker would use.
+   * 
+   *
+   * @return Timer task
+   */
+  //XXX:Not to be used directly.
+  TimerTask getTimer() {
+    return timer;
+  }
+}
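
The per-line startsWith() check in hasErrors() above is the entire protocol
between the health script and the checker. A tiny standalone sketch with
made-up script outputs shows which outputs count as failures:

public class ErrorPatternSketch {
  private static final String ERROR_PATTERN = "ERROR";

  // Same rule as NodeHealthCheckerService.hasErrors(): only a line that
  // *begins* with ERROR marks the node unhealthy.
  static boolean hasErrors(String output) {
    for (String line : output.split("\n")) {
      if (line.startsWith(ERROR_PATTERN)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    System.out.println(hasErrors("disks ok\nmemory ok\n"));                  // false
    System.out.println(hasErrors("checking disks\nERROR /grid/0 is bad\n")); // true
    System.out.println(hasErrors("scan found ERROR strings, ignoring\n"));   // false: not at line start
  }
}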

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1076950&r1=1076949&r2=1076950&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar  4 03:25:16 2011
@@ -66,6 +66,7 @@ import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.ipc.RemoteException;
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.mapred.TaskStatus.Phase;
+import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
 import org.apache.hadoop.mapred.pipes.Submitter;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.metrics.MetricsContext;
@@ -237,6 +238,11 @@ public class TaskTracker 
   */
   private TaskController taskController;
   
+  /**
+   * Handle to the specific instance of the {@link NodeHealthCheckerService}
+   */
+  private NodeHealthCheckerService healthChecker;
+  
   /*
    * A list of commitTaskActions for whom commit response has been received 
    */
@@ -554,6 +560,11 @@ public class TaskTracker 
     
     //setup and create jobcache directory with appropriate permissions
     taskController.setup();
+    
+    //Start up node health checker service.
+    if (shouldStartHealthMonitor(this.fConf)) {
+      startHealthMonitor(this.fConf);
+    }
   }
 
   public static Class<? extends TaskTrackerInstrumentation> getInstrumentationClass(
@@ -913,6 +924,11 @@ public class TaskTracker 
       taskReportServer.stop();
       taskReportServer = null;
     }
+    if (healthChecker != null) {
+      //stop node health checker service
+      healthChecker.stop();
+      healthChecker = null;
+    }
   }
 
   /**
@@ -1231,7 +1247,18 @@ public class TaskTracker 
       status.getResourceStatus().setReduceSlotMemorySizeOnTT(
           reduceSlotSizeMemoryOnTT);
     }
-      
+    //add node health information
+    
+    TaskTrackerHealthStatus healthStatus = status.getHealthStatus();
+    synchronized (this) {
+      if (healthChecker != null) {
+        healthChecker.setHealthStatus(healthStatus);
+      } else {
+        healthStatus.setNodeHealthy(true);
+        healthStatus.setLastReported(0L);
+        healthStatus.setHealthReport("");
+      }
+    }
     //
     // Xmit the heartbeat
     //
@@ -3199,4 +3226,24 @@ public class TaskTracker 
       }
     }
   }
+  
+  /**
+   * Wrapper method used by TaskTracker to check if {@link  NodeHealthCheckerService}
+   * can be started
+   * @param conf configuration used to check if service can be started
+   * @return true if service can be started
+   */
+  private boolean shouldStartHealthMonitor(Configuration conf) {
+    return NodeHealthCheckerService.shouldRun(conf);
+  }
+  
+  /**
+   * Wrapper method used to start {@link NodeHealthCheckerService} for 
+   * Task Tracker
+   * @param conf Configuration used by the service.
+   */
+  private void startHealthMonitor(Configuration conf) {
+    healthChecker = new NodeHealthCheckerService(conf);
+    healthChecker.start();
+  }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=1076950&r1=1076949&r2=1076950&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Fri Mar  4 03:25:16 2011
@@ -53,6 +53,7 @@ public class TaskTrackerStatus implement
   volatile long lastSeen;
   private int maxMapTasks;
   private int maxReduceTasks;
+  private TaskTrackerHealthStatus healthStatus;
    
   /**
    * Class representing a collection of resources on this tasktracker.
@@ -196,6 +197,7 @@ public class TaskTrackerStatus implement
   public TaskTrackerStatus() {
     taskReports = new ArrayList<TaskStatus>();
     resStatus = new ResourceStatus();
+    this.healthStatus = new TaskTrackerHealthStatus();
   }
 
   /**
@@ -213,6 +215,7 @@ public class TaskTrackerStatus implement
     this.maxMapTasks = maxMapTasks;
     this.maxReduceTasks = maxReduceTasks;
     this.resStatus = new ResourceStatus();
+    this.healthStatus = new TaskTrackerHealthStatus();
   }
 
   /**
@@ -377,13 +380,128 @@ public class TaskTrackerStatus implement
   ResourceStatus getResourceStatus() {
     return resStatus;
   }
+
+  /**
+   * Returns health status of the task tracker.
+   * @return health status of Task Tracker
+   */
+  public TaskTrackerHealthStatus getHealthStatus() {
+    return healthStatus;
+  }
+
+  /**
+   * Static class which encapsulates the Node health
+   * related fields.
+   * 
+   */
+  static class TaskTrackerHealthStatus implements Writable {
+    
+    private boolean isNodeHealthy;
+    
+    private String healthReport;
+    
+    private long lastReported;
+    
+    public TaskTrackerHealthStatus(boolean isNodeHealthy, String healthReport,
+        long lastReported) {
+      this.isNodeHealthy = isNodeHealthy;
+      this.healthReport = healthReport;
+      this.lastReported = lastReported;
+    }
+    
+    public TaskTrackerHealthStatus() {
+      this.isNodeHealthy = true;
+      this.healthReport = "";
+      this.lastReported = System.currentTimeMillis();
+    }
+
+    /**
+     * Sets whether or not a task tracker is healthy, based on the
+     * output from the node health script.
+     * 
+     * @param isNodeHealthy
+     */
+    void setNodeHealthy(boolean isNodeHealthy) {
+      this.isNodeHealthy = isNodeHealthy;
+    }
+
+    /**
+     * Returns whether the node is healthy or not, based on the result from
+     * the node health script.
+     * 
+     * @return true if the node is healthy.
+     */
+    boolean isNodeHealthy() {
+      return isNodeHealthy;
+    }
+
+    /**
+     * Sets the health report based on the output from the health script.
+     * 
+     * @param healthReport
+     *          String listing cause of failure.
+     */
+    void setHealthReport(String healthReport) {
+      this.healthReport = healthReport;
+    }
+
+    /**
+     * Returns the health report of the node, if any. The health report is
+     * only populated when the node is not healthy.
+     * 
+     * @return health report of the node if any
+     */
+    String getHealthReport() {
+      return healthReport;
+    }
+
+    /**
+     * Sets when the TT got its health information last 
+     * from node health monitoring service.
+     * 
+     * @param lastReported last reported time by node 
+     * health script
+     */
+    public void setLastReported(long lastReported) {
+      this.lastReported = lastReported;
+    }
+
+    /**
+     * Gets time of most recent node health update.
+     * 
+     * @return time stamp of most recent health update.
+     */
+    public long getLastReported() {
+      return lastReported;
+    }
+
+    @Override
+    public void readFields(DataInput in) throws IOException {
+      isNodeHealthy = in.readBoolean();
+      healthReport = Text.readString(in);
+      lastReported = in.readLong();
+    }
+    
+    @Override
+    public void write(DataOutput out) throws IOException {
+      out.writeBoolean(isNodeHealthy);
+      Text.writeString(out, healthReport);
+      out.writeLong(lastReported);
+    }
+    
+  }
   
   ///////////////////////////////////////////
   // Writable
   ///////////////////////////////////////////
   public void write(DataOutput out) throws IOException {
-    UTF8.writeString(out, trackerName);
-    UTF8.writeString(out, host);
+    Text.writeString(out, trackerName);
+    Text.writeString(out, host);
     out.writeInt(httpPort);
     out.writeInt(failures);
     out.writeInt(maxMapTasks);
@@ -394,11 +512,12 @@ public class TaskTrackerStatus implement
     for (TaskStatus taskStatus : taskReports) {
       TaskStatus.writeTaskStatus(out, taskStatus);
     }
+    getHealthStatus().write(out);
   }
 
   public void readFields(DataInput in) throws IOException {
-    this.trackerName = UTF8.readString(in);
-    this.host = UTF8.readString(in);
+    this.trackerName = Text.readString(in);
+    this.host = Text.readString(in);
     this.httpPort = in.readInt();
     this.failures = in.readInt();
     this.maxMapTasks = in.readInt();
@@ -410,5 +529,6 @@ public class TaskTrackerStatus implement
     for (int i = 0; i < numTasks; i++) {
       taskReports.add(TaskStatus.readTaskStatus(in));
     }
+    getHealthStatus().readFields(in);
   }
 }

Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeHealthService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeHealthService.java?rev=1076950&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeHealthService.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeHealthService.java Fri Mar  4 03:25:16 2011
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
+
+import junit.framework.TestCase;
+
+public class TestNodeHealthService extends TestCase {
+
+  private static volatile Log LOG = LogFactory
+      .getLog(TestNodeHealthService.class);
+
+  private static final String nodeHealthConfigPath = System.getProperty(
+      "test.build.extraconf", "build/test/extraconf");
+
+  final static File nodeHealthConfigFile = new File(nodeHealthConfigPath,
+      "mapred-site.xml");
+
+  private String testRootDir = new File(System.getProperty("test.build.data",
+      "/tmp")).getAbsolutePath();
+
+  private File nodeHealthscriptFile = new File(testRootDir, "failingscript.sh");
+
+  @Override
+  protected void tearDown() throws Exception {
+    if (nodeHealthConfigFile.exists()) {
+      nodeHealthConfigFile.delete();
+    }
+    if (nodeHealthscriptFile.exists()) {
+      nodeHealthscriptFile.delete();
+    }
+    super.tearDown();
+  }
+
+  private Configuration getConfForNodeHealthScript() {
+    Configuration conf = new Configuration();
+    conf.set(NodeHealthCheckerService.HEALTH_CHECK_SCRIPT_PROPERTY,
+        nodeHealthscriptFile.getAbsolutePath());
+    conf.setLong(NodeHealthCheckerService.HEALTH_CHECK_INTERVAL_PROPERTY, 500);
+    conf.setLong(
+        NodeHealthCheckerService.HEALTH_CHECK_FAILURE_INTERVAL_PROPERTY, 1000);
+    return conf;
+  }
+
+  private void writeNodeHealthScriptFile(String scriptStr, boolean setExecutable)
+      throws IOException {
+    PrintWriter pw = new PrintWriter(new FileOutputStream(nodeHealthscriptFile));
+    pw.println(scriptStr);
+    pw.flush();
+    pw.close();
+    nodeHealthscriptFile.setExecutable(setExecutable);
+  }
+
+  public void testNodeHealthScriptShouldRun() throws IOException {
+    // Node health script should not start if there is no property called
+    // node health script path.
+    assertFalse("Health checker should not have started",
+        NodeHealthCheckerService.shouldRun(new Configuration()));
+    Configuration conf = getConfForNodeHealthScript();
+    // Node health script should not start if the node health script does not
+    // exist
+    assertFalse("Node health script should start", NodeHealthCheckerService
+        .shouldRun(conf));
+    // Create script path.
+    conf.writeXml(new FileOutputStream(nodeHealthConfigFile));
+    writeNodeHealthScriptFile("", false);
+    // Node health script should not start if the node health script is not
+    // executable.
+    assertFalse("Node health script should start", NodeHealthCheckerService
+        .shouldRun(conf));
+    writeNodeHealthScriptFile("", true);
+    assertTrue("Node health script should start", NodeHealthCheckerService
+        .shouldRun(conf));
+  }
+
+  public void testNodeHealthScript() throws Exception {
+    TaskTrackerHealthStatus healthStatus = new TaskTrackerHealthStatus();
+    String errorScript = "echo ERROR\n echo \"Tracker not healthy\"";
+    String normalScript = "echo \"I am all fine\"";
+    String timeOutScript = "sleep 4\n echo\"I am fine\"";
+    Configuration conf = getConfForNodeHealthScript();
+    conf.writeXml(new FileOutputStream(nodeHealthConfigFile));
+
+    NodeHealthCheckerService nodeHealthChecker = new NodeHealthCheckerService(
+        conf);
+    TimerTask timer = nodeHealthChecker.getTimer();
+    writeNodeHealthScriptFile(normalScript, true);
+    timer.run();
+
+    nodeHealthChecker.setHealthStatus(healthStatus);
+    LOG.info("Checking initial healthy condition");
+    // Check proper report conditions.
+    assertTrue("Node health status reported unhealthy", healthStatus
+        .isNodeHealthy());
+    assertTrue("Node health status reported unhealthy", healthStatus
+        .getHealthReport().isEmpty());
+
+    // write out error file.
+    // Healthy to unhealthy transition
+    writeNodeHealthScriptFile(errorScript, true);
+    // Run timer
+    timer.run();
+    // update health status
+    nodeHealthChecker.setHealthStatus(healthStatus);
+    LOG.info("Checking Healthy--->Unhealthy");
+    assertFalse("Node health status reported healthy", healthStatus
+        .isNodeHealthy());
+    assertFalse("Node health status reported healthy", healthStatus
+        .getHealthReport().isEmpty());
+    
+    // Check unhealthy to healthy transitions.
+    writeNodeHealthScriptFile(normalScript, true);
+    timer.run();
+    nodeHealthChecker.setHealthStatus(healthStatus);
+    LOG.info("Checking UnHealthy--->healthy");
+    // Check proper report conditions.
+    assertTrue("Node health status reported unhealthy", healthStatus
+        .isNodeHealthy());
+    assertTrue("Node health status reported unhealthy", healthStatus
+        .getHealthReport().isEmpty());
+
+    // Healthy to timeout transition.
+    writeNodeHealthScriptFile(timeOutScript, true);
+    timer.run();
+    nodeHealthChecker.setHealthStatus(healthStatus);
+    LOG.info("Checking Healthy--->timeout");
+    assertFalse("Node health status reported healthy even after timeout",
+        healthStatus.isNodeHealthy());
+    assertFalse("Node health status reported healthy even after timeout",
+        healthStatus.getHealthReport().isEmpty());
+  }
+
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/util/TestShell.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/util/TestShell.java?rev=1076950&r1=1076949&r2=1076950&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/util/TestShell.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/util/TestShell.java Fri Mar  4 03:25:16 2011
@@ -20,7 +20,10 @@ package org.apache.hadoop.util;
 import junit.framework.TestCase;
 
 import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
+import java.io.PrintWriter;
 
 public class TestShell extends TestCase {
 
@@ -71,6 +74,27 @@ public class TestShell extends TestCase 
     assertInString(command, " .. ");
     assertInString(command, "\"arg 2\"");
   }
+  
+  public void testShellCommandTimeout() throws Throwable {
+    String rootDir = new File(System.getProperty(
+        "test.build.data", "/tmp")).getAbsolutePath();
+    File shellFile = new File(rootDir, "timeout.sh");
+    String timeoutCommand = "sleep 4; echo \"hello\"";
+    PrintWriter writer = new PrintWriter(new FileOutputStream(shellFile));
+    writer.println(timeoutCommand);
+    writer.close();
+    shellFile.setExecutable(true);
+    Shell.ShellCommandExecutor shexc 
+    = new Shell.ShellCommandExecutor(new String[]{shellFile.getAbsolutePath()},
+                                      null, null, 100);
+    try {
+      shexc.execute();
+    } catch (Exception e) {
+      // An exception is expected here when the command times out.
+    }
+    shellFile.delete();
+    assertTrue("Script didnt not timeout" , shexc.isTimedOut());
+  }
 
   private void testInterval(long interval) throws IOException {
     Command command = new Command(interval);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/machines.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/machines.jsp?rev=1076950&r1=1076949&r2=1076950&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/machines.jsp (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/machines.jsp Fri Mar  4 03:25:16 2011
@@ -29,23 +29,43 @@
       out.println("<h2>Task Trackers</h2>");
       c = tracker.taskTrackers();
     }
+    int noCols = 9;
+    if(type.equals("blacklisted")) {
+      noCols = 10;
+    }
     if (c.size() == 0) {
       out.print("There are currently no known " + type + " Task Trackers.");
     } else {
       out.print("<center>\n");
       out.print("<table border=\"2\" cellpadding=\"5\" cellspacing=\"2\">\n");
-      out.print("<tr><td align=\"center\" colspan=\"6\"><b>Task Trackers</b></td></tr>\n");
+      out.print("<tr><td align=\"center\" colspan=\""+ noCols +"\"><b>Task Trackers</b></td></tr>\n");
       out.print("<tr><td><b>Name</b></td><td><b>Host</b></td>" +
                 "<td><b># running tasks</b></td>" +
                 "<td><b>Max Map Tasks</b></td>" +
                 "<td><b>Max Reduce Tasks</b></td>" +
                 "<td><b>Failures</b></td>" +
-                "<td><b>Seconds since heartbeat</b></td></tr>\n");
+                "<td><b>Node Health Status</b></td>" +
+                "<td><b>Seconds Since Node Last Healthy</b></td>");
+      if(type.equals("blacklisted")) {
+      	out.print("<td><b>Reason For blacklisting</b></td>");
+      }
+      out.print("<td><b>Seconds since heartbeat</b></td></tr>\n");
+
       int maxFailures = 0;
       String failureKing = null;
       for (Iterator it = c.iterator(); it.hasNext(); ) {
         TaskTrackerStatus tt = (TaskTrackerStatus) it.next();
         long sinceHeartbeat = System.currentTimeMillis() - tt.getLastSeen();
+        boolean isHealthy = tt.getHealthStatus().isNodeHealthy();
+        long sinceHealthCheck = tt.getHealthStatus().getLastReported();
+        String healthString = "";
+        if(sinceHealthCheck == 0) {
+          healthString = "N/A";
+        } else {
+          healthString = (isHealthy?"Healthy":"Unhealthy");
+          sinceHealthCheck = System.currentTimeMillis() - sinceHealthCheck;
+          sinceHealthCheck =  sinceHealthCheck/1000;
+        }
         if (sinceHeartbeat > 0) {
           sinceHeartbeat = sinceHeartbeat / 1000;
         }
@@ -65,8 +85,13 @@
         out.print(tt.getHost() + "</td><td>" + numCurTasks +
                   "</td><td>" + tt.getMaxMapSlots() +
                   "</td><td>" + tt.getMaxReduceSlots() + 
-                  "</td><td>" + numFailures + 
-                  "</td><td>" + sinceHeartbeat + "</td></tr>\n");
+                  "</td><td>" + numFailures +
+                  "</td><td>" + healthString +
+                  "</td><td>" + sinceHealthCheck); 
+        if(type.equals("blacklisted")) {
+          out.print("</td><td>" + tracker.getReasonsForBlacklisting(tt.getHost()));
+        }
+        out.print("</td><td>" + sinceHeartbeat + "</td></tr>\n");
       }
       out.print("</table>\n");
       out.print("</center>\n");