You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/12/05 12:04:23 UTC
svn commit: r723710 - in /hadoop/core/trunk: ./ conf/
src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
src/webapps/job/
Author: ddas
Date: Fri Dec 5 03:04:17 2008
New Revision: 723710
URL: http://svn.apache.org/viewvc?rev=723710&view=rev
Log:
HADOOP-4305. Improves the blacklisting strategy so that tasktrackers blacklisted across jobs are not given tasks to run from other jobs. A TaskTracker is blacklisted across jobs only when all of the following conditions are met: 1) The TaskTracker has been blacklisted by at least 4 jobs (configurable) 2) The TaskTracker has been blacklisted 50% more times than the average (configurable) 3) The cluster has less than 50% trackers blacklisted. Once in 24 hours, a TaskTracker blacklisted for all jobs is given a chance. Restarting the TaskTracker moves it out of the blacklist. Contributed by Amareshwari Sriramadasu.
Added:
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTrackerBlacklistAcrossJobs.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/conf/hadoop-default.xml
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
hadoop/core/trunk/src/webapps/job/jobtracker.jsp
hadoop/core/trunk/src/webapps/job/machines.jsp
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=723710&r1=723709&r2=723710&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Dec 5 03:04:17 2008
@@ -176,6 +176,17 @@
HADOOP-4747. Speed up FsShell::ls by removing redundant calls to the
filesystem. (David Phillips via cdouglas)
+ HADOOP-4305. Improves the blacklisting strategy so that tasktrackers
+ blacklisted across jobs are not given tasks to run from other jobs. A
+ tracker is blacklisted only when all of the following conditions are met:
+ 1) The TaskTracker has been blacklisted by at least 4 jobs (configurable)
+ 2) The TaskTracker has been blacklisted 50% more times than
+ the average (configurable)
+ 3) The cluster has less than 50% trackers blacklisted
+ Once in 24 hours, a TaskTracker blacklisted for all jobs is given a chance.
+ Restarting the TaskTracker moves it out of the blacklist.
+ (Amareshwari Sriramadasu via ddas)
+
OPTIMIZATIONS
HADOOP-3293. Fixes FileInputFormat to do provide locations for splits
Modified: hadoop/core/trunk/conf/hadoop-default.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/conf/hadoop-default.xml?rev=723710&r1=723709&r2=723710&view=diff
==============================================================================
--- hadoop/core/trunk/conf/hadoop-default.xml (original)
+++ hadoop/core/trunk/conf/hadoop-default.xml Fri Dec 5 03:04:17 2008
@@ -1296,6 +1296,17 @@
</property>
<property>
+ <name>mapred.max.tracker.blacklists</name>
+ <value>4</value>
+ <description>The number of blacklists for a taskTracker by various jobs
+ after which the task tracker could be blacklisted across
+ all jobs. The tracker will be given tasks again later
+ (after a day). The tracker will become a healthy
+ tracker after a restart.
+ </description>
+</property>
+
+<property>
<name>mapred.max.tracker.failures</name>
<value>4</value>
<description>The number of task-failures on a tasktracker of a given job
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java?rev=723710&r1=723709&r2=723710&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/ClusterStatus.java Fri Dec 5 03:04:17 2008
@@ -52,6 +52,7 @@
public class ClusterStatus implements Writable {
private int task_trackers;
+ private int blacklisted_trackers;
private int map_tasks;
private int reduce_tasks;
private int max_map_tasks;
@@ -73,7 +74,24 @@
*/
ClusterStatus(int trackers, int maps, int reduces, int maxMaps,
int maxReduces, JobTracker.State state) {
+ this(trackers, 0, maps, reduces, maxMaps, maxReduces, state);
+ }
+
+ /**
+ * Construct a new cluster status.
+ *
+ * @param trackers no. of tasktrackers in the cluster
+ * @param blacklists no. of blacklisted task trackers in the cluster
+ * @param maps no. of currently running map-tasks in the cluster
+ * @param reduces no. of currently running reduce-tasks in the cluster
+ * @param maxMaps the maximum no. of map tasks in the cluster
+ * @param maxReduces the maximum no. of reduce tasks in the cluster
+ * @param state the {@link JobTracker.State} of the <code>JobTracker</code>
+ */
+ ClusterStatus(int trackers, int blacklists, int maps, int reduces,
+ int maxMaps, int maxReduces, JobTracker.State state) {
task_trackers = trackers;
+ blacklisted_trackers = blacklists;
map_tasks = maps;
reduce_tasks = reduces;
max_map_tasks = maxMaps;
@@ -82,7 +100,6 @@
used_memory = Runtime.getRuntime().totalMemory();
max_memory = Runtime.getRuntime().maxMemory();
}
-
/**
* Get the number of task trackers in the cluster.
@@ -94,6 +111,15 @@
}
/**
+ * Get the number of blacklisted task trackers in the cluster.
+ *
+ * @return the number of blacklisted task trackers in the cluster.
+ */
+ public int getBlacklistedTrackers() {
+ return blacklisted_trackers;
+ }
+
+ /**
* Get the number of currently running map tasks in the cluster.
*
* @return the number of currently running map tasks in the cluster.
@@ -159,6 +185,7 @@
public void write(DataOutput out) throws IOException {
out.writeInt(task_trackers);
+ out.writeInt(blacklisted_trackers);
out.writeInt(map_tasks);
out.writeInt(reduce_tasks);
out.writeInt(max_map_tasks);
@@ -170,6 +197,7 @@
public void readFields(DataInput in) throws IOException {
task_trackers = in.readInt();
+ blacklisted_trackers = in.readInt();
map_tasks = in.readInt();
reduce_tasks = in.readInt();
max_map_tasks = in.readInt();
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=723710&r1=723709&r2=723710&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java Fri Dec 5 03:04:17 2008
@@ -55,8 +55,10 @@
* Version 21: Changed information reported in TaskTrackerStatus'
* ResourceStatus and the corresponding accessor methods
* (HADOOP-4035)
+ * Version 22: Replaced parameter 'initialContact' with 'restarted'
+ * in heartbeat method (HADOOP-4305)
*/
- public static final long versionID = 21L;
+ public static final long versionID = 22L;
public final static int TRACKERS_OK = 0;
public final static int UNKNOWN_TASKTRACKER = 1;
@@ -73,8 +75,8 @@
* it recieved from the {@link JobTracker}
*
* @param status the status update
- * @param initialContact <code>true</code> if this is first interaction since
- * 'refresh', <code>false</code> otherwise.
+ * @param restarted <code>true</code> if the process has just started or
+ * restarted, <code>false</code> otherwise
* @param acceptNewTasks <code>true</code> if the {@link TaskTracker} is
* ready to accept new tasks to run.
* @param responseId the last responseId successfully acted upon by the
@@ -83,7 +85,9 @@
* fresh instructions.
*/
HeartbeatResponse heartbeat(TaskTrackerStatus status,
- boolean initialContact, boolean acceptNewTasks, short responseId)
+ boolean restarted,
+ boolean acceptNewTasks,
+ short responseId)
throws IOException;
/**
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=723710&r1=723709&r2=723710&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobHistory.java Fri Dec 5 03:04:17 2008
@@ -1701,8 +1701,8 @@
}
String taskTrackerName =
- JobInProgress.convertTrackerNameToHostName(
- attempt.get(Keys.TRACKER_NAME)).substring("tracker_".length());
+ JobInProgress.convertTrackerNameToHostName(
+ attempt.get(Keys.TRACKER_NAME));
return TaskLogServlet.getTaskLogUrl(taskTrackerName, attempt
.get(Keys.HTTP_PORT), attempt.get(Keys.TASK_ATTEMPT_ID));
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=723710&r1=723709&r2=723710&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Dec 5 03:04:17 2008
@@ -1203,7 +1203,7 @@
String trackerHostName = (indexOfColon == -1) ?
trackerName :
trackerName.substring(0, indexOfColon);
- return trackerHostName;
+ return trackerHostName.substring("tracker_".length());
}
/**
@@ -1238,6 +1238,21 @@
}
/**
+ * Get the black listed trackers for the job
+ *
+ * @return List of blacklisted tracker names
+ */
+ List<String> getBlackListedTrackers() {
+ List<String> blackListedTrackers = new ArrayList<String>();
+ for (Map.Entry<String,Integer> e : trackerToFailuresMap.entrySet()) {
+ if (e.getValue().intValue() >= conf.getMaxTaskFailuresPerTracker()) {
+ blackListedTrackers.add(e.getKey());
+ }
+ }
+ return blackListedTrackers;
+ }
+
+ /**
* Get the no. of 'flaky' tasktrackers for a given job.
*
* @return the no. of 'flaky' tasktrackers for a given job.
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java?rev=723710&r1=723709&r2=723710&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobSubmissionProtocol.java Fri Dec 5 03:04:17 2008
@@ -51,8 +51,10 @@
* setupProgress to JobStatus as part of HADOOP-4261
* Version 17: getClusterStatus returns the amount of memory used by
* the server. HADOOP-4435
+ * Version 18: Added blacklisted trackers to the ClusterStatus
+ * for HADOOP-4305
*/
- public static final long versionID = 17L;
+ public static final long versionID = 18L;
/**
* Allocate a name for the job.
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=723710&r1=723709&r2=723710&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Dec 5 03:04:17 2008
@@ -83,6 +83,19 @@
static long TASKTRACKER_EXPIRY_INTERVAL = 10 * 60 * 1000;
static long RETIRE_JOB_INTERVAL;
static long RETIRE_JOB_CHECK_INTERVAL;
+ // The interval after which one fault of a tracker will be discarded,
+ // if there are no new faults during this interval.
+ private static long UPDATE_FAULTY_TRACKER_INTERVAL = 24 * 60 * 60 * 1000;
+ // The maximum percentage of trackers in cluster added
+ // to the 'blacklist' across all the jobs.
+ private static double MAX_BLACKLIST_PERCENT = 0.50;
+ // A tracker is blacklisted across jobs only if its number of
+ // blacklists is more than X% above the average number of blacklists.
+ // X is the blacklist threshold here.
+ private double AVERAGE_BLACKLIST_THRESHOLD = 0.50;
+ // The maximum number of blacklists for a tracker after which the
+ // tracker could be blacklisted across all jobs
+ private int MAX_BLACKLISTS_PER_TRACKER = 4;
public static enum State { INITIALIZING, RUNNING }
State state = State.INITIALIZING;
private static final int SYSTEM_DIR_CLEANUP_RETRY_PERIOD = 10000;
@@ -313,6 +326,11 @@
if (now - newProfile.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL) {
// Remove completely after marking the tasks as 'KILLED'
lostTaskTracker(leastRecent.getTrackerName());
+ // tracker is lost, and if it is blacklisted, remove
+ // it from the count of blacklisted trackers in the cluster
+ if (isBlacklisted(trackerName)) {
+ faultyTrackers.numBlacklistedTrackers -= 1;
+ }
updateTaskTrackerStatus(trackerName, null);
} else {
// Update time by inserting latest profile
@@ -402,8 +420,236 @@
}
}
}
+
+ // The FaultInfo which indicates the number of faults of a tracker
+ // and when the last fault occurred
+ // and whether the tracker is blacklisted across all jobs or not
+ private static class FaultInfo {
+ int numFaults = 0;
+ long lastUpdated;
+ boolean blacklisted;
-
+ FaultInfo() {
+ numFaults = 0;
+ lastUpdated = System.currentTimeMillis();
+ blacklisted = false;
+ }
+
+ void setFaultCount(int num) {
+ numFaults = num;
+ }
+
+ void setLastUpdated(long timeStamp) {
+ lastUpdated = timeStamp;
+ }
+
+ int getFaultCount() {
+ return numFaults;
+ }
+
+ long getLastUpdated() {
+ return lastUpdated;
+ }
+
+ boolean isBlacklisted() {
+ return blacklisted;
+ }
+
+ void setBlacklist(boolean blacklist) {
+ blacklisted = blacklist;
+ }
+ }
+
+ private class FaultyTrackersInfo {
+ // A map from hostName to its faults
+ private Map<String, FaultInfo> potentiallyFaultyTrackers =
+ new HashMap<String, FaultInfo>();
+ // This count gives the number of blacklisted trackers in the cluster
+ // at any time. This is maintained to avoid iteration over
+ // the potentiallyFaultyTrackers to get blacklisted trackers. And also
+ // this count doesn't include blacklisted trackers which are lost,
+ // although the fault info is maintained for lost trackers.
+ private volatile int numBlacklistedTrackers = 0;
+
+ /**
+ * Increments faults(blacklist by job) for the tracker by one.
+ *
+ * Adds the tracker to the potentially faulty list.
+ *
+ * @param hostName
+ */
+ void incrementFaults(String hostName) {
+ synchronized (potentiallyFaultyTrackers) {
+ FaultInfo fi = potentiallyFaultyTrackers.get(hostName);
+ if (fi == null) {
+ fi = new FaultInfo();
+ potentiallyFaultyTrackers.put(hostName, fi);
+ }
+ int numFaults = fi.getFaultCount();
+ ++numFaults;
+ fi.setFaultCount(numFaults);
+ fi.setLastUpdated(System.currentTimeMillis());
+ if (!fi.isBlacklisted()) {
+ if (shouldBlacklist(hostName, numFaults)) {
+ LOG.info("Adding " + hostName + " to the blacklist" +
+ " across all jobs");
+ removeHostCapacity(hostName);
+ fi.setBlacklist(true);
+ }
+ }
+ }
+ }
+
+ /**
+ * Blacklists the tracker across all jobs if
+ * <ol>
+ * <li>#faults is at least
+ * MAX_BLACKLISTS_PER_TRACKER (configurable) blacklists</li>
+ * <li>#faults is more than 50% (configurable) above the average #faults</li>
+ * <li>less than 50% of the cluster is blacklisted already</li></ol>
+ */
+ private boolean shouldBlacklist(String hostName, int numFaults) {
+ if (numFaults >= MAX_BLACKLISTS_PER_TRACKER) {
+ // calculate avgBlackLists
+ long clusterSize = getClusterStatus().getTaskTrackers();
+ long sum = 0;
+ for (FaultInfo f : potentiallyFaultyTrackers.values()) {
+ sum += f.getFaultCount();
+ }
+ double avg = (double) sum / clusterSize;
+
+ long totalCluster = clusterSize + numBlacklistedTrackers;
+ if ((numFaults - avg) > (AVERAGE_BLACKLIST_THRESHOLD * avg) &&
+ numBlacklistedTrackers < (totalCluster * MAX_BLACKLIST_PERCENT)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ /**
+ * Removes the tracker from blacklist and
+ * from potentially faulty list, when it is restarted.
+ *
+ * @param hostName
+ */
+ void markTrackerHealthy(String hostName) {
+ synchronized (potentiallyFaultyTrackers) {
+ FaultInfo fi = potentiallyFaultyTrackers.remove(hostName);
+ if (fi != null && fi.isBlacklisted()) {
+ LOG.info("Removing " + hostName + " from blacklist");
+ addHostCapacity(hostName);
+ }
+ }
+ }
+
+ /**
+ * Check whether the host is blacklisted, i.e. must not be assigned tasks.
+ *
+ * One fault of the tracker is discarded if there
+ * are no faults during one day. So, the tracker will get a
+ * chance again to run tasks of a job.
+ *
+ * @param hostName The host name of the tracker
+ * @return true if the tracker is blacklisted (no tasks should be
+ * assigned to it), false otherwise
+ */
+ boolean shouldAssignTasksToTracker(String hostName) {
+ synchronized (potentiallyFaultyTrackers) {
+ FaultInfo fi = potentiallyFaultyTrackers.get(hostName);
+ long now = System.currentTimeMillis();
+ if (fi != null &&
+ (now - fi.getLastUpdated()) > UPDATE_FAULTY_TRACKER_INTERVAL) {
+ int numFaults = fi.getFaultCount() - 1;
+ if (fi.isBlacklisted()) {
+ LOG.info("Removing " + hostName + " from blacklist");
+ addHostCapacity(hostName);
+ fi.setBlacklist(false);
+ }
+ if (numFaults > 0) {
+ fi.setFaultCount(numFaults);
+ fi.setLastUpdated(now);
+ } else {
+ potentiallyFaultyTrackers.remove(hostName);
+ }
+ }
+ return (fi != null && fi.isBlacklisted());
+ }
+ }
+
+ private void removeHostCapacity(String hostName) {
+ synchronized (taskTrackers) {
+ // remove the capacity of trackers on this host
+ for (TaskTrackerStatus status : getStatusesOnHost(hostName)) {
+ totalMapTaskCapacity -= status.getMaxMapTasks();
+ totalReduceTaskCapacity -= status.getMaxReduceTasks();
+ }
+ numBlacklistedTrackers +=
+ uniqueHostsMap.remove(hostName);
+ }
+ }
+
+ // This is called on tracker's restart or after a day of blacklist.
+ private void addHostCapacity(String hostName) {
+ synchronized (taskTrackers) {
+ int numTrackersOnHost = 0;
+ // add the capacity of trackers on the host
+ for (TaskTrackerStatus status : getStatusesOnHost(hostName)) {
+ totalMapTaskCapacity += status.getMaxMapTasks();
+ totalReduceTaskCapacity += status.getMaxReduceTasks();
+ numTrackersOnHost++;
+ }
+ uniqueHostsMap.put(hostName,
+ numTrackersOnHost);
+ numBlacklistedTrackers -= numTrackersOnHost;
+ }
+ }
+
+ /**
+ * Whether a host is blacklisted across all the jobs.
+ *
+ * @param hostName the host to look up
+ * @return true if the host is blacklisted, false otherwise
+ */
+ boolean isBlacklisted(String hostName) {
+ synchronized (potentiallyFaultyTrackers) {
+ FaultInfo fi = null;
+ if ((fi = potentiallyFaultyTrackers.get(hostName)) != null) {
+ return fi.isBlacklisted();
+ }
+ }
+ return false;
+ }
+
+ int getFaultCount(String hostName) {
+ synchronized (potentiallyFaultyTrackers) {
+ FaultInfo fi = null;
+ if ((fi = potentiallyFaultyTrackers.get(hostName)) != null) {
+ return fi.getFaultCount();
+ }
+ }
+ return 0;
+ }
+ }
+
+ /**
+ * Get all task tracker statuses on given host
+ *
+ * @param hostName
+ * @return {@link java.util.List} of {@link TaskTrackerStatus}
+ */
+ private List<TaskTrackerStatus> getStatusesOnHost(String hostName) {
+ List<TaskTrackerStatus> statuses = new ArrayList<TaskTrackerStatus>();
+ synchronized (taskTrackers) {
+ for (TaskTrackerStatus status : taskTrackers.values()) {
+ if (hostName.equals(status.getHost())) {
+ statuses.add(status);
+ }
+ }
+ }
+ return statuses;
+ }
+
///////////////////////////////////////////////////////
// Used to recover the jobs upon restart
///////////////////////////////////////////////////////
@@ -675,9 +921,6 @@
String trackerName = attempt.get(Keys.TRACKER_NAME);
String trackerHostName =
JobInProgress.convertTrackerNameToHostName(trackerName);
- int index = trackerHostName.indexOf("_");
- trackerHostName =
- trackerHostName.substring(index + 1, trackerHostName.length());
int port = attempt.getInt(Keys.HTTP_PORT);
long attemptStartTime = attempt.getLong(Keys.START_TIME);
@@ -991,6 +1234,8 @@
// Number of resolved entries
int numResolved;
+ private FaultyTrackersInfo faultyTrackers = new FaultyTrackersInfo();
+
//
// Watch and expire TaskTracker objects using these structures.
// We can map from Name->TaskTrackerStatus, or we can expire by time.
@@ -1063,6 +1308,14 @@
RETIRE_JOB_INTERVAL = conf.getLong("mapred.jobtracker.retirejob.interval", 24 * 60 * 60 * 1000);
RETIRE_JOB_CHECK_INTERVAL = conf.getLong("mapred.jobtracker.retirejob.check", 60 * 1000);
MAX_COMPLETE_USER_JOBS_IN_MEMORY = conf.getInt("mapred.jobtracker.completeuserjobs.maximum", 100);
+ MAX_BLACKLISTS_PER_TRACKER =
+ conf.getInt("mapred.max.tracker.blacklists", 4);
+
+ //This configuration is there solely for tuning purposes and
+ //once this feature has been tested in real clusters and an appropriate
+ //value for the threshold has been found, this config might be taken out.
+ AVERAGE_BLACKLIST_THRESHOLD =
+ conf.getFloat("mapred.cluster.average.blacklist.threshold", 0.5f);
// This is a directory of temporary submission files. We delete it
// on startup, and can delete any files that we're done with
@@ -1538,6 +1791,15 @@
long now = System.currentTimeMillis();
+ // add the blacklisted trackers to potentially faulty list
+ if (job.getStatus().getRunState() == JobStatus.SUCCEEDED) {
+ if (job.getNoOfBlackListedTrackers() > 0) {
+ for (String hostName : job.getBlackListedTrackers()) {
+ faultyTrackers.incrementFaults(hostName);
+ }
+ }
+ }
+
// Purge oldest jobs and keep at-most MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs of a given user
// in memory; information about the purged jobs is available via
// JobHistory.
@@ -1681,11 +1943,82 @@
}
return v;
}
+
+ /**
+ * Get all the task trackers in the cluster
+ *
+ * @return {@link Collection} of {@link TaskTrackerStatus}
+ */
public Collection<TaskTrackerStatus> taskTrackers() {
synchronized (taskTrackers) {
return taskTrackers.values();
}
}
+
+ /**
+ * Get the active task tracker statuses in the cluster
+ *
+ * @return {@link Collection} of active {@link TaskTrackerStatus}
+ */
+ public Collection<TaskTrackerStatus> activeTaskTrackers() {
+ Collection<TaskTrackerStatus> activeTrackers =
+ new ArrayList<TaskTrackerStatus>();
+ synchronized (taskTrackers) {
+ for (TaskTrackerStatus status : taskTrackers.values()) {
+ if (!faultyTrackers.isBlacklisted(status.getHost())) {
+ activeTrackers.add(status);
+ }
+ }
+ }
+ return activeTrackers;
+ }
+
+ /**
+ * Get the blacklisted task tracker statuses in the cluster
+ *
+ * @return {@link Collection} of blacklisted {@link TaskTrackerStatus}
+ */
+ public Collection<TaskTrackerStatus> blacklistedTaskTrackers() {
+ Collection<TaskTrackerStatus> blacklistedTrackers =
+ new ArrayList<TaskTrackerStatus>();
+ synchronized (taskTrackers) {
+ for (TaskTrackerStatus status : taskTrackers.values()) {
+ if (faultyTrackers.isBlacklisted(status.getHost())) {
+ blacklistedTrackers.add(status);
+ }
+ }
+ }
+ return blacklistedTrackers;
+ }
+
+ int getFaultCount(String hostName) {
+ return faultyTrackers.getFaultCount(hostName);
+ }
+
+ /**
+ * Get the number of blacklisted trackers across all the jobs
+ *
+ * @return the number of blacklisted trackers
+ */
+ int getBlacklistedTrackerCount() {
+ return faultyTrackers.numBlacklistedTrackers;
+ }
+
+ /**
+ * Whether the tracker is blacklisted or not
+ *
+ * @param trackerID
+ *
+ * @return true if blacklisted, false otherwise
+ */
+ public boolean isBlacklisted(String trackerID) {
+ TaskTrackerStatus status = getTaskTracker(trackerID);
+ if (status != null) {
+ return faultyTrackers.isBlacklisted(status.getHost());
+ }
+ return false;
+ }
+
public TaskTrackerStatus getTaskTracker(String trackerID) {
synchronized (taskTrackers) {
return taskTrackers.get(trackerID);
@@ -1810,10 +2143,12 @@
* tasks or jobs, and also 'reset' instructions during contingencies.
*/
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
- boolean initialContact, boolean acceptNewTasks, short responseId)
+ boolean restarted,
+ boolean acceptNewTasks,
+ short responseId)
throws IOException {
LOG.debug("Got heartbeat from: " + status.getTrackerName() +
- " (initialContact: " + initialContact +
+ " (restarted: " + restarted +
" acceptNewTasks: " + acceptNewTasks + ")" +
" with responseId: " + responseId);
@@ -1824,12 +2159,19 @@
// First check if the last heartbeat response got through
String trackerName = status.getTrackerName();
+ boolean isBlacklisted = false;
+ if (restarted) {
+ faultyTrackers.markTrackerHealthy(status.getHost());
+ } else {
+ isBlacklisted =
+ faultyTrackers.shouldAssignTasksToTracker(status.getHost());
+ }
HeartbeatResponse prevHeartbeatResponse =
trackerToHeartbeatResponseMap.get(trackerName);
boolean addRestartInfo = false;
- if (initialContact != true) {
+ if (restarted != true) {
// If this isn't the 'initial contact' from the tasktracker,
// there is something seriously wrong if the JobTracker has
// no record of the 'previous heartbeat'; if so, ask the
@@ -1841,6 +2183,7 @@
addRestartInfo = true;
} else {
// Jobtracker might have restarted but no recovery is needed
+ // otherwise this code should not be reached
LOG.warn("Serious problem, cannot find record of 'previous' " +
"heartbeat for '" + trackerName +
"'; reinitializing the tasktracker");
@@ -1865,13 +2208,9 @@
// Process this heartbeat
short newResponseId = (short)(responseId + 1);
- if (!processHeartbeat(status, initialContact)) {
- if (prevHeartbeatResponse != null) {
- trackerToHeartbeatResponseMap.remove(trackerName);
- }
-
+ if (!processHeartbeat(status, restarted)) {
return new HeartbeatResponse(newResponseId,
- new TaskTrackerAction[] {new ReinitTrackerAction()});
+ new TaskTrackerAction[] {new ReinitTrackerAction()});
}
// Initialize the response to be sent for the heartbeat
@@ -1879,7 +2218,7 @@
List<TaskTrackerAction> actions = new ArrayList<TaskTrackerAction>();
// Check for new tasks to be executed on the tasktracker
- if (acceptNewTasks) {
+ if (acceptNewTasks && !isBlacklisted) {
TaskTrackerStatus taskTrackerStatus = getTaskTracker(trackerName);
if (taskTrackerStatus == null) {
LOG.warn("Unknown task tracker polling; ignoring: " + trackerName);
@@ -1983,8 +2322,10 @@
if (oldStatus != null) {
totalMaps -= oldStatus.countMapTasks();
totalReduces -= oldStatus.countReduceTasks();
- totalMapTaskCapacity -= oldStatus.getMaxMapTasks();
- totalReduceTaskCapacity -= oldStatus.getMaxReduceTasks();
+ if (!faultyTrackers.isBlacklisted(oldStatus.getHost())) {
+ totalMapTaskCapacity -= oldStatus.getMaxMapTasks();
+ totalReduceTaskCapacity -= oldStatus.getMaxReduceTasks();
+ }
if (status == null) {
taskTrackers.remove(trackerName);
Integer numTaskTrackersInHost =
@@ -2001,8 +2342,10 @@
if (status != null) {
totalMaps += status.countMapTasks();
totalReduces += status.countReduceTasks();
- totalMapTaskCapacity += status.getMaxMapTasks();
- totalReduceTaskCapacity += status.getMaxReduceTasks();
+ if (!faultyTrackers.isBlacklisted(status.getHost())) {
+ totalMapTaskCapacity += status.getMaxMapTasks();
+ totalReduceTaskCapacity += status.getMaxReduceTasks();
+ }
boolean alreadyPresent = false;
if (taskTrackers.containsKey(trackerName)) {
alreadyPresent = true;
@@ -2026,7 +2369,8 @@
* Process incoming heartbeat messages from the task trackers.
*/
private synchronized boolean processHeartbeat(
- TaskTrackerStatus trackerStatus, boolean initialContact) {
+ TaskTrackerStatus trackerStatus,
+ boolean restarted) {
String trackerName = trackerStatus.getTrackerName();
trackerStatus.setLastSeen(System.currentTimeMillis());
@@ -2034,7 +2378,7 @@
synchronized (trackerExpiryQueue) {
boolean seenBefore = updateTaskTrackerStatus(trackerName,
trackerStatus);
- if (initialContact) {
+ if (restarted) {
// If it's first contact, then clear out
// any state hanging around
if (seenBefore) {
@@ -2044,12 +2388,17 @@
// If not first contact, there should be some record of the tracker
if (!seenBefore) {
LOG.warn("Status from unknown Tracker : " + trackerName);
- updateTaskTrackerStatus(trackerName, null);
+ // This is a lost tracker that has come back now; if it is blacklisted,
+ // increment the count of blacklisted trackers in the cluster
+ if (isBlacklisted(trackerName)) {
+ faultyTrackers.numBlacklistedTrackers += 1;
+ }
+ addNewTracker(trackerStatus);
return false;
}
}
- if (initialContact) {
+ if (restarted) {
addNewTracker(trackerStatus);
}
}
@@ -2282,7 +2631,9 @@
public synchronized ClusterStatus getClusterStatus() {
synchronized (taskTrackers) {
- return new ClusterStatus(taskTrackers.size(),
+ return new ClusterStatus(taskTrackers.size() -
+ getBlacklistedTrackerCount(),
+ getBlacklistedTrackerCount(),
totalMaps,
totalReduces,
totalMapTaskCapacity,
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=723710&r1=723709&r2=723710&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Dec 5 03:04:17 2008
@@ -501,7 +501,6 @@
// Clear out temporary files that might be lying around
DistributedCache.purgeCache(this.fConf);
cleanupStorage();
- this.justStarted = true;
this.jobClient = (InterTrackerProtocol)
RPC.waitForProxy(InterTrackerProtocol.class,
@@ -1848,6 +1847,8 @@
task.setJobFile(localTaskFile.toString());
localJobConf.set("mapred.local.dir",
fConf.get("mapred.local.dir"));
+ localJobConf.set("slave.host.name",
+ fConf.get("slave.host.name"));
localJobConf.set("mapred.task.id", task.getTaskID().toString());
keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=723710&r1=723709&r2=723710&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Fri Dec 5 03:04:17 2008
@@ -528,6 +528,10 @@
public int getNumEventsRecovered() {
return jobTracker.getJobTracker().recoveryManager.totalEventsRecovered();
}
+
+ public int getFaultCount(String hostName) {
+ return jobTracker.getJobTracker().getFaultCount(hostName);
+ }
/**
* Start the jobtracker.
Added: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTrackerBlacklistAcrossJobs.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTrackerBlacklistAcrossJobs.java?rev=723710&view=auto
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTrackerBlacklistAcrossJobs.java (added)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestTrackerBlacklistAcrossJobs.java Fri Dec 5 03:04:17 2008
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.examples.SleepJob.SleepInputFormat;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.lib.NullOutputFormat;
+
+import junit.framework.TestCase;
+
+public class TestTrackerBlacklistAcrossJobs extends TestCase { // runs a job that fails on one host twice and checks blacklist/fault counts
+ private static final String hosts[] = new String[] { // host names handed to both mini clusters; maps fail on hosts[0]
+ "host1.rack.com", "host2.rack.com", "host3.rack.com"
+ };
+ final Path inDir = new Path("/testing"); // job input directory, created in testBlacklistAcrossJobs
+ final Path outDir = new Path("/output"); // job output path
+
+ public static class SleepJobFailOnHost extends MapReduceBase // mapper that throws whenever it runs on hosts[0]
+ implements Mapper<IntWritable, IntWritable, IntWritable, NullWritable> {
+ String hostname = ""; // overwritten in configure() with the job's "slave.host.name" value
+
+ public void configure(JobConf job) {
+ this.hostname = job.get("slave.host.name"); // NOTE(review): presumably null when the key is unset, which would NPE in map() - confirm
+ }
+
+ public void map(IntWritable key, IntWritable value,
+ OutputCollector<IntWritable, NullWritable> output,
+ Reporter reporter)
+ throws IOException {
+ if (this.hostname.equals(hosts[0])) {
+ // fail every map call that executes on hosts[0]; other hosts emit nothing and succeed
+ throw new IOException("failing on host: " + hosts[0]);
+ }
+ }
+ }
+
+ public void testBlacklistAcrossJobs() throws IOException {
+ MiniDFSCluster dfs = null;
+ MiniMRCluster mr = null;
+ FileSystem fileSys = null;
+ Configuration conf = new Configuration();
+ // set up the mini DFS cluster and create the job's input file
+ dfs = new MiniDFSCluster(conf, 1, true, null, hosts);
+ fileSys = dfs.getFileSystem();
+ if (!fileSys.mkdirs(inDir)) {
+ throw new IOException("Mkdirs failed to create " + inDir.toString());
+ }
+ TestRackAwareTaskPlacement.writeFile(dfs.getNameNode(), conf,
+ new Path(inDir + "/file"), (short) 1);
+ // start the mini MR cluster with both blacklisting thresholds set to 1
+ JobConf jtConf = new JobConf();
+ jtConf.setInt("mapred.max.tracker.failures", 1); // NOTE(review): presumably failures per job before a tracker is blacklisted for that job - confirm
+ jtConf.setInt("mapred.max.tracker.blacklists", 1); // NOTE(review): presumably per-job blacklistings before cluster-wide blacklisting - confirm
+ mr = new MiniMRCluster(3, fileSys.getUri().toString(),
+ 1, null, hosts, jtConf);
+
+ // set up a map-only job (30 maps, 0 reduces) that uses the failing mapper
+ JobConf mrConf = mr.createJobConf();
+ JobConf job = new JobConf(mrConf);
+ job.setNumMapTasks(30);
+ job.setNumReduceTasks(0);
+ job.setMapperClass(SleepJobFailOnHost.class);
+ job.setMapOutputKeyClass(IntWritable.class);
+ job.setMapOutputValueClass(NullWritable.class);
+ job.setOutputFormat(NullOutputFormat.class);
+ job.setInputFormat(SleepInputFormat.class);
+ FileInputFormat.setInputPaths(job, inDir);
+ FileOutputFormat.setOutputPath(job, outDir);
+
+ // first run: the job must still succeed, with one tracker blacklisted and one fault on hosts[0]
+ JobClient jc = new JobClient(mrConf);
+ RunningJob running = JobClient.runJob(job);
+ assertEquals("Job failed", JobStatus.SUCCEEDED, running.getJobState());
+ assertEquals("Didn't blacklist the host", 1,
+ jc.getClusterStatus().getBlacklistedTrackers());
+ assertEquals("Fault count should be 1", 1, mr.getFaultCount(hosts[0]));
+
+ // run the same job once again
+ // the blacklisted-tracker count and the fault count for hosts[0] must both stay at 1
+ running = JobClient.runJob(job);
+ assertEquals("Job failed", JobStatus.SUCCEEDED, running.getJobState());
+ assertEquals("Didn't blacklist the host", 1,
+ jc.getClusterStatus().getBlacklistedTrackers());
+ assertEquals("Fault count should be 1", 1, mr.getFaultCount(hosts[0]));
+ } // NOTE(review): neither dfs nor mr is shut down in this method, even on assertion failure - consider try/finally
+}
Modified: hadoop/core/trunk/src/webapps/job/jobtracker.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/job/jobtracker.jsp?rev=723710&r1=723709&r2=723710&view=diff
==============================================================================
--- hadoop/core/trunk/src/webapps/job/jobtracker.jsp (original)
+++ hadoop/core/trunk/src/webapps/job/jobtracker.jsp Fri Dec 5 03:04:17 2008
@@ -31,15 +31,18 @@
"<tr><th>Maps</th><th>Reduces</th>" +
"<th>Total Submissions</th>" +
"<th>Nodes</th><th>Map Task Capacity</th>" +
- "<th>Reduce Task Capacity</th><th>Avg. Tasks/Node</th></tr>\n");
+ "<th>Reduce Task Capacity</th><th>Avg. Tasks/Node</th>" +
+ "<th>Blacklisted Nodes</th></tr>\n");
out.print("<tr><td>" + status.getMapTasks() + "</td><td>" +
status.getReduceTasks() + "</td><td>" +
tracker.getTotalSubmissions() +
- "</td><td><a href=\"machines.jsp\">" +
+ "</td><td><a href=\"machines.jsp?type=active\">" +
status.getTaskTrackers() +
"</a></td><td>" + status.getMaxMapTasks() +
"</td><td>" + status.getMaxReduceTasks() +
"</td><td>" + tasksPerNode +
+ "</td><td><a href=\"machines.jsp?type=blacklisted\">" +
+ status.getBlacklistedTrackers() + "</a>" +
"</td></tr></table>\n");
out.print("<br>");
Modified: hadoop/core/trunk/src/webapps/job/machines.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/job/machines.jsp?rev=723710&r1=723709&r2=723710&view=diff
==============================================================================
--- hadoop/core/trunk/src/webapps/job/machines.jsp (original)
+++ hadoop/core/trunk/src/webapps/job/machines.jsp Fri Dec 5 03:04:17 2008
@@ -12,14 +12,22 @@
JobTracker tracker = (JobTracker) application.getAttribute("job.tracker");
String trackerName =
StringUtils.simpleHostname(tracker.getJobTrackerMachine());
+ String type = request.getParameter("type");
%>
<%!
public void generateTaskTrackerTable(JspWriter out,
+ String type,
JobTracker tracker) throws IOException {
- Collection c = tracker.taskTrackers();
-
+ Collection c;
+ if (("blacklisted").equals(type)) {
+ c = tracker.blacklistedTaskTrackers();
+ } else if (("active").equals(type)) {
+ c = tracker.activeTaskTrackers();
+ } else {
+ c = tracker.taskTrackers();
+ }
if (c.size() == 0) {
- out.print("There are currently no known Task Trackers.");
+ out.print("There are currently no known " + type + " Task Trackers.");
} else {
out.print("<center>\n");
out.print("<table border=\"2\" cellpadding=\"5\" cellspacing=\"2\">\n");
@@ -77,7 +85,7 @@
<h2>Task Trackers</h2>
<%
- generateTaskTrackerTable(out, tracker);
+ generateTaskTrackerTable(out, type, tracker);
%>
<%