You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/03/28 06:28:11 UTC

svn commit: r1086116 - in /hadoop/mapreduce/branches/MR-279: ./ mr-client/hadoop-mapreduce-client-jobclient/ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/server/tasktracker/ src/java/org/apache/hadoop/mapreduce/util/ src/test...

Author: vinodkv
Date: Mon Mar 28 04:28:10 2011
New Revision: 1086116

URL: http://svn.apache.org/viewvc?rev=1086116&view=rev
Log:
Implementing health-checks for the NodeManager. Moving NodeHealthCheckerService from mapred into yarn-server-common. Contributed by Vinod Kumar Vavilapalli.

Added:
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/NodeHealthCheckerService.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/NodeHealthStatus.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/test/java/org/apache/hadoop/TestNodeHealthService.java
Removed:
    hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/NodeHealthCheckerService.java
    hadoop/mapreduce/branches/MR-279/src/test/mapred/org/apache/hadoop/mapred/TestNodeHealthService.java
Modified:
    hadoop/mapreduce/branches/MR-279/ivy.xml
    hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/pom.xml
    hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/JobTracker.java
    hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java
    hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java
    hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapreduce/util/ServerConfigUtil.java
    hadoop/mapreduce/branches/MR-279/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-api/pom.xml
    hadoop/mapreduce/branches/MR-279/yarn/yarn-common/pom.xml
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/pom.xml
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/pom.xml
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/avro/ResourceTracker.genavro
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
    hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java

Modified: hadoop/mapreduce/branches/MR-279/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/ivy.xml?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/ivy.xml (original)
+++ hadoop/mapreduce/branches/MR-279/ivy.xml Mon Mar 28 04:28:10 2011
@@ -74,6 +74,8 @@
 
    <dependency org="org.apache.hadoop" name="hadoop-mapreduce-client-core" 
                rev="${hadoop-mapreduce-client-core.version}" conf="common->default"/> 
