You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain-text copy.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2011/03/28 06:28:11 UTC
svn commit: r1086116 - in /hadoop/mapreduce/branches/MR-279: ./
mr-client/hadoop-mapreduce-client-jobclient/
src/java/org/apache/hadoop/mapred/
src/java/org/apache/hadoop/mapreduce/server/tasktracker/
src/java/org/apache/hadoop/mapreduce/util/ src/test...
Author: vinodkv
Date: Mon Mar 28 04:28:10 2011
New Revision: 1086116
URL: http://svn.apache.org/viewvc?rev=1086116&view=rev
Log:
Implementing health-checks for the NodeManager. Moving NodeHealthCheckerService from mapred into yarn-server-common. Contributed by Vinod Kumar Vavilapalli.
Added:
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/NodeHealthCheckerService.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/NodeHealthStatus.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/test/java/org/apache/hadoop/TestNodeHealthService.java
Removed:
hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/NodeHealthCheckerService.java
hadoop/mapreduce/branches/MR-279/src/test/mapred/org/apache/hadoop/mapred/TestNodeHealthService.java
Modified:
hadoop/mapreduce/branches/MR-279/ivy.xml
hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/pom.xml
hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/JobTracker.java
hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskTracker.java
hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java
hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java
hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapreduce/util/ServerConfigUtil.java
hadoop/mapreduce/branches/MR-279/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-api/pom.xml
hadoop/mapreduce/branches/MR-279/yarn/yarn-common/pom.xml
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/pom.xml
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/pom.xml
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/avro/ResourceTracker.genavro
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
Modified: hadoop/mapreduce/branches/MR-279/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/ivy.xml?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/ivy.xml (original)
+++ hadoop/mapreduce/branches/MR-279/ivy.xml Mon Mar 28 04:28:10 2011
@@ -74,6 +74,8 @@
<dependency org="org.apache.hadoop" name="hadoop-mapreduce-client-core"
rev="${hadoop-mapreduce-client-core.version}" conf="common->default"/>
+ <dependency org="org.apache.hadoop" name="yarn-server-common"
+ rev="${yarn.version}" conf="common->default"/>
<dependency org="jdiff" name="jdiff" rev="${jdiff.version}"
conf="jdiff->default"/>
Modified: hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/pom.xml?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/pom.xml (original)
+++ hadoop/mapreduce/branches/MR-279/mr-client/hadoop-mapreduce-client-jobclient/pom.xml Mon Mar 28 04:28:10 2011
@@ -79,17 +79,4 @@
<scope>test</scope>
</dependency>
</dependencies>
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <excludes>
- <!--exclude>**/TestYARNClient.java</exclude-->
- </excludes>
- </configuration>
- </plugin>
- </plugins>
- </build>
</project>
Modified: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/JobTracker.java Mon Mar 28 04:28:10 2011
@@ -55,6 +55,7 @@ import javax.security.auth.login.LoginEx
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.NodeHealthStatus;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -76,7 +77,6 @@ import org.apache.hadoop.mapred.ClusterS
import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
import org.apache.hadoop.mapred.JobTrackerStatistics.TaskTrackerStat;
-import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
import org.apache.hadoop.mapreduce.ClusterMetrics;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.QueueInfo;
@@ -2695,7 +2695,7 @@ public class JobTracker implements MRCon
}
private void updateNodeHealthStatus(TaskTrackerStatus trackerStatus) {
- TaskTrackerHealthStatus status = trackerStatus.getHealthStatus();
+ NodeHealthStatus status = trackerStatus.getHealthStatus();
synchronized (faultyTrackers) {
faultyTrackers.setNodeHealthStatus(trackerStatus.getHost(),
status.isNodeHealthy(), status.getHealthReport());
Modified: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon Mar 28 04:28:10 2011
@@ -53,6 +53,8 @@ import javax.servlet.http.HttpServletRes
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.NodeHealthCheckerService;
+import org.apache.hadoop.NodeHealthStatus;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
@@ -73,7 +75,6 @@ import org.apache.hadoop.ipc.RemoteExcep
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.mapred.CleanupQueue.PathDeletionContext;
import org.apache.hadoop.mapred.TaskController.DeletionContext;
-import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
import org.apache.hadoop.mapred.pipes.Submitter;
import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.MRJobConfig;
@@ -1724,7 +1725,7 @@ public class TaskTracker
}
//add node health information
- TaskTrackerHealthStatus healthStatus = status.getHealthStatus();
+ NodeHealthStatus healthStatus = status.getHealthStatus();
synchronized (this) {
if (healthChecker != null) {
healthChecker.setHealthStatus(healthStatus);
Modified: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java Mon Mar 28 04:28:10 2011
@@ -19,6 +19,7 @@ package org.apache.hadoop.mapred;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.NodeHealthStatus;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.io.*;
@@ -54,7 +55,7 @@ public class TaskTrackerStatus implement
volatile long lastSeen;
private int maxMapTasks;
private int maxReduceTasks;
- private TaskTrackerHealthStatus healthStatus;
+ private NodeHealthStatus healthStatus;
public static final int UNAVAILABLE = -1;
/**
@@ -345,7 +346,7 @@ public class TaskTrackerStatus implement
public TaskTrackerStatus() {
taskReports = new ArrayList<TaskStatus>();
resStatus = new ResourceStatus();
- this.healthStatus = new TaskTrackerHealthStatus();
+ this.healthStatus = new NodeHealthStatus();
}
public TaskTrackerStatus(String trackerName, String host) {
@@ -369,7 +370,7 @@ public class TaskTrackerStatus implement
this.maxMapTasks = maxMapTasks;
this.maxReduceTasks = maxReduceTasks;
this.resStatus = new ResourceStatus();
- this.healthStatus = new TaskTrackerHealthStatus();
+ this.healthStatus = new NodeHealthStatus();
}
/**
@@ -539,117 +540,10 @@ public class TaskTrackerStatus implement
* Returns health status of the task tracker.
* @return health status of Task Tracker
*/
- public TaskTrackerHealthStatus getHealthStatus() {
+ public NodeHealthStatus getHealthStatus() {
return healthStatus;
}
- /**
- * Static class which encapsulates the Node health
- * related fields.
- *
- */
- /**
- * Static class which encapsulates the Node health
- * related fields.
- *
- */
- static class TaskTrackerHealthStatus implements Writable {
-
- private boolean isNodeHealthy;
-
- private String healthReport;
-
- private long lastReported;
-
- public TaskTrackerHealthStatus(boolean isNodeHealthy, String healthReport,
- long lastReported) {
- this.isNodeHealthy = isNodeHealthy;
- this.healthReport = healthReport;
- this.lastReported = lastReported;
- }
-
- public TaskTrackerHealthStatus() {
- this.isNodeHealthy = true;
- this.healthReport = "";
- this.lastReported = System.currentTimeMillis();
- }
-
- /**
- * Sets whether or not a task tracker is healthy or not, based on the
- * output from the node health script.
- *
- * @param isNodeHealthy
- */
- void setNodeHealthy(boolean isNodeHealthy) {
- this.isNodeHealthy = isNodeHealthy;
- }
-
- /**
- * Returns if node is healthy or not based on result from node health
- * script.
- *
- * @return true if the node is healthy.
- */
- boolean isNodeHealthy() {
- return isNodeHealthy;
- }
-
- /**
- * Sets the health report based on the output from the health script.
- *
- * @param healthReport
- * String listing cause of failure.
- */
- void setHealthReport(String healthReport) {
- this.healthReport = healthReport;
- }
-
- /**
- * Returns the health report of the node if any, The health report is
- * only populated when the node is not healthy.
- *
- * @return health report of the node if any
- */
- String getHealthReport() {
- return healthReport;
- }
-
- /**
- * Sets when the TT got its health information last
- * from node health monitoring service.
- *
- * @param lastReported last reported time by node
- * health script
- */
- public void setLastReported(long lastReported) {
- this.lastReported = lastReported;
- }
-
- /**
- * Gets time of most recent node health update.
- *
- * @return time stamp of most recent health update.
- */
- public long getLastReported() {
- return lastReported;
- }
-
- @Override
- public void readFields(DataInput in) throws IOException {
- isNodeHealthy = in.readBoolean();
- healthReport = Text.readString(in);
- lastReported = in.readLong();
- }
-
- @Override
- public void write(DataOutput out) throws IOException {
- out.writeBoolean(isNodeHealthy);
- Text.writeString(out, healthReport);
- out.writeLong(lastReported);
- }
-
- }
-
///////////////////////////////////////////
// Writable
///////////////////////////////////////////
Modified: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapreduce/server/tasktracker/TTConfig.java Mon Mar 28 04:28:10 2011
@@ -30,15 +30,6 @@ import org.apache.hadoop.mapreduce.MRCon
@InterfaceStability.Evolving
public interface TTConfig extends MRConfig {
- // Task-tracker configuration properties
- public static final String TT_HEALTH_CHECKER_INTERVAL =
- "mapreduce.tasktracker.healthchecker.interval";
- public static final String TT_HEALTH_CHECKER_SCRIPT_ARGS =
- "mapreduce.tasktracker.healthchecker.script.args";
- public static final String TT_HEALTH_CHECKER_SCRIPT_PATH =
- "mapreduce.tasktracker.healthchecker.script.path";
- public static final String TT_HEALTH_CHECKER_SCRIPT_TIMEOUT =
- "mapreduce.tasktracker.healthchecker.script.timeout";
public static final String TT_LOCAL_DIR_MINSPACE_KILL =
"mapreduce.tasktracker.local.dir.minspacekill";
public static final String TT_LOCAL_DIR_MINSPACE_START =
Modified: hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapreduce/util/ServerConfigUtil.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapreduce/util/ServerConfigUtil.java?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapreduce/util/ServerConfigUtil.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/java/org/apache/hadoop/mapreduce/util/ServerConfigUtil.java Mon Mar 28 04:28:10 2011
@@ -8,6 +8,7 @@ import org.apache.hadoop.mapreduce.MRCon
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.NodeHealthCheckerService;
/**
* Place holder for deprecated keys in the framework
@@ -105,13 +106,24 @@ public class ServerConfigUtil {
Configuration.addDeprecation("mapred.job.tracker.retire.jobs",
new String[] {JTConfig.JT_RETIREJOBS});
Configuration.addDeprecation("mapred.healthChecker.interval",
- new String[] {TTConfig.TT_HEALTH_CHECKER_INTERVAL});
- Configuration.addDeprecation("mapred.healthChecker.script.args",
- new String[] {TTConfig.TT_HEALTH_CHECKER_SCRIPT_ARGS});
- Configuration.addDeprecation("mapred.healthChecker.script.path",
- new String[] {TTConfig.TT_HEALTH_CHECKER_SCRIPT_PATH});
- Configuration.addDeprecation("mapred.healthChecker.script.timeout",
- new String[] {TTConfig.TT_HEALTH_CHECKER_SCRIPT_TIMEOUT});
+ new String[] {NodeHealthCheckerService.HEALTH_CHECK_INTERVAL_PROPERTY});
+ Configuration
+ .addDeprecation(
+ "mapred.healthChecker.script.args",
+ new String[] {
+ NodeHealthCheckerService.HEALTH_CHECK_SCRIPT_ARGUMENTS_PROPERTY
+ });
+ Configuration
+ .addDeprecation(
+ "mapred.healthChecker.script.path",
+ new String[] {
+ NodeHealthCheckerService.HEALTH_CHECK_SCRIPT_PROPERTY });
+ Configuration
+ .addDeprecation(
+ "mapred.healthChecker.script.timeout",
+ new String[] {
+ NodeHealthCheckerService.HEALTH_CHECK_FAILURE_INTERVAL_PROPERTY
+ });
Configuration.addDeprecation("mapred.local.dir.minspacekill",
new String[] {TTConfig.TT_LOCAL_DIR_MINSPACE_KILL});
Configuration.addDeprecation("mapred.local.dir.minspacestart",
@@ -148,5 +160,27 @@ public class ServerConfigUtil {
new String[] {TTConfig.TT_LOCAL_CACHE_SIZE});
Configuration.addDeprecation("tasktracker.contention.tracking",
new String[] {TTConfig.TT_CONTENTION_TRACKING});
+ Configuration
+ .addDeprecation(
+ "mapreduce.tasktracker.healthchecker.interval",
+ new String[] {
+ NodeHealthCheckerService.HEALTH_CHECK_INTERVAL_PROPERTY });
+ Configuration
+ .addDeprecation(
+ "mapreduce.tasktracker.healthchecker.script.args",
+ new String[] {
+ NodeHealthCheckerService.HEALTH_CHECK_SCRIPT_ARGUMENTS_PROPERTY
+ });
+ Configuration
+ .addDeprecation(
+ "mapreduce.tasktracker.healthchecker.script.path",
+ new String[] {
+ NodeHealthCheckerService.HEALTH_CHECK_SCRIPT_PROPERTY });
+ Configuration
+ .addDeprecation(
+ "mapreduce.tasktracker.healthchecker.script.timeout",
+ new String[] {
+ NodeHealthCheckerService.HEALTH_CHECK_FAILURE_INTERVAL_PROPERTY
+ });
}
}
Modified: hadoop/mapreduce/branches/MR-279/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java (original)
+++ hadoop/mapreduce/branches/MR-279/src/test/mapred/org/apache/hadoop/mapred/TestTaskTrackerBlacklisting.java Mon Mar 28 04:28:10 2011
@@ -33,9 +33,9 @@ import junit.framework.Test;
import junit.framework.TestCase;
import junit.framework.TestSuite;
+import org.apache.hadoop.NodeHealthStatus;
import org.apache.hadoop.mapred.ClusterStatus.BlackListInfo;
import org.apache.hadoop.mapred.JobTracker.ReasonForBlackListing;
-import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
@@ -159,14 +159,14 @@ public class TestTaskTrackerBlacklisting
return setup;
}
- private static void sendHeartBeat(TaskTrackerHealthStatus status,
+ private static void sendHeartBeat(NodeHealthStatus status,
boolean initialContact)
throws IOException {
for (String tracker : trackers) {
TaskTrackerStatus tts = new TaskTrackerStatus(tracker, HostUtil
.convertTrackerNameToHostName(tracker));
if (status != null) {
- TaskTrackerHealthStatus healthStatus = tts.getHealthStatus();
+ NodeHealthStatus healthStatus = tts.getHealthStatus();
healthStatus.setNodeHealthy(status.isNodeHealthy());
healthStatus.setHealthReport(status.getHealthReport());
healthStatus.setLastReported(status.getLastReported());
@@ -193,7 +193,7 @@ public class TestTaskTrackerBlacklisting
}
public void testNodeHealthBlackListing() throws Exception {
- TaskTrackerHealthStatus status = getUnhealthyNodeStatus("ERROR");
+ NodeHealthStatus status = getUnhealthyNodeStatus("ERROR");
//Blacklist tracker due to node health failures.
sendHeartBeat(status, false);
for (String host : hosts) {
@@ -227,7 +227,7 @@ public class TestTaskTrackerBlacklisting
failureCount, getFailureCountSinceStart(jobTracker, tracker));
}
- TaskTrackerHealthStatus status = getUnhealthyNodeStatus("ERROR");
+ NodeHealthStatus status = getUnhealthyNodeStatus("ERROR");
sendHeartBeat(status, false);
//When the node fails due to health check, the statistics is
//incremented.
@@ -266,7 +266,7 @@ public class TestTaskTrackerBlacklisting
assertEquals("Tracker 1 not blacklisted", 1,
jobTracker.getBlacklistedTrackerCount());
checkReasonForBlackListing(hosts[0], exceedsFailuresReasonSet);
- TaskTrackerHealthStatus status = getUnhealthyNodeStatus("ERROR");
+ NodeHealthStatus status = getUnhealthyNodeStatus("ERROR");
sendHeartBeat(status, false);
@@ -297,7 +297,7 @@ public class TestTaskTrackerBlacklisting
public void testBlacklistingReasonString() throws Exception {
String error = "ERROR";
String error1 = "ERROR1";
- TaskTrackerHealthStatus status = getUnhealthyNodeStatus(error);
+ NodeHealthStatus status = getUnhealthyNodeStatus(error);
sendHeartBeat(status, false);
assertEquals("All trackers not blacklisted", 3,
@@ -331,8 +331,8 @@ public class TestTaskTrackerBlacklisting
sendHeartBeat(null, false);
}
- private TaskTrackerHealthStatus getUnhealthyNodeStatus(String error) {
- TaskTrackerHealthStatus status = new TaskTrackerHealthStatus();
+ private NodeHealthStatus getUnhealthyNodeStatus(String error) {
+ NodeHealthStatus status = new NodeHealthStatus();
status.setNodeHealthy(false);
status.setLastReported(System.currentTimeMillis());
status.setHealthReport(error);
@@ -363,7 +363,7 @@ public class TestTaskTrackerBlacklisting
.getBlacklistedTrackerCount());
checkReasonForBlackListing(hosts[0], exceedsFailuresReasonSet);
- TaskTrackerHealthStatus status = getUnhealthyNodeStatus("ERROR");
+ NodeHealthStatus status = getUnhealthyNodeStatus("ERROR");
sendHeartBeat(status, false);
assertEquals("All trackers not blacklisted", 3,
jobTracker.getBlacklistedTrackerCount());
@@ -398,7 +398,7 @@ public class TestTaskTrackerBlacklisting
assertTrue("The blacklisted tracker nodes is not empty.",
blackListedTrackerInfo.isEmpty());
- TaskTrackerHealthStatus status = getUnhealthyNodeStatus(errorWithNewLines);
+ NodeHealthStatus status = getUnhealthyNodeStatus(errorWithNewLines);
// make all tracker unhealthy
sendHeartBeat(status, false);
assertEquals("All trackers not blacklisted", 3, jobTracker
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-api/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-api/pom.xml?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-api/pom.xml (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-api/pom.xml Mon Mar 28 04:28:10 2011
@@ -1,4 +1,10 @@
<?xml version="1.0"?><project>
+
+ <properties>
+ <yarn.version>1.0-SNAPSHOT</yarn.version>
+ </properties>
+
+
<parent>
<artifactId>yarn</artifactId>
<groupId>org.apache.hadoop</groupId>
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-common/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-common/pom.xml?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-common/pom.xml (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-common/pom.xml Mon Mar 28 04:28:10 2011
@@ -1,4 +1,9 @@
<?xml version="1.0"?><project>
+
+ <properties>
+ <yarn.version>1.0-SNAPSHOT</yarn.version>
+ </properties>
+
<parent>
<artifactId>yarn</artifactId>
<groupId>org.apache.hadoop</groupId>
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/pom.xml?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/pom.xml (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/pom.xml Mon Mar 28 04:28:10 2011
@@ -1,4 +1,9 @@
<?xml version="1.0"?><project>
+
+ <properties>
+ <yarn.version>1.0-SNAPSHOT</yarn.version>
+ </properties>
+
<parent>
<artifactId>yarn</artifactId>
<groupId>org.apache.hadoop</groupId>
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/pom.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/pom.xml?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/pom.xml (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/pom.xml Mon Mar 28 04:28:10 2011
@@ -8,6 +8,11 @@
<groupId>org.apache.hadoop</groupId>
<artifactId>yarn-server-common</artifactId>
<name>yarn-server-common</name>
+
+ <properties>
+ <yarn.version>1.0-SNAPSHOT</yarn.version>
+ </properties>
+
<version>${yarn.version}</version>
<url>http://maven.apache.org</url>
@@ -18,7 +23,7 @@
<version>${yarn.version}</version>
</dependency>
<dependency>
- <groupId>org.apache.hadoop</groupId>
+ <groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.3.1</version>
<scope>compile</scope>
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/avro/ResourceTracker.genavro
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/avro/ResourceTracker.genavro?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/avro/ResourceTracker.genavro (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/avro/ResourceTracker.genavro Mon Mar 28 04:28:10 2011
@@ -13,6 +13,9 @@ protocol ResourceTracker {
int responseId;
long lastSeen;
map<array<org.apache.hadoop.yarn.Container>> containers;
+ boolean isNodeHealthy;
+ union {string, null} healthReport;
+ long lastHealthReport;
}
record RegistrationResponse {
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/NodeHealthCheckerService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/NodeHealthCheckerService.java?rev=1086116&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/NodeHealthCheckerService.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/NodeHealthCheckerService.java Mon Mar 28 04:28:10 2011
@@ -0,0 +1,383 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.Shell.ExitCodeException;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.service.AbstractService;
+
+/**
+ *
+ * The class which provides functionality of checking the health of the node and
+ * reporting back to the service for which the health checker has been asked to
+ * report.
+ */
+public class NodeHealthCheckerService extends AbstractService {
+
+ private static Log LOG = LogFactory.getLog(NodeHealthCheckerService.class);
+
+ /** Absolute path to the health script. */
+ private String nodeHealthScript;
+ /** Delay after which node health script to be executed */
+ private long intervalTime;
+ /** Time after which the script should be timedout */
+ private long scriptTimeout;
+ /** Timer used to schedule node health monitoring script execution */
+ private Timer nodeHealthScriptScheduler;
+
+ /** ShellCommandExecutor used to execute monitoring script */
+ ShellCommandExecutor shexec = null;
+
+ /** Configuration used by the checker */
+ private Configuration conf;
+
+ /** Pattern used for searching in the output of the node health script */
+ static private final String ERROR_PATTERN = "ERROR";
+
+ /* Configuration keys */
+ public static final String HEALTH_CHECK_SCRIPT_PROPERTY =
+ "yarn.server.nodemanager.healthchecker.script.path";
+
+ public static final String HEALTH_CHECK_INTERVAL_PROPERTY =
+ "yarn.server.nodemanager.healthchecker.interval";
+
+ public static final String HEALTH_CHECK_FAILURE_INTERVAL_PROPERTY =
+ "yarn.server.nodemanager.healthchecker.script.timeout";
+
+ public static final String HEALTH_CHECK_SCRIPT_ARGUMENTS_PROPERTY =
+ "yarn.server.nodemanager.healthchecker.script.args";
+
+ /* end of configuration keys */
+ /** Time out error message */
+ static final String NODE_HEALTH_SCRIPT_TIMED_OUT_MSG = "Node health script timed out";
+
+ /** Default frequency of running node health script */
+ private static final long DEFAULT_HEALTH_CHECK_INTERVAL = 10 * 60 * 1000;
+ /** Default script time out period */
+ private static final long DEFAULT_HEALTH_SCRIPT_FAILURE_INTERVAL = 2 * DEFAULT_HEALTH_CHECK_INTERVAL;
+
+ private boolean isHealthy;
+
+ private String healthReport;
+
+ private long lastReportedTime;
+
+ private TimerTask timer;
+
+
+ private enum HealthCheckerExitStatus {
+ SUCCESS,
+ TIMED_OUT,
+ FAILED_WITH_EXIT_CODE,
+ FAILED_WITH_EXCEPTION,
+ FAILED
+ }
+
+
+ /**
+ * Class which is used by the {@link Timer} class to periodically execute the
+ * node health script.
+ *
+ */
+ private class NodeHealthMonitorExecutor extends TimerTask {
+
+ String exceptionStackTrace = "";
+
+ public NodeHealthMonitorExecutor(String[] args) {
+ ArrayList<String> execScript = new ArrayList<String>();
+ execScript.add(nodeHealthScript);
+ if (args != null) {
+ execScript.addAll(Arrays.asList(args));
+ }
+ shexec = new ShellCommandExecutor(execScript
+ .toArray(new String[execScript.size()]), null, null, scriptTimeout);
+ }
+
+ @Override
+ public void run() {
+ HealthCheckerExitStatus status = HealthCheckerExitStatus.SUCCESS;
+ try {
+ shexec.execute();
+ } catch (ExitCodeException e) {
+ // ignore the exit code of the script
+ status = HealthCheckerExitStatus.FAILED_WITH_EXIT_CODE;
+ } catch (Exception e) {
+ LOG.warn("Caught exception : " + e.getMessage());
+ if (!shexec.isTimedOut()) {
+ status = HealthCheckerExitStatus.FAILED_WITH_EXCEPTION;
+ } else {
+ status = HealthCheckerExitStatus.TIMED_OUT;
+ }
+ exceptionStackTrace = StringUtils.stringifyException(e);
+ } finally {
+ if (status == HealthCheckerExitStatus.SUCCESS) {
+ if (hasErrors(shexec.getOutput())) {
+ status = HealthCheckerExitStatus.FAILED;
+ }
+ }
+ reportHealthStatus(status);
+ }
+ }
+
+ /**
+ * Method which is used to parse output from the node health monitor and
+ * send to the report address.
+ *
+ * The timed out script or script which causes IOException output is
+ * ignored.
+ *
+ * The node is marked unhealthy if
+ * <ol>
+ * <li>The node health script times out</li>
+ * <li>The node health scripts output has a line which begins with ERROR</li>
+ * <li>An exception is thrown while executing the script</li>
+ * </ol>
+ * If the script throws {@link IOException} or {@link ExitCodeException} the
+ * output is ignored and node is left remaining healthy, as script might
+ * have syntax error.
+ *
+ * @param status
+ */
+ void reportHealthStatus(HealthCheckerExitStatus status) {
+ long now = System.currentTimeMillis();
+ switch (status) {
+ case SUCCESS:
+ setHealthStatus(true, "", now);
+ break;
+ case TIMED_OUT:
+ setHealthStatus(false, NODE_HEALTH_SCRIPT_TIMED_OUT_MSG);
+ break;
+ case FAILED_WITH_EXCEPTION:
+ setHealthStatus(false, exceptionStackTrace);
+ break;
+ case FAILED_WITH_EXIT_CODE:
+ setHealthStatus(true, "", now);
+ break;
+ case FAILED:
+ setHealthStatus(false, shexec.getOutput());
+ break;
+ }
+ }
+
+ /**
+ * Method to check if the output string has line which begins with ERROR.
+ *
+ * @param output
+ * string
+ * @return true if output string has error pattern in it.
+ */
+ private boolean hasErrors(String output) {
+ String[] splits = output.split("\n");
+ for (String split : splits) {
+ if (split.startsWith(ERROR_PATTERN)) {
+ return true;
+ }
+ }
+ return false;
+ }
+ }
+
+ public NodeHealthCheckerService() {
+ super(NodeHealthCheckerService.class.getName());
+ this.lastReportedTime = System.currentTimeMillis();
+ this.isHealthy = true;
+ this.healthReport = "";
+ }
+
+ public NodeHealthCheckerService(Configuration conf) {
+ this();
+ this.conf = conf;
+ init(conf);
+ }
+
+ /*
+ * Method which initializes the values for the script path and interval time.
+ */
+ @Override
+ public void init(Configuration conf) {
+ this.nodeHealthScript =
+ conf.get(HEALTH_CHECK_SCRIPT_PROPERTY);
+ this.intervalTime = conf.getLong(HEALTH_CHECK_INTERVAL_PROPERTY,
+ DEFAULT_HEALTH_CHECK_INTERVAL);
+ this.scriptTimeout = conf.getLong(
+ HEALTH_CHECK_FAILURE_INTERVAL_PROPERTY,
+ DEFAULT_HEALTH_SCRIPT_FAILURE_INTERVAL);
+ String[] args = conf.getStrings(HEALTH_CHECK_SCRIPT_ARGUMENTS_PROPERTY,
+ new String[] {});
+ timer = new NodeHealthMonitorExecutor(args);
+ }
+
+ /**
+ * Method used to start the Node health monitoring.
+ *
+ */
+ @Override
+ public void start() {
+ // if health script path is not configured don't start the thread.
+ if (!shouldRun(conf)) {
+ LOG.info("Not starting node health monitor");
+ return;
+ }
+ nodeHealthScriptScheduler = new Timer("NodeHealthMonitor-Timer", true);
+ // Start the timer task immediately and
+ // then periodically at interval time.
+ nodeHealthScriptScheduler.scheduleAtFixedRate(timer, 0, intervalTime);
+ }
+
+ /**
+ * Method used to terminate the node health monitoring service.
+ *
+ */
+ @Override
+ public void stop() {
+ if (!shouldRun(conf)) {
+ return;
+ }
+ nodeHealthScriptScheduler.cancel();
+ if (shexec != null) {
+ Process p = shexec.getProcess();
+ if (p != null) {
+ p.destroy();
+ }
+ }
+ }
+
+ /**
+ * Gets the if the node is healthy or not
+ *
+ * @return true if node is healthy
+ */
+ private boolean isHealthy() {
+ return isHealthy;
+ }
+
+ /**
+ * Sets if the node is healhty or not.
+ *
+ * @param isHealthy
+ * if or not node is healthy
+ */
+ private synchronized void setHealthy(boolean isHealthy) {
+ this.isHealthy = isHealthy;
+ }
+
+ /**
+ * Returns output from health script. if node is healthy then an empty string
+ * is returned.
+ *
+ * @return output from health script
+ */
+ private String getHealthReport() {
+ return healthReport;
+ }
+
+ /**
+ * Sets the health report from the node health script.
+ *
+ * @param healthReport
+ */
+ private synchronized void setHealthReport(String healthReport) {
+ this.healthReport = healthReport;
+ }
+
+ /**
+ * Returns time stamp when node health script was last run.
+ *
+ * @return timestamp when node health script was last run
+ */
+ private long getLastReportedTime() {
+ return lastReportedTime;
+ }
+
+ /**
+ * Sets the last run time of the node health script.
+ *
+ * @param lastReportedTime
+ */
+ private synchronized void setLastReportedTime(long lastReportedTime) {
+ this.lastReportedTime = lastReportedTime;
+ }
+
+ /**
+ * Method used to determine if or not node health monitoring service should be
+ * started or not. Returns true if following conditions are met:
+ *
+ * <ol>
+ * <li>Path to Node health check script is not empty</li>
+ * <li>Node health check script file exists</li>
+ * </ol>
+ *
+ * @param conf
+ * @return true if node health monitoring service can be started.
+ */
+ public static boolean shouldRun(Configuration conf) {
+ String nodeHealthScript =
+ conf.get(HEALTH_CHECK_SCRIPT_PROPERTY);
+ if (nodeHealthScript == null || nodeHealthScript.trim().isEmpty()) {
+ return false;
+ }
+ File f = new File(nodeHealthScript);
+ return f.exists() && f.canExecute();
+ }
+
+ private synchronized void setHealthStatus(boolean isHealthy, String output) {
+ this.setHealthy(isHealthy);
+ this.setHealthReport(output);
+ }
+
+ private synchronized void setHealthStatus(boolean isHealthy, String output,
+ long time) {
+ this.setHealthStatus(isHealthy, output);
+ this.setLastReportedTime(time);
+ }
+
+ /**
+ * Method to populate the fields for the {@link NodeHealthStatus}
+ *
+ * @param healthStatus
+ */
+ public synchronized void setHealthStatus(NodeHealthStatus healthStatus) {
+ healthStatus.setNodeHealthy(this.isHealthy());
+ healthStatus.setHealthReport(this.getHealthReport());
+ healthStatus.setLastReported(this.getLastReportedTime());
+ }
+
+ /**
+ * Test method to directly access the timer which node
+ * health checker would use.
+ *
+ *
+ * @return Timer task
+ */
+ //XXX:Not to be used directly.
+ TimerTask getTimer() {
+ return timer;
+ }
+}
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/NodeHealthStatus.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/NodeHealthStatus.java?rev=1086116&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/NodeHealthStatus.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/main/java/org/apache/hadoop/NodeHealthStatus.java Mon Mar 28 04:28:10 2011
@@ -0,0 +1,110 @@
+package org.apache.hadoop;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+
+/**
+ * Value class which encapsulates the node health related fields: whether the
+ * node is healthy, a free-form health report, and the time of the most recent
+ * health update. Serialized as a boolean, a {@link Text}-encoded string and a
+ * long, in that order.
+ */
+public class NodeHealthStatus implements Writable {
+
+  private boolean isNodeHealthy;
+
+  private String healthReport;
+
+  private long lastReported;
+
+  public NodeHealthStatus(boolean isNodeHealthy, String healthReport,
+      long lastReported) {
+    this.isNodeHealthy = isNodeHealthy;
+    this.healthReport = healthReport;
+    this.lastReported = lastReported;
+  }
+
+  /**
+   * Creates a status reporting a healthy node with an empty report, last
+   * updated now.
+   */
+  public NodeHealthStatus() {
+    this.isNodeHealthy = true;
+    this.healthReport = "";
+    this.lastReported = System.currentTimeMillis();
+  }
+
+  /**
+   * Sets whether or not the node is healthy, based on the
+   * output from the node health script.
+   *
+   * @param isNodeHealthy
+   */
+  public void setNodeHealthy(boolean isNodeHealthy) {
+    this.isNodeHealthy = isNodeHealthy;
+  }
+
+  /**
+   * Returns if node is healthy or not based on result from node health
+   * script.
+   *
+   * @return true if the node is healthy.
+   */
+  public boolean isNodeHealthy() {
+    return isNodeHealthy;
+  }
+
+  /**
+   * Sets the health report based on the output from the health script.
+   *
+   * @param healthReport
+   *          String listing cause of failure.
+   */
+  public void setHealthReport(String healthReport) {
+    this.healthReport = healthReport;
+  }
+
+  /**
+   * Returns the health report of the node, if any. The report is typically
+   * empty when the node is healthy.
+   *
+   * @return health report of the node if any
+   */
+  public String getHealthReport() {
+    return healthReport;
+  }
+
+  /**
+   * Sets when the node got its health information last
+   * from the node health monitoring service.
+   *
+   * @param lastReported last reported time by node
+   *          health script
+   */
+  public void setLastReported(long lastReported) {
+    this.lastReported = lastReported;
+  }
+
+  /**
+   * Gets time of most recent node health update.
+   *
+   * @return time stamp of most recent health update.
+   */
+  public long getLastReported() {
+    return lastReported;
+  }
+
+  @Override
+  public void readFields(DataInput in) throws IOException {
+    isNodeHealthy = in.readBoolean();
+    healthReport = Text.readString(in);
+    lastReported = in.readLong();
+  }
+
+  @Override
+  public void write(DataOutput out) throws IOException {
+    out.writeBoolean(isNodeHealthy);
+    // Text.writeString cannot encode null; serialize a missing report as an
+    // empty one so a status built with a null report can still be written.
+    Text.writeString(out, healthReport == null ? "" : healthReport);
+    out.writeLong(lastReported);
+  }
+
+}
\ No newline at end of file
Added: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/test/java/org/apache/hadoop/TestNodeHealthService.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/test/java/org/apache/hadoop/TestNodeHealthService.java?rev=1086116&view=auto
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/test/java/org/apache/hadoop/TestNodeHealthService.java (added)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-common/src/test/java/org/apache/hadoop/TestNodeHealthService.java Mon Mar 28 04:28:10 2011
@@ -0,0 +1,167 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.Path;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestNodeHealthService {
+
+  private static final Log LOG = LogFactory
+      .getLog(TestNodeHealthService.class);
+
+  protected static File testRootDir = new File("target",
+      TestNodeHealthService.class.getName() + "-localDir").getAbsoluteFile();
+
+  final static File nodeHealthConfigFile = new File(testRootDir,
+      "modified-mapred-site.xml");
+
+  private File nodeHealthscriptFile = new File(testRootDir,
+      "failingscript.sh");
+
+  @Before
+  public void setup() {
+    testRootDir.mkdirs();
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (testRootDir.exists()) {
+      FileContext.getLocalFSFileContext().delete(
+          new Path(testRootDir.getAbsolutePath()), true);
+    }
+  }
+
+  /** Builds a configuration pointing the checker at the test script. */
+  private Configuration getConfForNodeHealthScript() {
+    Configuration conf = new Configuration();
+    conf.set(NodeHealthCheckerService.HEALTH_CHECK_SCRIPT_PROPERTY,
+        nodeHealthscriptFile.getAbsolutePath());
+    conf.setLong(NodeHealthCheckerService.HEALTH_CHECK_INTERVAL_PROPERTY, 500);
+    conf.setLong(
+        NodeHealthCheckerService.HEALTH_CHECK_FAILURE_INTERVAL_PROPERTY, 1000);
+    return conf;
+  }
+
+  /** Writes {@code conf} as XML to the config file, always closing the stream. */
+  private void writeConfFile(Configuration conf) throws IOException {
+    FileOutputStream out = new FileOutputStream(nodeHealthConfigFile);
+    try {
+      conf.writeXml(out);
+    } finally {
+      out.close();
+    }
+  }
+
+  /** Writes the health script and sets its executable bit as requested. */
+  private void writeNodeHealthScriptFile(String scriptStr, boolean setExecutable)
+      throws IOException {
+    PrintWriter pw = new PrintWriter(new FileOutputStream(nodeHealthscriptFile));
+    try {
+      pw.println(scriptStr);
+      pw.flush();
+    } finally {
+      pw.close();
+    }
+    nodeHealthscriptFile.setExecutable(setExecutable);
+  }
+
+  @Test
+  public void testNodeHealthScriptShouldRun() throws IOException {
+    // Node health script should not start if there is no property called
+    // node health script path.
+    Assert.assertFalse("By default Health checker should not have started",
+        NodeHealthCheckerService.shouldRun(new Configuration()));
+    Configuration conf = getConfForNodeHealthScript();
+    // Node health script should not start if the node health script does not
+    // exist.
+    Assert.assertFalse("Node health script should not start",
+        NodeHealthCheckerService.shouldRun(conf));
+    // Create script path.
+    writeConfFile(conf);
+    conf.addResource(nodeHealthConfigFile.getName());
+    writeNodeHealthScriptFile("", false);
+    // Node health script should not start if the node health script is not
+    // executable.
+    Assert.assertFalse("Node health script should not start",
+        NodeHealthCheckerService.shouldRun(conf));
+    writeNodeHealthScriptFile("", true);
+    Assert.assertTrue("Node health script should start",
+        NodeHealthCheckerService.shouldRun(conf));
+  }
+
+  @Test
+  public void testNodeHealthScript() throws Exception {
+    NodeHealthStatus healthStatus = new NodeHealthStatus();
+    String errorScript = "echo ERROR\n echo \"Tracker not healthy\"";
+    String normalScript = "echo \"I am all fine\"";
+    String timeOutScript = "sleep 4\n echo\"I am fine\"";
+    Configuration conf = getConfForNodeHealthScript();
+    writeConfFile(conf);
+    conf.addResource(nodeHealthConfigFile.getName());
+
+    NodeHealthCheckerService nodeHealthChecker = new NodeHealthCheckerService(
+        conf);
+    TimerTask timer = nodeHealthChecker.getTimer();
+    writeNodeHealthScriptFile(normalScript, true);
+    timer.run();
+
+    nodeHealthChecker.setHealthStatus(healthStatus);
+    LOG.info("Checking initial healthy condition");
+    // Check proper report conditions.
+    Assert.assertTrue("Node health status reported unhealthy", healthStatus
+        .isNodeHealthy());
+    Assert.assertTrue("Node health status reported unhealthy", healthStatus
+        .getHealthReport().isEmpty());
+
+    // write out error file.
+    // Healthy to unhealthy transition
+    writeNodeHealthScriptFile(errorScript, true);
+    // Run timer
+    timer.run();
+    // update health status
+    nodeHealthChecker.setHealthStatus(healthStatus);
+    LOG.info("Checking Healthy--->Unhealthy");
+    Assert.assertFalse("Node health status reported healthy", healthStatus
+        .isNodeHealthy());
+    Assert.assertFalse("Node health status reported healthy", healthStatus
+        .getHealthReport().isEmpty());
+
+    // Check unhealthy to healthy transitions.
+    writeNodeHealthScriptFile(normalScript, true);
+    timer.run();
+    nodeHealthChecker.setHealthStatus(healthStatus);
+    LOG.info("Checking UnHealthy--->healthy");
+    // Check proper report conditions.
+    Assert.assertTrue("Node health status reported unhealthy", healthStatus
+        .isNodeHealthy());
+    Assert.assertTrue("Node health status reported unhealthy", healthStatus
+        .getHealthReport().isEmpty());
+
+    // Healthy to timeout transition.
+    writeNodeHealthScriptFile(timeOutScript, true);
+    timer.run();
+    nodeHealthChecker.setHealthStatus(healthStatus);
+    LOG.info("Checking Healthy--->timeout");
+    Assert.assertFalse("Node health status reported healthy even after timeout",
+        healthStatus.isNodeHealthy());
+    // JUnit's assertEquals takes (message, expected, actual).
+    Assert.assertEquals("Node time out message not propagated",
+        NodeHealthCheckerService.NODE_HEALTH_SCRIPT_TIMED_OUT_MSG,
+        healthStatus.getHealthReport());
+  }
+
+}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java Mon Mar 28 04:28:10 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.yarn.server.no
import java.util.concurrent.ConcurrentMap;
+import org.apache.hadoop.NodeHealthStatus;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
import org.apache.hadoop.yarn.ApplicationID;
@@ -34,4 +35,6 @@ public interface Context {
public ConcurrentMap<ApplicationID, Application> getApplications();
public ConcurrentMap<ContainerID, Container> getContainers();
+
+ public NodeHealthStatus getNodeHealthStatus();
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java Mon Mar 28 04:28:10 2011
@@ -27,6 +27,8 @@ import java.util.concurrent.ConcurrentHa
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import org.apache.hadoop.NodeHealthCheckerService;
+import org.apache.hadoop.NodeHealthStatus;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.ReflectionUtils;
@@ -60,11 +62,17 @@ public class NodeManager extends Composi
// NodeManager level dispatcher
Dispatcher dispatcher = new AsyncDispatcher();
+ NodeHealthCheckerService healthChecker = null;
+ if (NodeHealthCheckerService.shouldRun(conf)) {
+ healthChecker = new NodeHealthCheckerService();
+ addService(healthChecker);
+ }
+
// StatusUpdater should be added first so that it can start first. Once it
// contacts RM, does registration and gets tokens, then only
// ContainerManager can start.
NodeStatusUpdater nodeStatusUpdater =
- createNodeStatusUpdater(context, dispatcher);
+ createNodeStatusUpdater(context, dispatcher, healthChecker);
addService(nodeStatusUpdater);
NodeResourceMonitor nodeResourceMonitor = createNodeResourceMonitor();
@@ -81,8 +89,8 @@ public class NodeManager extends Composi
}
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
- Dispatcher dispatcher) {
- return new NodeStatusUpdaterImpl(context, dispatcher);
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+ return new NodeStatusUpdaterImpl(context, dispatcher, healthChecker);
}
protected NodeResourceMonitor createNodeResourceMonitor() {
@@ -146,6 +154,8 @@ public class NodeManager extends Composi
}
});
+ private final NodeHealthStatus nodeHealthStatus = new NodeHealthStatus();
+
public NMContext() {
}
@@ -159,6 +169,10 @@ public class NodeManager extends Composi
return this.containers;
}
+ @Override
+ public NodeHealthStatus getNodeHealthStatus() {
+ return this.nodeHealthStatus;
+ }
}
public static void main(String[] args) {
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java Mon Mar 28 04:28:10 2011
@@ -30,6 +30,8 @@ import org.apache.avro.AvroRuntimeExcept
import org.apache.avro.ipc.AvroRemoteException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.NodeHealthCheckerService;
+import org.apache.hadoop.NodeHealthStatus;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.net.NetUtils;
@@ -70,8 +72,12 @@ public class NodeStatusUpdaterImpl exten
private byte[] secretKeyBytes = new byte[0];
private boolean isStopped;
- public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher) {
+ private final NodeHealthCheckerService healthChecker;
+
+ public NodeStatusUpdaterImpl(Context context, Dispatcher dispatcher,
+ NodeHealthCheckerService healthChecker) {
super(NodeStatusUpdaterImpl.class.getName());
+ this.healthChecker = healthChecker;
this.context = context;
this.dispatcher = dispatcher;
}
@@ -196,6 +202,14 @@ public class NodeStatusUpdaterImpl exten
LOG.debug(this.nodeName + " sending out status for " + numActiveContainers
+ " containers");
+ if (this.healthChecker != null) {
+ NodeHealthStatus nodeHealthStatus = this.context.getNodeHealthStatus();
+ this.healthChecker.setHealthStatus(nodeHealthStatus);
+ status.isNodeHealthy = nodeHealthStatus.isNodeHealthy();
+ status.healthReport = nodeHealthStatus.getHealthReport();
+ status.lastHealthReport = nodeHealthStatus.getLastReported();
+ }
+
return status;
}
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestEventFlow.java Mon Mar 28 04:28:10 2011
@@ -24,6 +24,7 @@ import java.util.HashMap;
import org.apache.avro.ipc.AvroRemoteException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.NodeHealthCheckerService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.ApplicationID;
import org.apache.hadoop.yarn.ContainerID;
@@ -49,8 +50,9 @@ public class TestEventFlow {
ContainerExecutor exec = new DefaultContainerExecutor();
DeletionService del = new DeletionService(exec);
Dispatcher dispatcher = new AsyncDispatcher();
+ NodeHealthCheckerService healthChecker = null;
NodeStatusUpdater nodeStatusUpdater =
- new NodeStatusUpdaterImpl(context, dispatcher) {
+ new NodeStatusUpdaterImpl(context, dispatcher, healthChecker) {
@Override
protected org.apache.hadoop.yarn.ResourceTracker getRMClient() {
return new LocalRMInterface();
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeStatusUpdater.java Mon Mar 28 04:28:10 2011
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentMa
import org.apache.avro.ipc.AvroRemoteException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.NodeHealthCheckerService;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.ApplicationID;
@@ -159,8 +160,9 @@ public class TestNodeStatusUpdater {
private class MyNodeStatusUpdater extends NodeStatusUpdaterImpl {
private Context context;
- public MyNodeStatusUpdater(Context context, Dispatcher dispatcher) {
- super(context, dispatcher);
+ public MyNodeStatusUpdater(Context context, Dispatcher dispatcher,
+ NodeHealthCheckerService healthChecker) {
+ super(context, dispatcher, healthChecker);
this.context = context;
}
@@ -186,8 +188,8 @@ public class TestNodeStatusUpdater {
final NodeManager nm = new NodeManager() {
@Override
protected NodeStatusUpdater createNodeStatusUpdater(Context context,
- Dispatcher dispatcher) {
- return new MyNodeStatusUpdater(context, dispatcher);
+ Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+ return new MyNodeStatusUpdater(context, dispatcher, healthChecker);
}
};
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java Mon Mar 28 04:28:10 2011
@@ -33,6 +33,7 @@ import junit.framework.Assert;
import org.apache.avro.ipc.AvroRemoteException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.NodeHealthCheckerService;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
@@ -98,11 +99,11 @@ public class TestContainerManager {
private ContainerExecutor exec = new DefaultContainerExecutor();
private DeletionService delSrvc;
private Dispatcher dispatcher = new AsyncDispatcher();
-
+ private NodeHealthCheckerService healthChecker = null;
private String user = "nobody";
private NodeStatusUpdater nodeStatusUpdater = new NodeStatusUpdaterImpl(
- context, dispatcher) {
+ context, dispatcher, healthChecker) {
@Override
protected org.apache.hadoop.yarn.ResourceTracker getRMClient() {
return new LocalRMInterface();
Modified: hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java?rev=1086116&r1=1086115&r2=1086116&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java (original)
+++ hadoop/mapreduce/branches/MR-279/yarn/yarn-server/yarn-server-tests/src/test/java/org/apache/hadoop/yarn/server/MiniYARNCluster.java Mon Mar 28 04:28:10 2011
@@ -23,6 +23,7 @@ import java.io.IOException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.NodeHealthCheckerService;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.yarn.YarnException;
@@ -133,8 +134,10 @@ public class MiniYARNCluster extends Com
org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater
createNodeStatusUpdater(
org.apache.hadoop.yarn.server.nodemanager.Context context,
- Dispatcher dispatcher) {
- return new NodeStatusUpdaterImpl(context, dispatcher) {
+ Dispatcher dispatcher,
+ NodeHealthCheckerService healthChecker) {
+ return new NodeStatusUpdaterImpl(context, dispatcher,
+ healthChecker) {
@Override
protected org.apache.hadoop.yarn.ResourceTracker getRMClient() {
// For in-process communication without RPC