You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:25:17 UTC
svn commit: r1076950 - in
/hadoop/common/branches/branch-0.20-security-patches/src:
core/org/apache/hadoop/util/ docs/src/documentation/content/xdocs/ mapred/
mapred/org/apache/hadoop/mapred/ test/org/apache/hadoop/mapred/
test/org/apache/hadoop/util/ ...
Author: omalley
Date: Fri Mar 4 03:25:16 2011
New Revision: 1076950
URL: http://svn.apache.org/viewvc?rev=1076950&view=rev
Log:
commit 75b06ac97b21b4b70c720374f84fdd517cf3e1e6
Author: Lee Tucker <lt...@yahoo-inc.com>
Date: Thu Jul 30 17:40:38 2009 -0700
Applying patch 2855374.mr211.patch
Added:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/NodeHealthCheckerService.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeHealthService.java
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/Shell.java
hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/cluster_setup.xml
hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/util/TestShell.java
hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/machines.jsp
Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/Shell.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/Shell.java?rev=1076950&r1=1076949&r2=1076950&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/Shell.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/util/Shell.java Fri Mar 4 03:25:16 2011
@@ -22,6 +22,9 @@ import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -55,6 +58,11 @@ abstract public class Shell {
return new String[] {(WINDOWS ? "ls" : "/bin/ls"), "-ld"};
}
+ /** Time after which the executing script would be timed out */
+ protected long timeOutInterval = 0L;
+ /** Whether or not the script timed out */
+ private AtomicBoolean timedOut;
+
/**
* Get the Unix command for setting the maximum virtual memory available
* to a given child process. This is only relevant when we are forking a
@@ -98,6 +106,9 @@ abstract public class Shell {
private File dir;
private Process process; // sub process used to execute the command
private int exitCode;
+
+ /** Whether or not the script finished executing */
+ private volatile AtomicBoolean completed;
public Shell() {
this(0L);
@@ -137,7 +148,10 @@ abstract public class Shell {
/** Run a command */
private void runCommand() throws IOException {
ProcessBuilder builder = new ProcessBuilder(getExecString());
- boolean completed = false;
+ Timer timeOutTimer = null;
+ ShellTimeoutTimerTask timeoutTimerTask = null;
+ timedOut = new AtomicBoolean(false);
+ completed = new AtomicBoolean(false);
if (environment != null) {
builder.environment().putAll(this.environment);
@@ -147,6 +161,13 @@ abstract public class Shell {
}
process = builder.start();
+ if (timeOutInterval > 0) {
+ timeOutTimer = new Timer();
+ timeoutTimerTask = new ShellTimeoutTimerTask(
+ this);
+ //One time scheduling.
+ timeOutTimer.schedule(timeoutTimerTask, timeOutInterval);
+ }
final BufferedReader errReader =
new BufferedReader(new InputStreamReader(process
.getErrorStream()));
@@ -183,27 +204,32 @@ abstract public class Shell {
line = inReader.readLine();
}
// wait for the process to finish and check the exit code
- exitCode = process.waitFor();
+ exitCode = process.waitFor();
try {
// make sure that the error thread exits
errThread.join();
} catch (InterruptedException ie) {
LOG.warn("Interrupted while reading the error stream", ie);
}
- completed = true;
+ completed.set(true);
+ //the timeout thread handling
+ //taken care in finally block
if (exitCode != 0) {
throw new ExitCodeException(exitCode, errMsg.toString());
}
} catch (InterruptedException ie) {
throw new IOException(ie.toString());
} finally {
+ if ((timeOutTimer!=null) && !timedOut.get()) {
+ timeOutTimer.cancel();
+ }
// close the input stream
try {
inReader.close();
} catch (IOException ioe) {
LOG.warn("Error while closing the input stream", ioe);
}
- if (!completed) {
+ if (!completed.get()) {
errThread.interrupt();
}
try {
@@ -266,21 +292,47 @@ abstract public class Shell {
private String[] command;
private StringBuffer output;
+
public ShellCommandExecutor(String[] execString) {
- command = execString.clone();
+ this(execString, null);
}
-
+
public ShellCommandExecutor(String[] execString, File dir) {
- this(execString);
- this.setWorkingDirectory(dir);
+ this(execString, dir, null);
}
-
+
public ShellCommandExecutor(String[] execString, File dir,
Map<String, String> env) {
- this(execString, dir);
- this.setEnvironment(env);
+ this(execString, dir, env , 0L);
}
-
+
+ /**
+ * Create a new instance of the ShellCommandExecutor to execute a command.
+ *
+ * @param execString The command to execute with arguments
+ * @param dir If not-null, specifies the directory which should be set
+ * as the current working directory for the command.
+ * If null, the current working directory is not modified.
+ * @param env If not-null, environment of the command will include the
+ * key-value pairs specified in the map. If null, the current
+ * environment is not modified.
+ * @param timeout Specifies the time in milliseconds, after which the
+ * command will be killed and the status marked as timedout.
+ * If 0, the command will not be timed out.
+ */
+ public ShellCommandExecutor(String[] execString, File dir,
+ Map<String, String> env, long timeout) {
+ command = execString.clone();
+ if (dir != null) {
+ setWorkingDirectory(dir);
+ }
+ if (env != null) {
+ setEnvironment(env);
+ }
+ timeOutInterval = timeout;
+ }
+
+
/** Execute the shell command. */
public void execute() throws IOException {
this.run();
@@ -326,6 +378,24 @@ abstract public class Shell {
}
}
+ /**
+ * To check if the passed script to shell command executor timed out or
+ * not.
+ *
+ * @return if the script timed out.
+ */
+ public boolean isTimedOut() {
+ return timedOut.get();
+ }
+
+ /**
+ * Set if the command has timed out.
+ *
+ */
+ private void setTimedOut() {
+ this.timedOut.set(true);
+ }
+
/**
* Static method to execute a shell command.
* Covers most of the simple cases without requiring the user to implement
@@ -334,7 +404,7 @@ abstract public class Shell {
* @return the output of the executed command.
*/
public static String execCommand(String ... cmd) throws IOException {
- return execCommand(null, cmd);
+ return execCommand(null, cmd, 0L);
}
/**
@@ -343,15 +413,56 @@ abstract public class Shell {
* the <code>Shell</code> interface.
* @param env the map of environment key=value
* @param cmd shell command to execute.
+ * @param timeout time in milliseconds after which the script should be marked as timed out
+ * @return the output of the executed command.
+ */
+
+ public static String execCommand(Map<String, String> env, String[] cmd,
+ long timeout) throws IOException {
+ ShellCommandExecutor exec = new ShellCommandExecutor(cmd, null, env,
+ timeout);
+ exec.execute();
+ return exec.getOutput();
+ }
+
+ /**
+ * Static method to execute a shell command.
+ * Covers most of the simple cases without requiring the user to implement
+ * the <code>Shell</code> interface.
+ * @param env the map of environment key=value
+ * @param cmd shell command to execute.
* @return the output of the executed command.
*/
public static String execCommand(Map<String,String> env, String ... cmd)
throws IOException {
- ShellCommandExecutor exec = new ShellCommandExecutor(cmd);
- if (env != null) {
- exec.setEnvironment(env);
+ return execCommand(env, cmd, 0L);
+ }
+
+ /**
+ * Timer which is used to timeout scripts spawned off by shell.
+ */
+ private static class ShellTimeoutTimerTask extends TimerTask {
+
+ private Shell shell;
+
+ public ShellTimeoutTimerTask(Shell shell) {
+ this.shell = shell;
+ }
+
+ @Override
+ public void run() {
+ Process p = shell.getProcess();
+ try {
+ p.exitValue();
+ } catch (Exception e) {
+ //Process has not terminated.
+ //So check if it has completed
+ //if not just destroy it.
+ if (p != null && !shell.completed.get()) {
+ shell.setTimedOut();
+ p.destroy();
+ }
+ }
}
- exec.execute();
- return exec.getOutput();
}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/cluster_setup.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/cluster_setup.xml?rev=1076950&r1=1076949&r2=1076950&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/cluster_setup.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/cluster_setup.xml Fri Mar 4 03:25:16 2011
@@ -589,6 +589,63 @@
</section>
</section>
+ <section>
+ <title>Monitoring Health of TaskTracker Nodes</title>
+ <p>Hadoop Map/Reduce provides a mechanism by which administrators
+ can configure the TaskTracker to run an administrator supplied
+ script periodically to determine if a node is healthy or not.
+ Administrators can determine if the node is in a healthy state
+ by performing any checks of their choice in the script. If the
+ script detects the node to be in an unhealthy state, it must print
+ a line to standard output beginning with the string <em>ERROR</em>.
+ The TaskTracker spawns the script periodically and checks its
+ output. If the script's output contains the string <em>ERROR</em>,
+ as described above, the node's status is reported as 'unhealthy'
+ and the node is black-listed on the JobTracker. No further tasks
+ will be assigned to this node. However, the
+ TaskTracker continues to run the script, so that if the node
+ becomes healthy again, it will be removed from the blacklisted
+ nodes on the JobTracker automatically. The node's health
+ along with the output of the script, if it is unhealthy, is
+ available to the administrator in the JobTracker's web interface.
+ The time since the node was healthy is also displayed on the
+ web interface.
+ </p>
+
+ <section>
+ <title>Configuring the Node Health Check Script</title>
+ <p>The following parameters can be used to control the node health
+ monitoring script in <em>mapred-site.xml</em>.</p>
+ <table>
+ <tr><th>Name</th><th>Description</th></tr>
+ <tr><td><code>mapred.healthChecker.script.path</code></td>
+ <td>Absolute path to the script which is periodically run by the
+ TaskTracker to determine if the node is
+ healthy or not. The file should be executable by the TaskTracker.
+ If the value of this key is empty or the file does
+ not exist or is not executable, node health monitoring
+ is not started.</td>
+ </tr>
+ <tr>
+ <td><code>mapred.healthChecker.interval</code></td>
+ <td>Frequency at which the node health script is run,
+ in milliseconds</td>
+ </tr>
+ <tr>
+ <td><code>mapred.healthChecker.script.timeout</code></td>
+ <td>Time after which the node health script will be killed by
+ the TaskTracker if unresponsive.
+ The node is marked unhealthy if the node health script times out.</td>
+ </tr>
+ <tr>
+ <td><code>mapred.healthChecker.script.args</code></td>
+ <td>Extra arguments that can be passed to the node health script
+ when launched.
+ This should be a comma separated list of arguments. </td>
+ </tr>
+ </table>
+ </section>
+ </section>
</section>
<section>
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml?rev=1076950&r1=1076949&r2=1076950&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/mapred-default.xml Fri Mar 4 03:25:16 2011
@@ -864,4 +864,40 @@
</description>
</property>
+<!-- Node health script variables -->
+
+<property>
+ <name>mapred.healthChecker.script.path</name>
+ <value></value>
+ <description>Absolute path to the script which is
+ periodically run by the node health monitoring service to determine if
+ the node is healthy or not. If the value of this key is empty or the
+ file does not exist in the location configured here, the node health
+ monitoring service is not started.</description>
+</property>
+
+<property>
+ <name>mapred.healthChecker.interval</name>
+ <value>60000</value>
+ <description>Frequency of the node health script to be run,
+ in milliseconds</description>
+</property>
+
+<property>
+ <name>mapred.healthChecker.script.timeout</name>
+ <value>600000</value>
+ <description>Time after which the node health script will be killed if
+ unresponsive, and the script considered to have failed.</description>
+</property>
+
+<property>
+ <name>mapred.healthChecker.script.args</name>
+ <value></value>
+ <description>List of arguments which are to be passed to
+ node health script when it is being launched, comma separated.
+ </description>
+</property>
+
+<!-- end of node health script variables -->
+
</configuration>
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=1076950&r1=1076949&r2=1076950&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java Fri Mar 4 03:25:16 2011
@@ -62,8 +62,9 @@ interface InterTrackerProtocol extends V
* Version 24: Changed format of Task and TaskStatus for HADOOP-4759
* Version 25: JobIDs are passed in response to JobTracker restart
* Version 26: Added numRequiredSlots to TaskStatus for MAPREDUCE-516
+ * Version 27: Adding node health status to TaskStatus for MAPREDUCE-211
*/
- public static final long versionID = 26L;
+ public static final long versionID = 27L;
public final static int TRACKERS_OK = 0;
public final static int UNKNOWN_TASKTRACKER = 1;
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1076950&r1=1076949&r2=1076950&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar 4 03:25:16 2011
@@ -72,6 +72,7 @@ import org.apache.hadoop.mapred.JobHisto
import org.apache.hadoop.mapred.JobHistory.Values;
import org.apache.hadoop.mapred.JobInProgress.KillInterruptedException;
import org.apache.hadoop.mapred.JobStatusChangeEvent.EventType;
+import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
import org.apache.hadoop.net.DNSToSwitchMapping;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
@@ -484,18 +485,28 @@ public class JobTracker implements MRCon
}
}
+ enum ReasonForBlackListing {
+ EXCEEDING_FAILURES,
+ NODE_UNHEALTHY
+ }
+
// The FaultInfo which indicates the number of faults of a tracker
// and when the last fault occurred
// and whether the tracker is blacklisted across all jobs or not
private static class FaultInfo {
+ static final String FAULT_FORMAT_STRING = "%d failures on the tracker";
int numFaults = 0;
long lastUpdated;
boolean blacklisted;
-
+
+ private boolean isHealthy;
+ private HashMap<ReasonForBlackListing, String>rfbMap;
+
FaultInfo() {
numFaults = 0;
lastUpdated = System.currentTimeMillis();
blacklisted = false;
+ rfbMap = new HashMap<ReasonForBlackListing, String>();
}
void setFaultCount(int num) {
@@ -518,9 +529,47 @@ public class JobTracker implements MRCon
return blacklisted;
}
- void setBlacklist(boolean blacklist) {
- blacklisted = blacklist;
+ void setBlacklist(ReasonForBlackListing rfb,
+ String trackerFaultReport) {
+ blacklisted = true;
+ this.rfbMap.put(rfb, trackerFaultReport);
+ }
+
+ public void setHealthy(boolean isHealthy) {
+ this.isHealthy = isHealthy;
+ }
+
+ public boolean isHealthy() {
+ return isHealthy;
+ }
+
+ public String getTrackerFaultReport() {
+ StringBuffer sb = new StringBuffer();
+ for(String reasons : rfbMap.values()) {
+ sb.append(reasons);
+ sb.append("\n");
+ }
+ return sb.toString();
+ }
+
+ Set<ReasonForBlackListing> getReasonforblacklisting() {
+ return this.rfbMap.keySet();
+ }
+
+ public void unBlacklist() {
+ this.blacklisted = false;
+ this.rfbMap.clear();
+ }
+
+ public boolean removeBlackListedReason(ReasonForBlackListing rfb) {
+ String str = rfbMap.remove(rfb);
+ return str!=null;
+ }
+
+ public void addBlackListedReason(ReasonForBlackListing rfb, String reason) {
+ this.rfbMap.put(rfb, reason);
}
+
}
private class FaultyTrackersInfo {
@@ -543,26 +592,82 @@ public class JobTracker implements MRCon
*/
void incrementFaults(String hostName) {
synchronized (potentiallyFaultyTrackers) {
- FaultInfo fi = potentiallyFaultyTrackers.get(hostName);
- if (fi == null) {
- fi = new FaultInfo();
- potentiallyFaultyTrackers.put(hostName, fi);
- }
+ FaultInfo fi = getFaultInfo(hostName, true);
int numFaults = fi.getFaultCount();
++numFaults;
fi.setFaultCount(numFaults);
fi.setLastUpdated(System.currentTimeMillis());
- if (!fi.isBlacklisted()) {
- if (shouldBlacklist(hostName, numFaults)) {
- LOG.info("Adding " + hostName + " to the blacklist" +
- " across all jobs");
- removeHostCapacity(hostName);
- fi.setBlacklist(true);
- }
+ if (exceedsFaults(fi)) {
+ LOG.info("Adding " + hostName + " to the blacklist"
+ + " across all jobs");
+ String reason = String.format(FaultInfo.FAULT_FORMAT_STRING,
+ numFaults);
+ blackListTracker(hostName, reason,
+ ReasonForBlackListing.EXCEEDING_FAILURES);
}
}
}
+ private void blackListTracker(String hostName, String reason, ReasonForBlackListing rfb) {
+ FaultInfo fi = getFaultInfo(hostName, true);
+ boolean blackListed = fi.isBlacklisted();
+ if(blackListed) {
+ if(fi.getReasonforblacklisting().contains(rfb)) {
+ return;
+ } else {
+ LOG.info("Adding blacklisted reason for tracker : " + hostName
+ + " Reason for blacklisting is : " + rfb);
+ fi.addBlackListedReason(rfb, reason);
+ }
+ return;
+ } else {
+ LOG.info("Blacklisting tracker : " + hostName
+ + " Reason for blacklisting is : " + rfb);
+ removeHostCapacity(hostName);
+ fi.setBlacklist(rfb, reason);
+ }
+ }
+
+ private boolean canUnBlackListTracker(String hostName,
+ ReasonForBlackListing rfb) {
+ FaultInfo fi = getFaultInfo(hostName, false);
+ if(fi == null) {
+ return false;
+ }
+
+ Set<ReasonForBlackListing> rfbSet = fi.getReasonforblacklisting();
+ return fi.isBlacklisted() && rfbSet.contains(rfb);
+ }
+
+ private void unBlackListTracker(String hostName,
+ ReasonForBlackListing rfb) {
+ // check if you can black list the tracker then call this methods
+ FaultInfo fi = getFaultInfo(hostName, false);
+ if(fi.removeBlackListedReason(rfb)) {
+ if(fi.getReasonforblacklisting().isEmpty()) {
+ addHostCapacity(hostName);
+ LOG.info("Unblacklisting tracker : " + hostName);
+ fi.unBlacklist();
+ //We have unBlackListed tracker, so tracker should
+ //definitely be healthy. Check fault count if fault count
+ //is zero don't keep it memory.
+ if(fi.numFaults == 0) {
+ potentiallyFaultyTrackers.remove(hostName);
+ }
+ }
+ }
+ }
+
+ private FaultInfo getFaultInfo(String hostName,
+ boolean createIfNeccessary) {
+ FaultInfo fi = potentiallyFaultyTrackers.get(hostName);
+ if (fi == null && createIfNeccessary) {
+ fi = new FaultInfo();
+ potentiallyFaultyTrackers.put(hostName, fi);
+ }
+ return fi;
+ }
+
/**
* Blacklists the tracker across all jobs if
* <ol>
@@ -570,9 +675,11 @@ public class JobTracker implements MRCon
* MAX_BLACKLISTS_PER_TRACKER (configurable) blacklists</li>
* <li>#faults is 50% (configurable) above the average #faults</li>
* <li>50% the cluster is not blacklisted yet </li>
+ * </ol>
*/
- private boolean shouldBlacklist(String hostName, int numFaults) {
- if (numFaults >= MAX_BLACKLISTS_PER_TRACKER) {
+ private boolean exceedsFaults(FaultInfo fi) {
+ int faultCount = fi.getFaultCount();
+ if (faultCount >= MAX_BLACKLISTS_PER_TRACKER) {
// calculate avgBlackLists
long clusterSize = getClusterStatus().getTaskTrackers();
long sum = 0;
@@ -582,7 +689,7 @@ public class JobTracker implements MRCon
double avg = (double) sum / clusterSize;
long totalCluster = clusterSize + numBlacklistedTrackers;
- if ((numFaults - avg) > (AVERAGE_BLACKLIST_THRESHOLD * avg) &&
+ if ((faultCount - avg) > (AVERAGE_BLACKLIST_THRESHOLD * avg) &&
numBlacklistedTrackers < (totalCluster * MAX_BLACKLIST_PERCENT)) {
return true;
}
@@ -625,16 +732,12 @@ public class JobTracker implements MRCon
if (fi != null &&
(now - fi.getLastUpdated()) > UPDATE_FAULTY_TRACKER_INTERVAL) {
int numFaults = fi.getFaultCount() - 1;
- if (fi.isBlacklisted()) {
- LOG.info("Removing " + hostName + " from blacklist");
- addHostCapacity(hostName);
- fi.setBlacklist(false);
- }
- if (numFaults > 0) {
- fi.setFaultCount(numFaults);
- fi.setLastUpdated(now);
- } else {
- potentiallyFaultyTrackers.remove(hostName);
+ fi.setFaultCount(numFaults);
+ fi.setLastUpdated(now);
+ if (canUnBlackListTracker(hostName,
+ ReasonForBlackListing.EXCEEDING_FAILURES)) {
+ unBlackListTracker(hostName,
+ ReasonForBlackListing.EXCEEDING_FAILURES);
}
}
return (fi != null && fi.isBlacklisted());
@@ -703,6 +806,41 @@ public class JobTracker implements MRCon
}
return 0;
}
+
+ Set<ReasonForBlackListing> getReasonForBlackListing(String hostName) {
+ synchronized (potentiallyFaultyTrackers) {
+ FaultInfo fi = null;
+ if ((fi = potentiallyFaultyTrackers.get(hostName)) != null) {
+ return fi.getReasonforblacklisting();
+ }
+ }
+ return null;
+ }
+
+
+ void setNodeHealthStatus(String hostName, boolean isHealthy, String reason) {
+ FaultInfo fi = null;
+ // If tracker is not healthy, create a fault info object
+ // blacklist it.
+ if (!isHealthy) {
+ fi = getFaultInfo(hostName, true);
+ fi.setHealthy(isHealthy);
+ synchronized (potentiallyFaultyTrackers) {
+ blackListTracker(hostName, reason,
+ ReasonForBlackListing.NODE_UNHEALTHY);
+ }
+ } else {
+ fi = getFaultInfo(hostName, false);
+ if (fi == null) {
+ return;
+ } else {
+ if (canUnBlackListTracker(hostName,
+ ReasonForBlackListing.NODE_UNHEALTHY)) {
+ unBlackListTracker(hostName, ReasonForBlackListing.NODE_UNHEALTHY);
+ }
+ }
+ }
+ }
}
/**
@@ -2728,7 +2866,7 @@ public class JobTracker implements MRCon
// Initialize the response to be sent for the heartbeat
HeartbeatResponse response = new HeartbeatResponse(newResponseId, null);
List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
-
+ isBlacklisted = faultyTrackers.isBlacklisted(status.getHost());
// Check for new tasks to be executed on the tasktracker
if (recoveryManager.shouldSchedule() && acceptNewTasks && !isBlacklisted) {
TaskTrackerStatus taskTrackerStatus = getTaskTrackerStatus(trackerName) ;
@@ -2930,6 +3068,15 @@ public class JobTracker implements MRCon
getInstrumentation().setReduceSlots(totalReduceTaskCapacity);
return oldStatus != null;
}
+
+
+ private void updateNodeHealthStatus(TaskTrackerStatus trackerStatus) {
+ TaskTrackerHealthStatus status = trackerStatus.getHealthStatus();
+ synchronized (faultyTrackers) {
+ faultyTrackers.setNodeHealthStatus(trackerStatus.getHost(),
+ status.isNodeHealthy(), status.getHealthReport());
+ }
+ }
/**
* Process incoming heartbeat messages from the task trackers.
@@ -2971,6 +3118,7 @@ public class JobTracker implements MRCon
}
updateTaskStatuses(trackerStatus);
+ updateNodeHealthStatus(trackerStatus);
return true;
}
@@ -4222,4 +4370,25 @@ public class JobTracker implements MRCon
UserGroupInformation.getCurrentUGI().getUserName());
this.queueManager.refreshAcls(new Configuration(this.conf));
}
+
+ String getReasonsForBlacklisting(String host) {
+ FaultInfo fi = faultyTrackers.getFaultInfo(host, false);
+ if (fi == null) {
+ return "";
+ }
+ return fi.getTrackerFaultReport();
+ }
+
+ /** Test Methods */
+ Set<ReasonForBlackListing> getReasonForBlackList(String host) {
+ FaultInfo fi = faultyTrackers.getFaultInfo(host, false);
+ if (fi == null) {
+ return new HashSet<ReasonForBlackListing>();
+ }
+ return fi.getReasonforblacklisting();
+ }
+
+ void incrementFaults(String hostName) {
+ faultyTrackers.incrementFaults(hostName);
+ }
}
Added: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/NodeHealthCheckerService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/NodeHealthCheckerService.java?rev=1076950&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/NodeHealthCheckerService.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/NodeHealthCheckerService.java Fri Mar 4 03:25:16 2011
@@ -0,0 +1,360 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Timer;
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
+import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.util.Shell.ExitCodeException;
+import org.apache.hadoop.util.Shell.ShellCommandExecutor;
+
+/**
+ *
+ * The class which provides functionality of checking the health of the node and
+ * reporting back to the service for which the health checker has been asked to
+ * report.
+ */
+class NodeHealthCheckerService {
+
+ private static Log LOG = LogFactory.getLog(NodeHealthCheckerService.class);
+
+ /** Absolute path to the health script. */
+ private String nodeHealthScript;
+ /** Delay after which node health script to be executed */
+ private long intervalTime;
+ /** Time after which the script should be timedout */
+ private long scriptTimeout;
+ /** Timer used to schedule node health monitoring script execution */
+ private Timer nodeHealthScriptScheduler;
+
+ /** ShellCommandExecutor used to execute monitoring script */
+ ShellCommandExecutor shexec = null;
+
+ /** Configuration used by the checker */
+ private Configuration conf;
+
+ /** Pattern used for searching in the output of the node health script */
+ static private final String ERROR_PATTERN = "ERROR";
+
+ /* Configuration keys */
+ static final String HEALTH_CHECK_SCRIPT_PROPERTY = "mapred.healthChecker.script.path";
+
+ static final String HEALTH_CHECK_INTERVAL_PROPERTY = "mapred.healthChecker.interval";
+
+ static final String HEALTH_CHECK_FAILURE_INTERVAL_PROPERTY = "mapred.healthChecker.script.timeout";
+
+ static final String HEALTH_CHECK_SCRIPT_ARGUMENTS_PROPERTY = "mapred.healthChecker.script.args";
+ /* end of configuration keys */
+
+ /** Default frequency of running node health script */
+ private static final long DEFAULT_HEALTH_CHECK_INTERVAL = 10 * 60 * 1000;
+ /** Default script time out period */
+ private static final long DEFAULT_HEALTH_SCRIPT_FAILURE_INTERVAL = 2 * DEFAULT_HEALTH_CHECK_INTERVAL;
+
+ private boolean isHealthy;
+
+ private String healthReport;
+
+ private long lastReportedTime;
+
+ private TimerTask timer;
+
+ private enum HealthCheckerExitStatus {
+ SUCCESS,
+ TIMED_OUT,
+ FAILED_WITH_EXIT_CODE,
+ FAILED_WITH_EXCEPTION,
+ FAILED
+ }
+
+
+ /**
+ * Class which is used by the {@link Timer} class to periodically execute the
+ * node health script.
+ *
+ */
+ private class NodeHealthMonitorExecutor extends TimerTask {
+
+ String exceptionStackTrace = "";
+
+ public NodeHealthMonitorExecutor(String[] args) {
+ ArrayList<String> execScript = new ArrayList<String>();
+ execScript.add(nodeHealthScript);
+ if (args != null) {
+ execScript.addAll(Arrays.asList(args));
+ }
+ shexec = new ShellCommandExecutor((String[]) execScript
+ .toArray(new String[execScript.size()]), null, null, scriptTimeout);
+ }
+
+ @Override
+ public void run() {
+ HealthCheckerExitStatus status = HealthCheckerExitStatus.SUCCESS;
+ try {
+ shexec.execute();
+ } catch (ExitCodeException e) {
+ // ignore the exit code of the script
+ status = HealthCheckerExitStatus.FAILED_WITH_EXIT_CODE;
+ } catch (Exception e) {
+ LOG.warn("Caught exception : " + e.getMessage());
+ status = HealthCheckerExitStatus.FAILED_WITH_EXCEPTION;
+ exceptionStackTrace = StringUtils.stringifyException(e);
+ } finally {
+ if (status == HealthCheckerExitStatus.SUCCESS) {
+ if (hasErrors(shexec.getOutput())) {
+ status = HealthCheckerExitStatus.FAILED;
+ }
+ }
+ reportHealthStatus(status);
+ }
+ }
+
+ /**
+ * Method which is used to parse output from the node health monitor and
+ * send to the report address.
+ *
+ * The output of a script that timed out or caused an IOException is
+ * ignored.
+ *
+ * The node is marked unhealthy if
+ * <ol>
+ * <li>The node health script times out</li>
+ * <li>The node health scripts output has a line which begins with ERROR</li>
+ * <li>An exception is thrown while executing the script</li>
+ * </ol>
+ * If the script throws {@link IOException} or {@link ExitCodeException} the
+ * output is ignored and the node is left in its healthy state, as the
+ * script might have a syntax error.
+ *
+ * @param status
+ */
+ void reportHealthStatus(HealthCheckerExitStatus status) {
+ long now = System.currentTimeMillis();
+ switch (status) {
+ case SUCCESS:
+ setHealthStatus(true, "", now);
+ break;
+ case TIMED_OUT:
+ setHealthStatus(false, "Node health script timed out");
+ break;
+ case FAILED_WITH_EXCEPTION:
+ setHealthStatus(false, exceptionStackTrace);
+ break;
+ case FAILED_WITH_EXIT_CODE:
+ setHealthStatus(true, "", now);
+ break;
+ case FAILED:
+ setHealthStatus(false, shexec.getOutput());
+ break;
+ }
+ }
+
+ /**
+ * Method to check if the output string has line which begins with ERROR.
+ *
+ * @param output
+ * string
+ * @return true if output string has error pattern in it.
+ */
+ private boolean hasErrors(String output) {
+ String[] splits = output.split("\n");
+ for (String split : splits) {
+ if (split.startsWith(ERROR_PATTERN)) {
+ return true;
+ }
+ }
+ return false;
+ }
+ }
+
+ public NodeHealthCheckerService(Configuration conf) {
+ this.conf = conf;
+ this.lastReportedTime = System.currentTimeMillis();
+ this.isHealthy = true;
+ this.healthReport = "";
+ initialize(conf);
+ }
+
+ /*
+ * Method which initializes the values for the script path and interval time.
+ */
+ private void initialize(Configuration conf) {
+ this.nodeHealthScript = conf.get(HEALTH_CHECK_SCRIPT_PROPERTY);
+ this.intervalTime = conf.getLong(HEALTH_CHECK_INTERVAL_PROPERTY,
+ DEFAULT_HEALTH_CHECK_INTERVAL);
+ this.scriptTimeout = conf.getLong(HEALTH_CHECK_FAILURE_INTERVAL_PROPERTY,
+ DEFAULT_HEALTH_SCRIPT_FAILURE_INTERVAL);
+ String[] args = conf.getStrings(HEALTH_CHECK_SCRIPT_ARGUMENTS_PROPERTY,
+ new String[] {});
+ timer = new NodeHealthMonitorExecutor(args);
+ }
+
+ /**
+ * Method used to start the Node health monitoring.
+ *
+ */
+ void start() {
+ // if health script path is not configured don't start the thread.
+ if (!shouldRun(conf)) {
+ LOG.info("Not starting node health monitor");
+ return;
+ }
+ nodeHealthScriptScheduler = new Timer("NodeHealthMonitor-Timer", true);
+ // Start the timer task immediately and
+ // then periodically at interval time.
+ nodeHealthScriptScheduler.scheduleAtFixedRate(timer, 0, intervalTime);
+ }
+
+ /**
+ * Method used to terminate the node health monitoring service.
+ *
+ */
+ void stop() {
+ if (!shouldRun(conf)) {
+ return;
+ }
+ nodeHealthScriptScheduler.cancel();
+ if (shexec != null) {
+ Process p = shexec.getProcess();
+ if (p != null) {
+ p.destroy();
+ }
+ }
+ }
+
+ /**
+ * Gets whether the node is healthy or not.
+ *
+ * @return true if node is healthy
+ */
+ private boolean isHealthy() {
+ return isHealthy;
+ }
+
+ /**
+ * Sets whether the node is healthy or not.
+ *
+ * @param isHealthy
+ * if or not node is healthy
+ */
+ private synchronized void setHealthy(boolean isHealthy) {
+ this.isHealthy = isHealthy;
+ }
+
+ /**
+ * Returns output from the health script. If the node is healthy then an empty string
+ * is returned.
+ *
+ * @return output from health script
+ */
+ private String getHealthReport() {
+ return healthReport;
+ }
+
+ /**
+ * Sets the health report from the node health script.
+ *
+ * @param healthReport
+ */
+ private synchronized void setHealthReport(String healthReport) {
+ this.healthReport = healthReport;
+ }
+
+ /**
+ * Returns time stamp when node health script was last run.
+ *
+ * @return timestamp when node health script was last run
+ */
+ private long getLastReportedTime() {
+ return lastReportedTime;
+ }
+
+ /**
+ * Sets the last run time of the node health script.
+ *
+ * @param lastReportedTime
+ */
+ private synchronized void setLastReportedTime(long lastReportedTime) {
+ this.lastReportedTime = lastReportedTime;
+ }
+
+ /**
+ * Method used to determine whether the node health monitoring service should
+ * be started. Returns true if the following conditions are met:
+ *
+ * <ol>
+ * <li>Path to Node health check script is not empty</li>
+ * <li>Node health check script file exists</li>
+ * </ol>
+ *
+ * @param conf
+ * @return true if node health monitoring service can be started.
+ */
+ static boolean shouldRun(Configuration conf) {
+ String nodeHealthScript = conf.get(HEALTH_CHECK_SCRIPT_PROPERTY);
+ if (nodeHealthScript == null || nodeHealthScript.trim().isEmpty()) {
+ return false;
+ }
+ File f = new File(nodeHealthScript);
+ return f.exists() && f.canExecute();
+ }
+
+ private synchronized void setHealthStatus(boolean isHealthy, String output) {
+ this.setHealthy(isHealthy);
+ this.setHealthReport(output);
+ }
+
+ private synchronized void setHealthStatus(boolean isHealthy, String output,
+ long time) {
+ this.setHealthStatus(isHealthy, output);
+ this.setLastReportedTime(time);
+ }
+
+ /**
+ * Method to populate the fields for the {@link TaskTrackerHealthStatus}
+ *
+ * @param healthStatus
+ */
+ synchronized void setHealthStatus(TaskTrackerHealthStatus healthStatus) {
+ healthStatus.setNodeHealthy(this.isHealthy());
+ healthStatus.setHealthReport(this.getHealthReport());
+ healthStatus.setLastReported(this.getLastReportedTime());
+ }
+
+ /**
+ * Test method to directly access the timer which node
+ * health checker would use.
+ *
+ *
+ * @return Timer task
+ */
+ //XXX:Not to be used directly.
+ TimerTask getTimer() {
+ return timer;
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1076950&r1=1076949&r2=1076950&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar 4 03:25:16 2011
@@ -66,6 +66,7 @@ import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.ipc.RemoteException;
import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.mapred.TaskStatus.Phase;
+import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
import org.apache.hadoop.mapred.pipes.Submitter;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.metrics.MetricsContext;
@@ -237,6 +238,11 @@ public class TaskTracker
*/
private TaskController taskController;
+ /**
+ * Handle to the specific instance of the {@link NodeHealthCheckerService}
+ */
+ private NodeHealthCheckerService healthChecker;
+
/*
* A list of commitTaskActions for whom commit response has been received
*/
@@ -554,6 +560,11 @@ public class TaskTracker
//setup and create jobcache directory with appropriate permissions
taskController.setup();
+
+ //Start up node health checker service.
+ if (shouldStartHealthMonitor(this.fConf)) {
+ startHealthMonitor(this.fConf);
+ }
}
public static Class<? extends TaskTrackerInstrumentation> getInstrumentationClass(
@@ -913,6 +924,11 @@ public class TaskTracker
taskReportServer.stop();
taskReportServer = null;
}
+ if (healthChecker != null) {
+ //stop node health checker service
+ healthChecker.stop();
+ healthChecker = null;
+ }
}
/**
@@ -1231,7 +1247,18 @@ public class TaskTracker
status.getResourceStatus().setReduceSlotMemorySizeOnTT(
reduceSlotSizeMemoryOnTT);
}
-
+ //add node health information
+
+ TaskTrackerHealthStatus healthStatus = status.getHealthStatus();
+ synchronized (this) {
+ if (healthChecker != null) {
+ healthChecker.setHealthStatus(healthStatus);
+ } else {
+ healthStatus.setNodeHealthy(true);
+ healthStatus.setLastReported(0L);
+ healthStatus.setHealthReport("");
+ }
+ }
//
// Xmit the heartbeat
//
@@ -3199,4 +3226,24 @@ public class TaskTracker
}
}
}
+
+ /**
+ * Wrapper method used by TaskTracker to check if {@link NodeHealthCheckerService}
+ * can be started
+ * @param conf configuration used to check if service can be started
+ * @return true if service can be started
+ */
+ private boolean shouldStartHealthMonitor(Configuration conf) {
+ return NodeHealthCheckerService.shouldRun(conf);
+ }
+
+ /**
+ * Wrapper method used to start {@link NodeHealthCheckerService} for
+ * Task Tracker
+ * @param conf Configuration used by the service.
+ */
+ private void startHealthMonitor(Configuration conf) {
+ healthChecker = new NodeHealthCheckerService(conf);
+ healthChecker.start();
+ }
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=1076950&r1=1076949&r2=1076950&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Fri Mar 4 03:25:16 2011
@@ -53,6 +53,7 @@ public class TaskTrackerStatus implement
volatile long lastSeen;
private int maxMapTasks;
private int maxReduceTasks;
+ private TaskTrackerHealthStatus healthStatus;
/**
* Class representing a collection of resources on this tasktracker.
@@ -196,6 +197,7 @@ public class TaskTrackerStatus implement
public TaskTrackerStatus() {
taskReports = new ArrayList<TaskStatus>();
resStatus = new ResourceStatus();
+ this.healthStatus = new TaskTrackerHealthStatus();
}
/**
@@ -213,6 +215,7 @@ public class TaskTrackerStatus implement
this.maxMapTasks = maxMapTasks;
this.maxReduceTasks = maxReduceTasks;
this.resStatus = new ResourceStatus();
+ this.healthStatus = new TaskTrackerHealthStatus();
}
/**
@@ -377,13 +380,128 @@ public class TaskTrackerStatus implement
ResourceStatus getResourceStatus() {
return resStatus;
}
+
+ /**
+ * Returns health status of the task tracker.
+ * @return health status of Task Tracker
+ */
+ public TaskTrackerHealthStatus getHealthStatus() {
+ return healthStatus;
+ }
+
+ /**
+ * Static class which encapsulates the node health
+ * related fields: whether the node is healthy, the
+ * health report string, and the time at which the
+ * status was last reported.
+ *
+ * Implements {@link Writable} so it can be serialized
+ * as part of {@link TaskTrackerStatus} when trackers
+ * report in to the job tracker.
+ */
+ static class TaskTrackerHealthStatus implements Writable {
+
+ private boolean isNodeHealthy;
+
+ private String healthReport;
+
+ private long lastReported;
+
+ public TaskTrackerHealthStatus(boolean isNodeHealthy, String healthReport,
+ long lastReported) {
+ this.isNodeHealthy = isNodeHealthy;
+ this.healthReport = healthReport;
+ this.lastReported = lastReported;
+ }
+
+ public TaskTrackerHealthStatus() {
+ this.isNodeHealthy = true;
+ this.healthReport = "";
+ this.lastReported = System.currentTimeMillis();
+ }
+
+ /**
+ * Sets whether or not a task tracker is healthy, based on the
+ * output from the node health script.
+ *
+ * @param isNodeHealthy
+ */
+ void setNodeHealthy(boolean isNodeHealthy) {
+ this.isNodeHealthy = isNodeHealthy;
+ }
+
+ /**
+ * Returns whether the node is healthy, based on the result of the node health
+ * script.
+ *
+ * @return true if the node is healthy.
+ */
+ boolean isNodeHealthy() {
+ return isNodeHealthy;
+ }
+
+ /**
+ * Sets the health report based on the output from the health script.
+ *
+ * @param healthReport
+ * String listing cause of failure.
+ */
+ void setHealthReport(String healthReport) {
+ this.healthReport = healthReport;
+ }
+
+ /**
+ * Returns the health report of the node if any, The health report is
+ * only populated when the node is not healthy.
+ *
+ * @return health report of the node if any
+ */
+ String getHealthReport() {
+ return healthReport;
+ }
+
+ /**
+ * Sets when the TT got its health information last
+ * from node health monitoring service.
+ *
+ * @param lastReported last reported time by node
+ * health script
+ */
+ public void setLastReported(long lastReported) {
+ this.lastReported = lastReported;
+ }
+
+ /**
+ * Gets time of most recent node health update.
+ *
+ * @return time stamp of most recent health update.
+ */
+ public long getLastReported() {
+ return lastReported;
+ }
+
+ @Override
+ public void readFields(DataInput in) throws IOException {
+ isNodeHealthy = in.readBoolean();
+ healthReport = Text.readString(in);
+ lastReported = in.readLong();
+ }
+
+ @Override
+ public void write(DataOutput out) throws IOException {
+ out.writeBoolean(isNodeHealthy);
+ Text.writeString(out, healthReport);
+ out.writeLong(lastReported);
+ }
+
+ }
///////////////////////////////////////////
// Writable
///////////////////////////////////////////
public void write(DataOutput out) throws IOException {
- UTF8.writeString(out, trackerName);
- UTF8.writeString(out, host);
+ Text.writeString(out, trackerName);
+ Text.writeString(out, host);
out.writeInt(httpPort);
out.writeInt(failures);
out.writeInt(maxMapTasks);
@@ -394,11 +512,12 @@ public class TaskTrackerStatus implement
for (TaskStatus taskStatus : taskReports) {
TaskStatus.writeTaskStatus(out, taskStatus);
}
+ getHealthStatus().write(out);
}
public void readFields(DataInput in) throws IOException {
- this.trackerName = UTF8.readString(in);
- this.host = UTF8.readString(in);
+ this.trackerName = Text.readString(in);
+ this.host = Text.readString(in);
this.httpPort = in.readInt();
this.failures = in.readInt();
this.maxMapTasks = in.readInt();
@@ -410,5 +529,6 @@ public class TaskTrackerStatus implement
for (int i = 0; i < numTasks; i++) {
taskReports.add(TaskStatus.readTaskStatus(in));
}
+ getHealthStatus().readFields(in);
}
}
Added: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeHealthService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeHealthService.java?rev=1076950&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeHealthService.java (added)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestNodeHealthService.java Fri Mar 4 03:25:16 2011
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.util.TimerTask;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskTrackerStatus.TaskTrackerHealthStatus;
+
+import junit.framework.TestCase;
+
+public class TestNodeHealthService extends TestCase {
+
+ private static volatile Log LOG = LogFactory
+ .getLog(TestNodeHealthService.class);
+
+ private static final String nodeHealthConfigPath = System.getProperty(
+ "test.build.extraconf", "build/test/extraconf");
+
+ final static File nodeHealthConfigFile = new File(nodeHealthConfigPath,
+ "mapred-site.xml");
+
+ private String testRootDir = new File(System.getProperty("test.build.data",
+ "/tmp")).getAbsolutePath();
+
+ private File nodeHealthscriptFile = new File(testRootDir, "failingscript.sh");
+
+ @Override
+ protected void tearDown() throws Exception {
+ if (nodeHealthConfigFile.exists()) {
+ nodeHealthConfigFile.delete();
+ }
+ if (nodeHealthscriptFile.exists()) {
+ nodeHealthscriptFile.delete();
+ }
+ super.tearDown();
+ }
+
+ private Configuration getConfForNodeHealthScript() {
+ Configuration conf = new Configuration();
+ conf.set(NodeHealthCheckerService.HEALTH_CHECK_SCRIPT_PROPERTY,
+ nodeHealthscriptFile.getAbsolutePath());
+ conf.setLong(NodeHealthCheckerService.HEALTH_CHECK_INTERVAL_PROPERTY, 500);
+ conf.setLong(
+ NodeHealthCheckerService.HEALTH_CHECK_FAILURE_INTERVAL_PROPERTY, 1000);
+ return conf;
+ }
+
+ private void writeNodeHealthScriptFile(String scriptStr, boolean setExecutable)
+ throws IOException {
+ PrintWriter pw = new PrintWriter(new FileOutputStream(nodeHealthscriptFile));
+ pw.println(scriptStr);
+ pw.flush();
+ pw.close();
+ nodeHealthscriptFile.setExecutable(setExecutable);
+ }
+
+ public void testNodeHealthScriptShouldRun() throws IOException {
+ // Node health script should not start if there is no property called
+ // node health script path.
+ assertFalse("Health checker should not have started",
+ NodeHealthCheckerService.shouldRun(new Configuration()));
+ Configuration conf = getConfForNodeHealthScript();
+ // Node health script should not start if the node health script does not
+ // exist
+ assertFalse("Node health script should start", NodeHealthCheckerService
+ .shouldRun(conf));
+ // Create script path.
+ conf.writeXml(new FileOutputStream(nodeHealthConfigFile));
+ writeNodeHealthScriptFile("", false);
+ // Node health script should not start if the node health script is not
+ // executable.
+ assertFalse("Node health script should start", NodeHealthCheckerService
+ .shouldRun(conf));
+ writeNodeHealthScriptFile("", true);
+ assertTrue("Node health script should start", NodeHealthCheckerService
+ .shouldRun(conf));
+ }
+
+ public void testNodeHealthScript() throws Exception {
+ TaskTrackerHealthStatus healthStatus = new TaskTrackerHealthStatus();
+ String errorScript = "echo ERROR\n echo \"Tracker not healthy\"";
+ String normalScript = "echo \"I am all fine\"";
+ String timeOutScript = "sleep 4\n echo\"I am fine\"";
+ Configuration conf = getConfForNodeHealthScript();
+ conf.writeXml(new FileOutputStream(nodeHealthConfigFile));
+
+ NodeHealthCheckerService nodeHealthChecker = new NodeHealthCheckerService(
+ conf);
+ TimerTask timer = nodeHealthChecker.getTimer();
+ writeNodeHealthScriptFile(normalScript, true);
+ timer.run();
+
+ nodeHealthChecker.setHealthStatus(healthStatus);
+ LOG.info("Checking initial healthy condition");
+ // Check proper report conditions.
+ assertTrue("Node health status reported unhealthy", healthStatus
+ .isNodeHealthy());
+ assertTrue("Node health status reported unhealthy", healthStatus
+ .getHealthReport().isEmpty());
+
+ // write out error file.
+ // Healthy to unhealthy transition
+ writeNodeHealthScriptFile(errorScript, true);
+ // Run timer
+ timer.run();
+ // update health status
+ nodeHealthChecker.setHealthStatus(healthStatus);
+ LOG.info("Checking Healthy--->Unhealthy");
+ assertFalse("Node health status reported healthy", healthStatus
+ .isNodeHealthy());
+ assertFalse("Node health status reported healthy", healthStatus
+ .getHealthReport().isEmpty());
+
+ // Check unhealthy to healthy transitions.
+ writeNodeHealthScriptFile(normalScript, true);
+ timer.run();
+ nodeHealthChecker.setHealthStatus(healthStatus);
+ LOG.info("Checking UnHealthy--->healthy");
+ // Check proper report conditions.
+ assertTrue("Node health status reported unhealthy", healthStatus
+ .isNodeHealthy());
+ assertTrue("Node health status reported unhealthy", healthStatus
+ .getHealthReport().isEmpty());
+
+ // Healthy to timeout transition.
+ writeNodeHealthScriptFile(timeOutScript, true);
+ timer.run();
+ nodeHealthChecker.setHealthStatus(healthStatus);
+ LOG.info("Checking Healthy--->timeout");
+ assertFalse("Node health status reported healthy even after timeout",
+ healthStatus.isNodeHealthy());
+ assertFalse("Node health status reported healthy even after timeout",
+ healthStatus.getHealthReport().isEmpty());
+ }
+
+}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/util/TestShell.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/util/TestShell.java?rev=1076950&r1=1076949&r2=1076950&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/util/TestShell.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/util/TestShell.java Fri Mar 4 03:25:16 2011
@@ -20,7 +20,10 @@ package org.apache.hadoop.util;
import junit.framework.TestCase;
import java.io.BufferedReader;
+import java.io.File;
+import java.io.FileOutputStream;
import java.io.IOException;
+import java.io.PrintWriter;
public class TestShell extends TestCase {
@@ -71,6 +74,27 @@ public class TestShell extends TestCase
assertInString(command, " .. ");
assertInString(command, "\"arg 2\"");
}
+
+ public void testShellCommandTimeout() throws Throwable {
+ String rootDir = new File(System.getProperty(
+ "test.build.data", "/tmp")).getAbsolutePath();
+ File shellFile = new File(rootDir, "timeout.sh");
+ String timeoutCommand = "sleep 4; echo \"hello\"";
+ PrintWriter writer = new PrintWriter(new FileOutputStream(shellFile));
+ writer.println(timeoutCommand);
+ writer.close();
+ shellFile.setExecutable(true);
+ Shell.ShellCommandExecutor shexc
+ = new Shell.ShellCommandExecutor(new String[]{shellFile.getAbsolutePath()},
+ null, null, 100);
+ try {
+ shexc.execute();
+ } catch (Exception e) {
+ // An exception is thrown when the command times out.
+ }
+ shellFile.delete();
+ assertTrue("Script didnt not timeout" , shexc.isTimedOut());
+ }
private void testInterval(long interval) throws IOException {
Command command = new Command(interval);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/machines.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/machines.jsp?rev=1076950&r1=1076949&r2=1076950&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/machines.jsp (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/machines.jsp Fri Mar 4 03:25:16 2011
@@ -29,23 +29,43 @@
out.println("<h2>Task Trackers</h2>");
c = tracker.taskTrackers();
}
+ int noCols = 9;
+ if(type.equals("blacklisted")) {
+ noCols = 10;
+ }
if (c.size() == 0) {
out.print("There are currently no known " + type + " Task Trackers.");
} else {
out.print("<center>\n");
out.print("<table border=\"2\" cellpadding=\"5\" cellspacing=\"2\">\n");
- out.print("<tr><td align=\"center\" colspan=\"6\"><b>Task Trackers</b></td></tr>\n");
+ out.print("<tr><td align=\"center\" colspan=\""+ noCols +"\"><b>Task Trackers</b></td></tr>\n");
out.print("<tr><td><b>Name</b></td><td><b>Host</b></td>" +
"<td><b># running tasks</b></td>" +
"<td><b>Max Map Tasks</b></td>" +
"<td><b>Max Reduce Tasks</b></td>" +
"<td><b>Failures</b></td>" +
- "<td><b>Seconds since heartbeat</b></td></tr>\n");
+ "<td><b>Node Health Status</b></td>" +
+ "<td><b>Seconds Since Node Last Healthy</b></td>");
+ if(type.equals("blacklisted")) {
+ out.print("<td><b>Reason For blacklisting</b></td>");
+ }
+ out.print("<td><b>Seconds since heartbeat</b></td></tr>\n");
+
int maxFailures = 0;
String failureKing = null;
for (Iterator it = c.iterator(); it.hasNext(); ) {
TaskTrackerStatus tt = (TaskTrackerStatus) it.next();
long sinceHeartbeat = System.currentTimeMillis() - tt.getLastSeen();
+ boolean isHealthy = tt.getHealthStatus().isNodeHealthy();
+ long sinceHealthCheck = tt.getHealthStatus().getLastReported();
+ String healthString = "";
+ if(sinceHealthCheck == 0) {
+ healthString = "N/A";
+ } else {
+ healthString = (isHealthy?"Healthy":"Unhealthy");
+ sinceHealthCheck = System.currentTimeMillis() - sinceHealthCheck;
+ sinceHealthCheck = sinceHealthCheck/1000;
+ }
if (sinceHeartbeat > 0) {
sinceHeartbeat = sinceHeartbeat / 1000;
}
@@ -65,8 +85,13 @@
out.print(tt.getHost() + "</td><td>" + numCurTasks +
"</td><td>" + tt.getMaxMapSlots() +
"</td><td>" + tt.getMaxReduceSlots() +
- "</td><td>" + numFailures +
- "</td><td>" + sinceHeartbeat + "</td></tr>\n");
+ "</td><td>" + numFailures +
+ "</td><td>" + healthString +
+ "</td><td>" + sinceHealthCheck);
+ if(type.equals("blacklisted")) {
+ out.print("</td><td>" + tracker.getReasonsForBlacklisting(tt.getHost()));
+ }
+ out.print("</td><td>" + sinceHeartbeat + "</td></tr>\n");
}
out.print("</table>\n");
out.print("</center>\n");