+   <dependency org="org.apache.hadoop" name="yarn-server-common" 
+               rev="${yarn.version}" conf="common->default"/> 
 
    <dependency org="jdiff" name="jdiff" rev="${jdiff.version}"
                conf="jdiff->default"/>

Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/pom.xml?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/pom.xml (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/pom.xml Mon Mar 28 04:28:10 2011
@@ -79,17 +79,4 @@
       <scope>test</scope>
     </dependency>
   </dependencies>
-  <build>
-    <plugins>
-      <plugin>
-        <groupId>org.apache.maven.plugins</groupId>
-        <artifactId>maven-surefire-plugin</artifactId>
-        <configuration>
-          <excludes>
-            <!--exclude>**/TestYARNClient.java</exclude-->
-          </excludes>
-        </configuration>
-      </plugin>
-    </plugins>
-  </build>
 </project>

Modified: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/JobTracker.java Mon Mar 28 04:28:10 2011
@@ -55,6 +55,7 @@ import javax.security.auth.login.LoginEx
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.NodeHealthStatus;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -76,7 +77,6 @@ import org.apache.hadoop.mapred.ClusterS
 import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
 import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
 import org.apache.hadoop.mapred.JobTrackerStatistics.TaskTrackerStat;
-import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
 import org.apache.hadoop.mapreduce.ClusterMetrics;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.QueueInfo;
@@ -2695,7 +2695,7 @@ public class JobTracker implements MRCon
   }
   
   private void updateNodeHealthStatus(TaskTrackerStatus trackerStatus) {
-    TaskTrackerHealthStatus status = trackerStatus.getHealthStatus();
+    NodeHealthStatus status = trackerStatus.getHealthStatus();
     synchronized (faultyTrackers) {
       faultyTrackers.setNodeHealthStatus(trackerStatus.getHost(), 
           status.isNodeHealthy(), status.getHealthReport());

Modified: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon Mar 28 04:28:10 2011
@@ -53,6 +53,8 @@ import javax.servlet.http.HttpServletRes
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.NodeHealthCheckerService;
+import org.apache.hadoop.NodeHealthStatus;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.conf.Configuration;
@@ -73,7 +75,6 @@ import org.apache.hadoop.ipc.RemoteExcep
 import org.apache.hadoop.ipc.Server;
 import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
 import org.apache.hadoop.mapred.TaskController.DeletionContext;
-import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
 import org.apache.hadoop.mapred.pipes.Submitter;
 import org.apache.hadoop.mapreduce.MRConfig;
 import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -1724,7 +1725,7 @@ public class TaskTracker 
     }
     //add node health information
     
-    TaskTrackerHealthStatus healthStatus = status.getHealthStatus();
+    NodeHealthStatus healthStatus = status.getHealthStatus();
     synchronized (this) {
       if (healthChecker != null) {
         healthChecker.setHealthStatus(healthStatus);

Modified: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java Mon Mar 28 04:28:10 2011
@@ -19,6 +19,7 @@ package org.apache.hadoop.mapred;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.NodeHealthStatus;
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.io.*;
@@ -54,7 +55,7 @@ public class TaskTrackerStatus implement
   volatile long lastSeen;
   private int maxMapTasks;
   private int maxReduceTasks;
-  private TaskTrackerHealthStatus healthStatus;
+  private NodeHealthStatus healthStatus;
    
   public static final int UNAVAILABLE = -1;
   /**
@@ -345,7 +346,7 @@ public class TaskTrackerStatus implement
   public TaskTrackerStatus() {
     taskReports = new ArrayList<TaskStatus>();
     resStatus = new ResourceStatus();
-    this.healthStatus = new TaskTrackerHealthStatus();
+    this.healthStatus = new NodeHealthStatus();
   }
   
   public TaskTrackerStatus(String trackerName, String host) {
@@ -369,7 +370,7 @@ public class TaskTrackerStatus implement
     this.maxMapTasks = maxMapTasks;
     this.maxReduceTasks = maxReduceTasks;
     this.resStatus = new ResourceStatus();
-    this.healthStatus = new TaskTrackerHealthStatus();
+    this.healthStatus = new NodeHealthStatus();
   }
 
   /**
@@ -539,117 +540,10 @@ public class TaskTrackerStatus implement
    * Returns health status of the task tracker.
    * @return health status of Task Tracker
    */
-  public TaskTrackerHealthStatus getHealthStatus() {
+  public NodeHealthStatus getHealthStatus() {
     return healthStatus;
   }
 
-  /**
-   * Static class which encapsulates the Node health
-   * related fields.
-   * 
-   */
-  /**
-   * Static class which encapsulates the Node health
-   * related fields.
-   * 
-   */
-  static class TaskTrackerHealthStatus implements Writable {
-    
-    private boolean isNodeHealthy;
-    
-    private String healthReport;
-    
-    private long lastReported;
-    
-    public TaskTrackerHealthStatus(boolean isNodeHealthy, String healthReport,
-        long lastReported) {
-      this.isNodeHealthy = isNodeHealthy;
-      this.healthReport = healthReport;
-      this.lastReported = lastReported;
-    }
-    
-    public TaskTrackerHealthStatus() {
-      this.isNodeHealthy = true;
-      this.healthReport = "";
-      this.lastReported = System.currentTimeMillis();
-    }
-
-    /**
-     * Sets whether or not a task tracker is healthy or not, based on the
-     * output from the node health script.
-     * 
-     * @param isNodeHealthy
-     */
-    void setNodeHealthy(boolean isNodeHealthy) {
-      this.isNodeHealthy = isNodeHealthy;
-    }
-
-    /**
-     * Returns if node is healthy or not based on result from node health
-     * script.
-     * 
-     * @return true if the node is healthy.
-     */
-    boolean isNodeHealthy() {
-      return isNodeHealthy;
-    }
-
-    /**
-     * Sets the health report based on the output from the health script.
-     * 
-     * @param healthReport
-     *          String listing cause of failure.
-     */
-    void setHealthReport(String healthReport) {
-      this.healthReport = healthReport;
-    }
-
-    /**
-     * Returns the health report of the node if any, The health report is
-     * only populated when the node is not healthy.
-     * 
-     * @return health report of the node if any
-     */
-    String getHealthReport() {
-      return healthReport;
-    }
-
-    /**
-     * Sets when the TT got its health information last 
-     * from node health monitoring service.
-     * 
-     * @param lastReported last reported time by node 
-     * health script
-     */
-    public void setLastReported(long lastReported) {
-      this.lastReported = lastReported;
-    }
-
-    /**
-     * Gets time of most recent node health update.
-     * 
-     * @return time stamp of most recent health update.
-     */
-    public long getLastReported() {
-      return lastReported;
-    }
-
-    @Override
-    public void readFields(DataInput in) throws IOException {
-      isNodeHealthy = in.readBoolean();
-      healthReport = Text.readString(in);
-      lastReported = in.readLong();
-    }
-    
-    @Override
-    public void write(DataOutput out) throws IOException {
-      out.writeBoolean(isNodeHealthy);
-      Text.writeString(out, healthReport);
-      out.writeLong(lastReported);
-    }
-    
-  }
-  
   ///////////////////////////////////////////
   // Writable
   ///////////////////////////////////////////

Modified: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java Mon Mar 28 04:28:10 2011
@@ -30,15 +30,6 @@ import org.apache.hadoop.mapreduce.MRCon
 @InterfaceStability.Evolving
 public interface TTConfig extends MRConfig {
 
-  // Task-tracker configuration properties
-  public static final String TT_HEALTH_CHECKER_INTERVAL = 
-    "mapreduce.tasktracker.healthchecker.interval";
-  public static final String TT_HEALTH_CHECKER_SCRIPT_ARGS =
-    "mapreduce.tasktracker.healthchecker.script.args";
-  public static final String TT_HEALTH_CHECKER_SCRIPT_PATH =
-    "mapreduce.tasktracker.healthchecker.script.path";
-  public static final String TT_HEALTH_CHECKER_SCRIPT_TIMEOUT =
-    "mapreduce.tasktracker.healthchecker.script.timeout";
   public static final String TT_LOCAL_DIR_MINSPACE_KILL = 
     "mapreduce.tasktracker.local.dir.minspacekill";
   public static final String TT_LOCAL_DIR_MINSPACE_START = 

Modified: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapreduce/util/ServerConfigUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapreduce/util/ServerConfigUtil.java?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapreduce/util/ServerConfigUtil.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapreduce/util/ServerConfigUtil.java Mon Mar 28 04:28:10 2011
@@ -8,6 +8,7 @@ import org.apache.hadoop.mapreduce.MRCon
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.NodeHealthCheckerService;
 
 /**
  * Place holder for deprecated keys in the framework 
@@ -105,13 +106,24 @@ public class ServerConfigUtil {
     Configuration.addDeprecation("mapred.job.tracker.retire.jobs", 
       new String[] {JTConfig.JT_RETIREJOBS});
     Configuration.addDeprecation("mapred.healthChecker.interval", 
-      new String[] {TTConfig.TT_HEALTH_CHECKER_INTERVAL});
-    Configuration.addDeprecation("mapred.healthChecker.script.args", 
-      new String[] {TTConfig.TT_HEALTH_CHECKER_SCRIPT_ARGS});
-    Configuration.addDeprecation("mapred.healthChecker.script.path", 
-      new String[] {TTConfig.TT_HEALTH_CHECKER_SCRIPT_PATH});
-    Configuration.addDeprecation("mapred.healthChecker.script.timeout", 
-      new String[] {TTConfig.TT_HEALTH_CHECKER_SCRIPT_TIMEOUT});
+      new String[] {NodeHealthCheckerService.HEALTH_CHECK_INTERVAL_PROPERTY});
+    Configuration
+        .addDeprecation(
+            "mapred.healthChecker.script.args",
+            new String[] {
+                NodeHealthCheckerService.HEALTH_CHECK_SCRIPT_ARGUMENTS_PROPERTY
+            });
+    Configuration
+        .addDeprecation(
+            "mapred.healthChecker.script.path",
+            new String[] {
+                NodeHealthCheckerService.HEALTH_CHECK_SCRIPT_PROPERTY });
+    Configuration
+        .addDeprecation(
+            "mapred.healthChecker.script.timeout",
+            new String[] {
+                NodeHealthCheckerService.HEALTH_CHECK_FAILURE_INTERVAL_PROPERTY
+            });
     Configuration.addDeprecation("mapred.local.dir.minspacekill", 
       new String[] {TTConfig.TT_LOCAL_DIR_MINSPACE_KILL});
     Configuration.addDeprecation("mapred.local.dir.minspacestart", 
@@ -148,5 +160,27 @@ public class ServerConfigUtil {
       new String[] {TTConfig.TT_LOCAL_CACHE_SIZE});
     Configuration.addDeprecation("tasktracker.contention.tracking", 
       new String[] {TTConfig.TT_CONTENTION_TRACKING});
+    Configuration
+        .addDeprecation(
+            "mapreduce.tasktracker.healthchecker.interval",
+            new String[] {
+                NodeHealthCheckerService.HEALTH_CHECK_INTERVAL_PROPERTY });
+    Configuration
+        .addDeprecation(
+            "mapreduce.tasktracker.healthchecker.script.args",
+            new String[] {
+                NodeHealthCheckerService.HEALTH_CHECK_SCRIPT_ARGUMENTS_PROPERTY
+            });
+    Configuration
+        .addDeprecation(
+            "mapreduce.tasktracker.healthchecker.script.path",
+            new String[] {
+                NodeHealthCheckerService.HEALTH_CHECK_SCRIPT_PROPERTY });
+    Configuration
+        .addDeprecation(
+            "mapreduce.tasktracker.healthchecker.script.timeout",
+            new String[] {
+                NodeHealthCheckerService.HEALTH_CHECK_FAILURE_INTERVAL_PROPERTY
+            });
   }
 }

Modified: hadoop/mapreduce/branches/MR-279/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java Mon Mar 28 04:28:10 2011
@@ -33,9 +33,9 @@ import junit.framework.Test;
 import junit.framework.TestCase;
 import junit.framework.TestSuite;
 
+import org.apache.hadoop.NodeHealthStatus;
 import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
 import org.apache.hadoop.mapred.JobTracker.ReasonForBlackListing;
-import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
@@ -159,14 +159,14 @@ public class TestTaskTrackerBlacklisting
     return setup;
   }
 
-  private static void sendHeartBeat(TaskTrackerHealthStatus status, 
+  private static void sendHeartBeat(NodeHealthStatus status, 
                                     boolean initialContact) 
   throws IOException {
     for (String tracker : trackers) {
       TaskTrackerStatus tts = new TaskTrackerStatus(tracker, HostUtil
           .convertTrackerNameToHostName(tracker));
       if (status != null) {
-        TaskTrackerHealthStatus healthStatus = tts.getHealthStatus();
+        NodeHealthStatus healthStatus = tts.getHealthStatus();
         healthStatus.setNodeHealthy(status.isNodeHealthy());
         healthStatus.setHealthReport(status.getHealthReport());
         healthStatus.setLastReported(status.getLastReported());
@@ -193,7 +193,7 @@ public class TestTaskTrackerBlacklisting
   }
 
   public void testNodeHealthBlackListing() throws Exception {
-    TaskTrackerHealthStatus status = getUnhealthyNodeStatus("ERROR");
+    NodeHealthStatus status = getUnhealthyNodeStatus("ERROR");
     //Blacklist tracker due to node health failures.
     sendHeartBeat(status, false);
     for (String host : hosts) {
@@ -227,7 +227,7 @@ public class TestTaskTrackerBlacklisting
           failureCount, getFailureCountSinceStart(jobTracker, tracker));
     }
     
-    TaskTrackerHealthStatus status = getUnhealthyNodeStatus("ERROR");
+    NodeHealthStatus status = getUnhealthyNodeStatus("ERROR");
     sendHeartBeat(status, false);
     //When the node fails due to health check, the statistics is 
     //incremented.
@@ -266,7 +266,7 @@ public class TestTaskTrackerBlacklisting
     assertEquals("Tracker 1 not blacklisted", 1,
         jobTracker.getBlacklistedTrackerCount());
     checkReasonForBlackListing(hosts[0], exceedsFailuresReasonSet);
-    TaskTrackerHealthStatus status = getUnhealthyNodeStatus("ERROR");
+    NodeHealthStatus status = getUnhealthyNodeStatus("ERROR");
     
     sendHeartBeat(status, false);
 
@@ -297,7 +297,7 @@ public class TestTaskTrackerBlacklisting
   public void testBlacklistingReasonString() throws Exception {
     String error = "ERROR";
     String error1 = "ERROR1";
-    TaskTrackerHealthStatus status = getUnhealthyNodeStatus(error);
+    NodeHealthStatus status = getUnhealthyNodeStatus(error);
     sendHeartBeat(status, false);
 
     assertEquals("All trackers not blacklisted", 3,
@@ -331,8 +331,8 @@ public class TestTaskTrackerBlacklisting
     sendHeartBeat(null, false);
   }
   
-  private TaskTrackerHealthStatus getUnhealthyNodeStatus(String error) {
-    TaskTrackerHealthStatus status = new TaskTrackerHealthStatus();
+  private NodeHealthStatus getUnhealthyNodeStatus(String error) {
+    NodeHealthStatus status = new NodeHealthStatus();
     status.setNodeHealthy(false);
     status.setLastReported(System.currentTimeMillis());
     status.setHealthReport(error);
@@ -363,7 +363,7 @@ public class TestTaskTrackerBlacklisting
         .getBlacklistedTrackerCount());
     checkReasonForBlackListing(hosts[0], exceedsFailuresReasonSet);
     
-    TaskTrackerHealthStatus status = getUnhealthyNodeStatus("ERROR");
+    NodeHealthStatus status = getUnhealthyNodeStatus("ERROR");
     sendHeartBeat(status, false);
     assertEquals("All trackers not blacklisted", 3,
         jobTracker.getBlacklistedTrackerCount());
@@ -398,7 +398,7 @@ public class TestTaskTrackerBlacklisting
     assertTrue("The blacklisted tracker nodes is not empty.",
         blackListedTrackerInfo.isEmpty());
 
-    TaskTrackerHealthStatus status = getUnhealthyNodeStatus(errorWithNewLines);
+    NodeHealthStatus status = getUnhealthyNodeStatus(errorWithNewLines);
     // make all tracker unhealthy
     sendHeartBeat(status, false);
     assertEquals("All trackers not blacklisted", 3, jobTracker

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-api/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-api/pom.xml?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-api/pom.xml (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-api/pom.xml Mon Mar 28 04:28:10 2011
@@ -1,4 +1,10 @@
 <?xml version="1.0"?><project>
+
+  <properties>
+    <yarn.version>1.0-SNAPSHOT</yarn.version>
+  </properties>
+
+
   <parent>
     <artifactId>yarn</artifactId>
     <groupId>org.apache.hadoop</groupId>

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/pom.xml?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/pom.xml (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/pom.xml Mon Mar 28 04:28:10 2011
@@ -1,4 +1,9 @@
 <?xml version="1.0"?><project>
+
+  <properties>
+    <yarn.version>1.0-SNAPSHOT</yarn.version>
+  </properties>
+
   <parent>
     <artifactId>yarn</artifactId>
     <groupId>org.apache.hadoop</groupId>

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/pom.xml?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/pom.xml (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/pom.xml Mon Mar 28 04:28:10 2011
@@ -1,4 +1,9 @@
 <?xml version="1.0"?><project>
+
+  <properties>
+    <yarn.version>1.0-SNAPSHOT</yarn.version>
+  </properties>
+
   <parent>
     <artifactId>yarn</artifactId>
     <groupId>org.apache.hadoop</groupId>

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/pom.xml?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/pom.xml (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/pom.xml Mon Mar 28 04:28:10 2011
@@ -8,6 +8,11 @@
   <groupId>org.apache.hadoop</groupId>
   <artifactId>yarn-server-common</artifactId>
   <name>yarn-server-common</name>
+
+  <properties>
+    <yarn.version>1.0-SNAPSHOT</yarn.version>
+  </properties>
+
   <version>${yarn.version}</version>
   <url>http://maven.apache.org</url>
 
@@ -18,7 +23,7 @@
       <version>${yarn.version}</version>
     </dependency>
     <dependency>
-      <groupId>org.apache.hadoop</groupId>
+      <groupId>org.apache.zookeeper</groupId>
       <artifactId>zookeeper</artifactId>
       <version>3.3.1</version>
       <scope>compile</scope>

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/avro/ResourceTracker.genavro
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/avro/ResourceTracker.genavro?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/avro/ResourceTracker.genavro (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/avro/ResourceTracker.genavro Mon Mar 28 04:28:10 2011
@@ -13,6 +13,9 @@ protocol ResourceTracker {
 	int responseId;
 	long lastSeen;
     map<array<org.apache.hadoop.yarn.Container>> containers;
+    boolean isNodeHealthy;
+    union {string, null} healthReport;
+    long lastHealthReport;
   }
 
   record RegistrationResponse {

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/NodeHealthCheckerService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/NodeHealthCheckerService.java?rev=1086116&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/NodeHealthCheckerService.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/NodeHealthCheckerService.java Mon Mar 28 04:28:10 2011
@@ -0,0 +1,383 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Shell.ExitCodeException;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+/**
+ * 
+ * The class which provides functionality of checking the health of the node and
+ * reporting back to the service for which the health checker has been asked to
+ * report.
+ */
+public class NodeHealthCheckerService extends AbstractService {
+
+  private static Log LOG = LogFactory.getLog(NodeHealthCheckerService.class);
+
+  /** Absolute path to the health script. */
+  private String nodeHealthScript;
+  /** Delay after which node health script to be executed */
+  private long intervalTime;
+  /** Time after which the script should be timedout */
+  private long scriptTimeout;
+  /** Timer used to schedule node health monitoring script execution */
+  private Timer nodeHealthScriptScheduler;
+
+  /** ShellCommandExecutor used to execute monitoring script */
+  ShellCommandExecutor shexec = null;
+
+  /** Configuration used by the checker */
+  private Configuration conf;
+
+  /** Pattern used for searching in the output of the node health script */
+  static private final String ERROR_PATTERN = "ERROR";
+
+  /* Configuration keys */
+  public static final String HEALTH_CHECK_SCRIPT_PROPERTY = 
+    "yarn.server.nodemanager.healthchecker.script.path";
+
+  public static final String HEALTH_CHECK_INTERVAL_PROPERTY = 
+    "yarn.server.nodemanager.healthchecker.interval";
+
+  public static final String HEALTH_CHECK_FAILURE_INTERVAL_PROPERTY = 
+    "yarn.server.nodemanager.healthchecker.script.timeout";
+
+  public static final String HEALTH_CHECK_SCRIPT_ARGUMENTS_PROPERTY = 
+    "yarn.server.nodemanager.healthchecker.script.args";
+  
+  /* end of configuration keys */
+  /** Time out error message */
+  static final String NODE_HEALTH_SCRIPT_TIMED_OUT_MSG = "Node health script timed out";
+
+  /** Default frequency of running node health script */
+  private static final long DEFAULT_HEALTH_CHECK_INTERVAL = 10 * 60 * 1000;
+  /** Default script time out period */
+  private static final long DEFAULT_HEALTH_SCRIPT_FAILURE_INTERVAL = 2 * DEFAULT_HEALTH_CHECK_INTERVAL;
+
+  private boolean isHealthy;
+
+  private String healthReport;
+
+  private long lastReportedTime;
+
+  private TimerTask timer;
+  
+  
+  private enum HealthCheckerExitStatus {
+    SUCCESS,
+    TIMED_OUT,
+    FAILED_WITH_EXIT_CODE,
+    FAILED_WITH_EXCEPTION,
+    FAILED
+  }
+
+
+  /**
+   * Class which is used by the {@link Timer} class to periodically execute the
+   * node health script.
+   * 
+   */
+  private class NodeHealthMonitorExecutor extends TimerTask {
+
+    String exceptionStackTrace = "";
+
+    public NodeHealthMonitorExecutor(String[] args) {
+      ArrayList<String> execScript = new ArrayList<String>();
+      execScript.add(nodeHealthScript);
+      if (args != null) {
+        execScript.addAll(Arrays.asList(args));
+      }
+      shexec = new ShellCommandExecutor(execScript
+          .toArray(new String[execScript.size()]), null, null, scriptTimeout);
+    }
+
+    @Override
+    public void run() {
+      HealthCheckerExitStatus status = HealthCheckerExitStatus.SUCCESS;
+      try {
+        shexec.execute();
+      } catch (ExitCodeException e) {
+        // ignore the exit code of the script
+        status = HealthCheckerExitStatus.FAILED_WITH_EXIT_CODE;
+      } catch (Exception e) {
+        LOG.warn("Caught exception : " + e.getMessage());
+        if (!shexec.isTimedOut()) {
+          status = HealthCheckerExitStatus.FAILED_WITH_EXCEPTION;
+        } else {
+          status = HealthCheckerExitStatus.TIMED_OUT;
+        }
+        exceptionStackTrace = StringUtils.stringifyException(e);
+      } finally {
+        if (status == HealthCheckerExitStatus.SUCCESS) {
+          if (hasErrors(shexec.getOutput())) {
+            status = HealthCheckerExitStatus.FAILED;
+          }
+        }
+        reportHealthStatus(status);
+      }
+    }
+
+    /**
+     * Translates the outcome of one script run into the node's health state.
+     * 
+     * The node is marked unhealthy if
+     * <ol>
+     * <li>The node health script times out</li>
+     * <li>The node health scripts output has a line which begins with ERROR</li>
+     * <li>An exception other than {@link ExitCodeException} is thrown while
+     * executing the script</li>
+     * </ol>
+     * If the script exits with a non-zero code ({@link ExitCodeException}) the
+     * output is ignored and the node is reported healthy, as the script might
+     * have a syntax error.
+     * 
+     * Only the healthy outcomes pass a timestamp, so the last-reported time
+     * is left unchanged when the node is marked unhealthy.
+     * 
+     * @param status outcome of the most recent script execution
+     */
+    void reportHealthStatus(HealthCheckerExitStatus status) {
+      long now = System.currentTimeMillis();
+      switch (status) {
+      case SUCCESS:
+        setHealthStatus(true, "", now);
+        break;
+      case TIMED_OUT:
+        setHealthStatus(false, NODE_HEALTH_SCRIPT_TIMED_OUT_MSG);
+        break;
+      case FAILED_WITH_EXCEPTION:
+        setHealthStatus(false, exceptionStackTrace);
+        break;
+      case FAILED_WITH_EXIT_CODE:
+        setHealthStatus(true, "", now);
+        break;
+      case FAILED:
+        setHealthStatus(false, shexec.getOutput());
+        break;
+      }
+    }
+
+    /**
+     * Method to check if the output string has line which begins with ERROR.
+     * 
+     * @param output
+     *          string
+     * @return true if output string has error pattern in it.
+     */
+    private boolean hasErrors(String output) {
+      String[] splits = output.split("\n");
+      for (String split : splits) {
+        if (split.startsWith(ERROR_PATTERN)) {
+          return true;
+        }
+      }
+      return false;
+    }
+  }
+
+  public NodeHealthCheckerService() {
+    super(NodeHealthCheckerService.class.getName());
+    this.lastReportedTime = System.currentTimeMillis();
+    this.isHealthy = true;
+    this.healthReport = "";    
+  }
+
+  public NodeHealthCheckerService(Configuration conf) {
+    this();
+    this.conf = conf;
+    init(conf);
+  }
+
+  /**
+   * Reads script path, interval, timeout and arguments from {@code conf} and
+   * keeps {@code conf} itself, which start() and stop() pass to shouldRun().
+   */
+  @Override
+  public void init(Configuration conf) {
+    this.conf = conf;
+    this.nodeHealthScript = conf.get(HEALTH_CHECK_SCRIPT_PROPERTY);
+    this.intervalTime = conf.getLong(HEALTH_CHECK_INTERVAL_PROPERTY,
+        DEFAULT_HEALTH_CHECK_INTERVAL);
+    this.scriptTimeout = conf.getLong(HEALTH_CHECK_FAILURE_INTERVAL_PROPERTY,
+        DEFAULT_HEALTH_SCRIPT_FAILURE_INTERVAL);
+    String[] args = conf.getStrings(HEALTH_CHECK_SCRIPT_ARGUMENTS_PROPERTY,
+        new String[] {});
+    timer = new NodeHealthMonitorExecutor(args);
+  }
+
+  /**
+   * Method used to start the Node health monitoring.
+   * 
+   */
+  @Override
+  public void start() {
+    // if health script path is not configured don't start the thread.
+    if (!shouldRun(conf)) {
+      LOG.info("Not starting node health monitor");
+      return;
+    }
+    nodeHealthScriptScheduler = new Timer("NodeHealthMonitor-Timer", true);
+    // Start the timer task immediately and
+    // then periodically at interval time.
+    nodeHealthScriptScheduler.scheduleAtFixedRate(timer, 0, intervalTime);
+  }
+
+  /**
+   * Terminates the node health monitoring service: cancels the scheduler if
+   * start() created one and destroys any script process still running.
+   */
+  @Override
+  public void stop() {
+    // start() skips creating the scheduler when the monitor is not started.
+    if (nodeHealthScriptScheduler != null) {
+      nodeHealthScriptScheduler.cancel();
+    }
+    if (shexec != null) {
+      Process p = shexec.getProcess();
+      if (p != null) {
+        p.destroy();
+      }
+    }
+  }
+
+  /**
+   * Gets whether the node is healthy or not
+   * 
+   * @return true if node is healthy
+   */
+  private boolean isHealthy() {
+    return isHealthy;
+  }
+
+  /**
+   * Sets if the node is healthy or not.
+   * 
+   * @param isHealthy
+   *          if or not node is healthy
+   */
+  private synchronized void setHealthy(boolean isHealthy) {
+    this.isHealthy = isHealthy;
+  }
+
+  /**
+   * Returns output from health script. if node is healthy then an empty string
+   * is returned.
+   * 
+   * @return output from health script
+   */
+  private String getHealthReport() {
+    return healthReport;
+  }
+
+  /**
+   * Sets the health report from the node health script.
+   * 
+   * @param healthReport
+   */
+  private synchronized void setHealthReport(String healthReport) {
+    this.healthReport = healthReport;
+  }
+  
+  /**
+   * Returns time stamp when node health script was last run.
+   * 
+   * @return timestamp when node health script was last run
+   */
+  private long getLastReportedTime() {
+    return lastReportedTime;
+  }
+
+  /**
+   * Sets the last run time of the node health script.
+   * 
+   * @param lastReportedTime
+   */
+  private synchronized void setLastReportedTime(long lastReportedTime) {
+    this.lastReportedTime = lastReportedTime;
+  }
+
+  /**
+   * Method used to determine if or not node health monitoring service should be
+   * started or not. Returns true if following conditions are met:
+   * 
+   * <ol>
+   * <li>Path to Node health check script is not empty</li>
+   * <li>Node health check script file exists</li>
+   * </ol>
+   * 
+   * @param conf
+   * @return true if node health monitoring service can be started.
+   */
+  public static boolean shouldRun(Configuration conf) {
+    String nodeHealthScript = 
+      conf.get(HEALTH_CHECK_SCRIPT_PROPERTY);
+    if (nodeHealthScript == null || nodeHealthScript.trim().isEmpty()) {
+      return false;
+    }
+    File f = new File(nodeHealthScript);
+    return f.exists() && f.canExecute();
+  }
+
+  private synchronized void setHealthStatus(boolean isHealthy, String output) {
+    this.setHealthy(isHealthy);
+    this.setHealthReport(output);
+  }
+  
+  private synchronized void setHealthStatus(boolean isHealthy, String output,
+      long time) {
+    this.setHealthStatus(isHealthy, output);
+    this.setLastReportedTime(time);
+  }
+  
+  /**
+   * Method to populate the fields for the {@link NodeHealthStatus}
+   * 
+   * @param healthStatus
+   */
+  public synchronized void setHealthStatus(NodeHealthStatus healthStatus) {
+    healthStatus.setNodeHealthy(this.isHealthy());
+    healthStatus.setHealthReport(this.getHealthReport());
+    healthStatus.setLastReported(this.getLastReportedTime());
+  }
+  
+  /**
+   * Test method to directly access the timer which node 
+   * health checker would use.
+   * 
+   *
+   * @return Timer task
+   */
+  //XXX:Not to be used directly.
+  TimerTask getTimer() {
+    return timer;
+  }
+}

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/NodeHealthStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/NodeHealthStatus.java?rev=1086116&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/NodeHealthStatus.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/NodeHealthStatus.java Mon Mar 28 04:28:10 2011
@@ -0,0 +1,110 @@
+package org.apache.hadoop;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Writable value object which encapsulates the Node health
+ * related fields.
+ * 
+ */
+public class NodeHealthStatus implements Writable {
+  
+  private boolean isNodeHealthy;
+  
+  private String healthReport;
+  
+  private long lastReported;
+  
+  public NodeHealthStatus(boolean isNodeHealthy, String healthReport,
+      long lastReported) {
+    this.isNodeHealthy = isNodeHealthy;
+    this.healthReport = healthReport;
+    this.lastReported = lastReported;
+  }
+  
+  public NodeHealthStatus() {
+    this.isNodeHealthy = true;
+    this.healthReport = "";
+    this.lastReported = System.currentTimeMillis();
+  }
+
+  /**
+   * Sets whether or not a task tracker is healthy or not, based on the
+   * output from the node health script.
+   * 
+   * @param isNodeHealthy
+   */
+  public void setNodeHealthy(boolean isNodeHealthy) {
+    this.isNodeHealthy = isNodeHealthy;
+  }
+
+  /**
+   * Returns if node is healthy or not based on result from node health
+   * script.
+   * 
+   * @return true if the node is healthy.
+   */
+  public boolean isNodeHealthy() {
+    return isNodeHealthy;
+  }
+
+  /**
+   * Sets the health report based on the output from the health script.
+   * 
+   * @param healthReport
+   *          String listing cause of failure.
+   */
+  public void setHealthReport(String healthReport) {
+    this.healthReport = healthReport;
+  }
+
+  /**
+   * Returns the health report of the node if any, The health report is
+   * only populated when the node is not healthy.
+   * 
+   * @return health report of the node if any
+   */
+  public String getHealthReport() {
+    return healthReport;
+  }
+
+  /**
+   * Sets when the TT got its health information last 
+   * from node health monitoring service.
+   * 
+   * @param lastReported last reported time by node 
+   * health script
+   */
+  public void setLastReported(long lastReported) {
+    this.lastReported = lastReported;
+  }
+
+  /**
+   * Gets time of most recent node health update.
+   * 
+   * @return time stamp of most recent health update.
+   */
+  public long getLastReported() {
+    return lastReported;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    isNodeHealthy = in.readBoolean();
+    healthReport = Text.readString(in);
+    lastReported = in.readLong();
+  }
+  
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(isNodeHealthy);
+    Text.writeString(out, healthReport);
+    out.writeLong(lastReported);
+  }
+  
+}
\ No newline at end of file

Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/test/java/org/apache/hadoop/TestNodeHealthService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/test/java/org/apache/hadoop/TestNodeHealthService.java?rev=1086116&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/test/java/org/apache/hadoop/TestNodeHealthService.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/test/java/org/apache/hadoop/TestNodeHealthService.java Mon Mar 28 04:28:10 2011
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestNodeHealthService {
+
+  private static volatile Log LOG = LogFactory
+      .getLog(TestNodeHealthService.class);
+
+  protected static File testRootDir = new File("target",
+      TestNodeHealthService.class.getName() + "-localDir").getAbsoluteFile();
+
+  final static File nodeHealthConfigFile = new File(testRootDir,
+      "modified-mapred-site.xml");
+
+  private File nodeHealthscriptFile = new File(testRootDir,
+      "failingscript.sh");
+
+  @Before
+  public void setup() {
+    testRootDir.mkdirs();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (testRootDir.exists()) {
+      FileContext.getLocalFSFileContext().delete(
+          new Path(testRootDir.getAbsolutePath()), true);
+    }
+  }
+
+  private Configuration getConfForNodeHealthScript() {
+    Configuration conf = new Configuration();
+    conf.set(NodeHealthCheckerService.HEALTH_CHECK_SCRIPT_PROPERTY,
+        nodeHealthscriptFile.getAbsolutePath());
+    conf.setLong(NodeHealthCheckerService.HEALTH_CHECK_INTERVAL_PROPERTY, 500);
+    conf.setLong(
+        NodeHealthCheckerService.HEALTH_CHECK_FAILURE_INTERVAL_PROPERTY, 1000);
+    return conf;
+  }
+
+  private void writeNodeHealthScriptFile(String scriptStr, boolean setExecutable)
+      throws IOException {
+    PrintWriter pw = new PrintWriter(new FileOutputStream(nodeHealthscriptFile));
+    pw.println(scriptStr);
+    pw.flush();
+    pw.close();
+    nodeHealthscriptFile.setExecutable(setExecutable);
+  }
+
+  @Test
+  public void testNodeHealthScriptShouldRun() throws IOException {
+    // Node health script should not start if there is no property called
+    // node health script path.
+    Assert.assertFalse("By default Health checker should not have started",
+        NodeHealthCheckerService.shouldRun(new Configuration()));
+    Configuration conf = getConfForNodeHealthScript();
+    // Node health script should not start if the node health script does not
+    // exist
+    Assert.assertFalse("Health checker should not start without a script file",
+        NodeHealthCheckerService.shouldRun(conf));
+    // Create script path.
+    conf.writeXml(new FileOutputStream(nodeHealthConfigFile));
+    conf.addResource(nodeHealthConfigFile.getName());
+    writeNodeHealthScriptFile("", false);
+    // Node health script should not start if the node health script is not
+    // executable.
+    Assert.assertFalse("Health checker should not start if script is not executable",
+        NodeHealthCheckerService.shouldRun(conf));
+    writeNodeHealthScriptFile("", true);
+    Assert.assertTrue("Node health script should start", NodeHealthCheckerService
+        .shouldRun(conf));
+  }
+
+  @Test
+  public void testNodeHealthScript() throws Exception {
+    NodeHealthStatus healthStatus = new NodeHealthStatus();
+    String errorScript = "echo ERROR\n echo \"Tracker not healthy\"";
+    String normalScript = "echo \"I am all fine\"";
+    String timeOutScript = "sleep 4\n echo \"I am fine\"";
+    Configuration conf = getConfForNodeHealthScript();
+    conf.writeXml(new FileOutputStream(nodeHealthConfigFile));
+    conf.addResource(nodeHealthConfigFile.getName());
+
+    NodeHealthCheckerService nodeHealthChecker = new NodeHealthCheckerService(
+        conf);
+    TimerTask timer = nodeHealthChecker.getTimer();
+    writeNodeHealthScriptFile(normalScript, true);
+    timer.run();
+
+    nodeHealthChecker.setHealthStatus(healthStatus);
+    LOG.info("Checking initial healthy condition");
+    // Check proper report conditions.
+    Assert.assertTrue("Node health status reported unhealthy", healthStatus
+        .isNodeHealthy());
+    Assert.assertTrue("Health report should be empty for a healthy node",
+        healthStatus.getHealthReport().isEmpty());
+
+    // write out error file.
+    // Healthy to unhealthy transition
+    writeNodeHealthScriptFile(errorScript, true);
+    // Run timer
+    timer.run();
+    // update health status
+    nodeHealthChecker.setHealthStatus(healthStatus);
+    LOG.info("Checking Healthy--->Unhealthy");
+    Assert.assertFalse("Node health status reported healthy", healthStatus
+        .isNodeHealthy());
+    Assert.assertFalse("Health report should not be empty for an unhealthy node",
+        healthStatus.getHealthReport().isEmpty());
+
+    // Check unhealthy to healthy transitions.
+    writeNodeHealthScriptFile(normalScript, true);
+    timer.run();
+    nodeHealthChecker.setHealthStatus(healthStatus);
+    LOG.info("Checking UnHealthy--->healthy");
+    // Check proper report conditions.
+    Assert.assertTrue("Node health status reported unhealthy", healthStatus
+        .isNodeHealthy());
+    Assert.assertTrue("Health report should be empty for a healthy node",
+        healthStatus.getHealthReport().isEmpty());
+
+    // Healthy to timeout transition.
+    writeNodeHealthScriptFile(timeOutScript, true);
+    timer.run();
+    nodeHealthChecker.setHealthStatus(healthStatus);
+    LOG.info("Checking Healthy--->timeout");
+    Assert.assertFalse("Node health status reported healthy even after timeout",
+        healthStatus.isNodeHealthy());
+    // JUnit expects (message, expected, actual).
+    Assert.assertEquals("Node time out message not propagated",
+        NodeHealthCheckerService.NODE_HEALTH_SCRIPT_TIMED_OUT_MSG,
+        healthStatus.getHealthReport());
+  }
+
+}

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java Mon Mar 28 04:28:10 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.no
 
 import java.util.concurrent.ConcurrentMap;
 
+import org.apache.hadoop.NodeHealthStatus;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.ApplicationID;
@@ -34,4 +35,6 @@ public interface Context {
   public ConcurrentMap<ApplicationID, Application> getApplications();
 
   public ConcurrentMap<ContainerID, Container> getContainers();
+
+  public NodeHealthStatus getNodeHealthStatus();
 }

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Mon Mar 28 04:28:10 2011
@@ -27,6 +27,8 @@ import java.util.concurrent.ConcurrentHa
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentSkipListMap;
 
+import org.apache.hadoop.NodeHealthCheckerService;
+import org.apache.hadoop.NodeHealthStatus;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.security.SecurityUtil;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -60,11 +62,17 @@ public class NodeManager extends Composi
     // NodeManager level dispatcher
     Dispatcher dispatcher = new AsyncDispatcher();
 
+    NodeHealthCheckerService healthChecker = null;
+    if (NodeHealthCheckerService.shouldRun(conf)) {
+      healthChecker = new NodeHealthCheckerService();
+      addService(healthChecker);
+    }
+
     // StatusUpdater should be added first so that it can start first. Once it
     // contacts RM, does registration and gets tokens, then only
     // ContainerManager can start.
     NodeStatusUpdater nodeStatusUpdater =
-        createNodeStatusUpdater(context, dispatcher);
+        createNodeStatusUpdater(context, dispatcher, healthChecker);
     addService(nodeStatusUpdater);
 
     NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
@@ -81,8 +89,8 @@ public class NodeManager extends Composi
   }
 
   protected NodeStatusUpdater createNodeStatusUpdater(Context context,
-      Dispatcher dispatcher) {
-    return new NodeStatusUpdaterImpl(context, dispatcher);
+      Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+    return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker);
   }
 
   protected NodeResourceMonitor createNodeResourceMonitor() {
@@ -146,6 +154,8 @@ public class NodeManager extends Composi
             }
           });
 
+    private final NodeHealthStatus nodeHealthStatus = new NodeHealthStatus();
+
     public NMContext() {
     }
 
@@ -159,6 +169,10 @@ public class NodeManager extends Composi
       return this.containers;
     }
 
+    @Override
+    public NodeHealthStatus getNodeHealthStatus() {
+      return this.nodeHealthStatus;
+    }
   }
 
   public static void main(String[] args) {

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Mon Mar 28 04:28:10 2011
@@ -30,6 +30,8 @@ import org.apache.avro.AvroRuntimeExcept
 import org.apache.avro.ipc.AvroRemoteException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.NodeHealthCheckerService;
+import org.apache.hadoop.NodeHealthStatus;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.CommonConfigurationKeys;
 import org.apache.hadoop.net.NetUtils;
@@ -70,8 +72,12 @@ public class NodeStatusUpdaterImpl exten
   private byte[] secretKeyBytes = new byte[0];
   private boolean isStopped;
 
-  public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher) {
+  private final NodeHealthCheckerService healthChecker;
+
+  public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
+      NodeHealthCheckerService healthChecker) {
     super(NodeStatusUpdaterImpl.class.getName());
+    this.healthChecker = healthChecker;
     this.context = context;
     this.dispatcher = dispatcher;
   }
@@ -196,6 +202,14 @@ public class NodeStatusUpdaterImpl exten
     LOG.debug(this.nodeName + " sending out status for " + numActiveContainers
         + " containers");
 
+    if (this.healthChecker != null) {
+      NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
+      this.healthChecker.setHealthStatus(nodeHealthStatus);
+      status.isNodeHealthy = nodeHealthStatus.isNodeHealthy();
+      status.healthReport = nodeHealthStatus.getHealthReport();
+      status.lastHealthReport = nodeHealthStatus.getLastReported();
+    }
+
     return status;
   }
 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java Mon Mar 28 04:28:10 2011
@@ -24,6 +24,7 @@ import java.util.HashMap;
 import org.apache.avro.ipc.AvroRemoteException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.NodeHealthCheckerService;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.yarn.ApplicationID;
 import org.apache.hadoop.yarn.ContainerID;
@@ -49,8 +50,9 @@ public class TestEventFlow {
     ContainerExecutor exec = new DefaultContainerExecutor();
     DeletionService del = new DeletionService(exec);
     Dispatcher dispatcher = new AsyncDispatcher();
+    NodeHealthCheckerService healthChecker = null;
     NodeStatusUpdater nodeStatusUpdater =
-        new NodeStatusUpdaterImpl(context, dispatcher) {
+        new NodeStatusUpdaterImpl(context, dispatcher, healthChecker) {
       @Override
       protected org.apache.hadoop.yarn.ResourceTracker getRMClient() {
         return new LocalRMInterface();

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Mon Mar 28 04:28:10 2011
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentMa
 import org.apache.avro.ipc.AvroRemoteException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.NodeHealthCheckerService;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.ApplicationID;
@@ -159,8 +160,9 @@ public class TestNodeStatusUpdater {
   private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl {
     private Context context;
 
-    public MyNodeStatusUpdater(Context context, Dispatcher dispatcher) {
-      super(context, dispatcher);
+    public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
+        NodeHealthCheckerService healthChecker) {
+      super(context, dispatcher, healthChecker);
       this.context = context;
     }
 
@@ -186,8 +188,8 @@ public class TestNodeStatusUpdater {
     final NodeManager nm = new NodeManager() {
       @Override
       protected NodeStatusUpdater createNodeStatusUpdater(Context context,
-          Dispatcher dispatcher) {
-        return new MyNodeStatusUpdater(context, dispatcher);
+          Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+        return new MyNodeStatusUpdater(context, dispatcher, healthChecker);
       }
     };
 

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java Mon Mar 28 04:28:10 2011
@@ -33,6 +33,7 @@ import junit.framework.Assert;
 import org.apache.avro.ipc.AvroRemoteException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.NodeHealthCheckerService;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
@@ -98,11 +99,11 @@ public class TestContainerManager {
   private ContainerExecutor exec = new DefaultContainerExecutor();
   private DeletionService delSrvc;
   private Dispatcher dispatcher = new AsyncDispatcher();
-
+  private NodeHealthCheckerService healthChecker = null;
   private String user = "nobody";
 
   private NodeStatusUpdater nodeStatusUpdater = new NodeStatusUpdaterImpl(
-      context, dispatcher) {
+      context, dispatcher, healthChecker) {
     @Override
     protected org.apache.hadoop.yarn.ResourceTracker getRMClient() {
       return new LocalRMInterface();

Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java Mon Mar 28 04:28:10 2011
@@ -23,6 +23,7 @@ import java.io.IOException;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.NodeHealthCheckerService;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.yarn.YarnException;
@@ -133,8 +134,10 @@ public class MiniYARNCluster extends Com
               org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater
               createNodeStatusUpdater(
                   org.apache.hadoop.yarn.server.nodemanager.Context context,
-                  Dispatcher dispatcher) {
-            return new NodeStatusUpdaterImpl(context, dispatcher) {
+                  Dispatcher dispatcher,
+                  NodeHealthCheckerService healthChecker) {
+            return new NodeStatusUpdaterImpl(context, dispatcher,
+                healthChecker) {
               @Override
               protected org.apache.hadoop.yarn.ResourceTracker getRMClient() {
                 // For in-process communication without RPC