Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/05/03 21:51:15 UTC
svn commit: r534975 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/mapred/ src/webapps/job/
Author: cutting
Date: Thu May 3 12:51:14 2007
New Revision: 534975
URL: http://svn.apache.org/viewvc?view=rev&rev=534975
Log:
HADOOP-1144. Permit one to specify the maximum percentage of tasks that can fail before a job is aborted. Contributed by Arun.
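
For context, a minimal usage sketch (not part of this commit) of the knob this change introduces, built on the JobConf setters added below; the job class MyJob and the chosen percentages are illustrative assumptions:

import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;

// Tolerate up to 10% of map TIPs and 5% of reduce TIPs failing
// before the job is aborted. The default for both is zero, which
// preserves the old fail-fast behavior.
JobConf conf = new JobConf(MyJob.class);  // MyJob is hypothetical
conf.setJobName("failure-tolerant-job");
conf.setMaxMapTaskFailuresPercent(10);
conf.setMaxReduceTaskFailuresPercent(5);
JobClient.runJob(conf);

Equivalently, the underlying properties mapred.max.map.failures.percent and mapred.max.reduce.failures.percent can be set directly in the job configuration.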
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=534975&r1=534974&r2=534975
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu May 3 12:51:14 2007
@@ -325,6 +325,10 @@
96. HADOOP-1322. Fix TaskTracker blacklisting to work correctly in
one- and two-node clusters. (Arun C Murthy via cutting)
+97. HADOOP-1144. Permit one to specify a maximum percentage of tasks
+ that can fail before a job is aborted. The default is zero.
+ (Arun C Murthy via cutting)
+
Release 0.12.3 - 2007-04-06
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?view=diff&rev=534975&r1=534974&r2=534975
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Thu May 3 12:51:14 2007
@@ -613,6 +613,50 @@
public int getMaxTaskFailuresPerTracker() {
return getInt("mapred.max.tracker.failures", 4);
}
+
+ /**
+ * Get the maximum percentage of map tasks that can fail without
+ * the job being aborted.
+ *
+ * @return the maximum percentage of map tasks that can fail without
+ * the job being aborted
+ */
+ public int getMaxMapTaskFailuresPercent() {
+ return getInt("mapred.max.map.failures.percent", 0);
+ }
+
+ /**
+ * Set the maximum percentage of map tasks that can fail without the job
+ * being aborted.
+ *
+ * @param percent the maximum percentage of map tasks that can fail without
+ * the job being aborted
+ */
+ public void setMaxMapTaskFailuresPercent(int percent) {
+ setInt("mapred.max.map.failures.percent", percent);
+ }
+
+ /**
+ * Get the maximum percentage of reduce tasks that can fail without
+ * the job being aborted.
+ *
+ * @return the maximum percentage of reduce tasks that can fail without
+ * the job being aborted
+ */
+ public int getMaxReduceTaskFailuresPercent() {
+ return getInt("mapred.max.reduce.failures.percent", 0);
+ }
+
+ /**
+ * Set the maximum percentage of reduce tasks that can fail without the job
+ * being aborted.
+ *
+ * @param percent the maximum percentage of reduce tasks that can fail without
+ * the job being aborted
+ */
+ public void setMaxReduceTaskFailuresPercent(int percent) {
+ setInt("mapred.max.reduce.failures.percent", percent);
+ }
/** Find a jar that contains a class of the same name, if any.
* It will return a jar file, even if that is not the first thing
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=534975&r1=534974&r2=534975
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Thu May 3 12:51:14 2007
@@ -63,7 +63,13 @@
int finishedMapTasks = 0;
int finishedReduceTasks = 0;
int failedMapTasks = 0;
- int failedReduceTasks = 0;
+ int failedReduceTasks = 0;
+
+ int mapFailuresPercent = 0;
+ int reduceFailuresPercent = 0;
+ int failedMapTIPs = 0;
+ int failedReduceTIPs = 0;
+
JobTracker jobtracker = null;
Map<String,List<TaskInProgress>> hostToMaps =
new HashMap<String,List<TaskInProgress>>();
@@ -91,7 +97,14 @@
private LocalFileSystem localFs;
private String uniqueString;
-
+
+ // Per-job counters
+ public static enum Counter {
+ NUM_FAILED_MAPS,
+ NUM_FAILED_REDUCES
+ }
+ private Counters jobCounters = new Counters();
+
private Counters mapCounters = new Counters();
private Counters reduceCounters = new Counters();
private MetricsRecord jobMetrics;
@@ -130,6 +143,9 @@
this.numReduceTasks = conf.getNumReduceTasks();
this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>(
numMapTasks + numReduceTasks + 10);
+
+ this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
+ this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
JobHistory.JobInfo.logSubmitted(jobid, conf.getJobName(), conf.getUser(),
System.currentTimeMillis(), jobFile);
@@ -359,14 +375,6 @@
tip.setSuccessEventNumber(taskCompletionEventTracker);
} else if (state == TaskStatus.State.FAILED ||
state == TaskStatus.State.KILLED) {
- taskEvent = new TaskCompletionEvent(
- taskCompletionEventTracker,
- status.getTaskId(),
- tip.idWithinJob(),
- status.getIsMap(),
- TaskCompletionEvent.Status.FAILED,
- httpTaskLogLocation
- );
// Get the event number for the (possibly) previously successful
// task. If there exists one, then set that status to OBSOLETE
int eventNumber;
@@ -376,9 +384,24 @@
if (t.getTaskId().equals(status.getTaskId()))
t.setTaskStatus(TaskCompletionEvent.Status.OBSOLETE);
}
+
// Tell the job to fail the relevant task
failedTask(tip, status.getTaskId(), status, status.getTaskTracker(),
- wasRunning, wasComplete);
+ wasRunning, wasComplete, metrics);
+
+ // Did the task failure lead to tip failure?
+ TaskCompletionEvent.Status taskCompletionStatus =
+ TaskCompletionEvent.Status.FAILED;
+ if (tip.isFailed()) {
+ taskCompletionStatus = TaskCompletionEvent.Status.TIPFAILED;
+ }
+ taskEvent = new TaskCompletionEvent(taskCompletionEventTracker,
+ status.getTaskId(),
+ tip.idWithinJob(),
+ status.getIsMap(),
+ taskCompletionStatus,
+ httpTaskLogLocation
+ );
}
// Add the 'complete' task i.e. successful/failed
@@ -411,7 +434,16 @@
}
}
}
-
+
+ /**
+ * Returns the job-level counters.
+ *
+ * @return the job-level counters.
+ */
+ public synchronized Counters getJobCounters() {
+ return jobCounters;
+ }
+
/**
* Returns map phase counters by summing over all map tasks in progress.
*/
@@ -427,11 +459,12 @@
}
/**
- * Returns the total job counters, by adding together the map and the
- * reduce counters.
+ * Returns the total job counters, by adding together the job,
+ * the map and the reduce counters.
*/
public Counters getCounters() {
- return Counters.sum(getMapCounters(), getReduceCounters());
+ return Counters.sum(getJobCounters(),
+ Counters.sum(getMapCounters(), getReduceCounters()));
}
/**
@@ -741,9 +774,27 @@
//
// Figure out whether the Job is done
//
+ isJobComplete(tip, metrics);
+
+ if (this.status.getRunState() != JobStatus.RUNNING) {
+ // The job has been killed/failed,
+ // JobTracker should cleanup this task
+ jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
+ }
+ }
+
+ /**
+ * Check if the job is done, i.e. all of its component tasks have either
+ * succeeded or failed.
+ *
+ * @param tip the current tip, which either completed successfully or failed
+ * @param metrics job-tracker metrics
+ * @return true if the job is complete, else false
+ */
+ private boolean isJobComplete(TaskInProgress tip, JobTrackerMetrics metrics) {
boolean allDone = true;
for (int i = 0; i < maps.length; i++) {
- if (!maps[i].isComplete()) {
+ if (!(maps[i].isComplete() || maps[i].isFailed())) {
allDone = false;
break;
}
@@ -753,7 +804,7 @@
this.status.setMapProgress(1.0f);
}
for (int i = 0; i < reduces.length; i++) {
- if (!reduces[i].isComplete()) {
+ if (!(reduces[i].isComplete() || reduces[i].isFailed())) {
allDone = false;
break;
}
@@ -773,13 +824,11 @@
JobHistory.JobInfo.logFinished(this.status.getJobId(), finishTime,
this.finishedMapTasks, this.finishedReduceTasks, failedMapTasks, failedReduceTasks);
metrics.completeJob();
- } else if (this.status.getRunState() != JobStatus.RUNNING) {
- // The job has been killed/failed,
- // JobTracker should cleanup this task
- jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
+ return true;
}
+
+ return false;
}
-
/**
* Kill the job and all its component tasks.
*/
@@ -809,7 +858,7 @@
* A task assigned to this JobInProgress has reported in as failed.
* Most of the time, we'll just reschedule execution. However, after
* many repeated failures we may instead decide to allow the entire
- * job to fail.
+ * job to fail, or to succeed if the user doesn't care about a few tasks failing.
*
* Even if a task has reported as completed in the past, it might later
* be reported as failed. That's because the TaskTracker that hosts a map
@@ -819,7 +868,8 @@
*/
private void failedTask(TaskInProgress tip, String taskid,
TaskStatus status, String trackerName,
- boolean wasRunning, boolean wasComplete) {
+ boolean wasRunning, boolean wasComplete,
+ JobTrackerMetrics metrics) {
// Mark the taskid as a 'failure'
tip.incompleteSubTask(taskid, trackerName);
@@ -881,16 +931,45 @@
jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
//
- // Check if we need to kill the job because of too many failures
+ // Check if we need to kill the job because of too many failures or
+ // if the job is complete since all component tasks have completed
//
if (tip.isFailed()) {
- LOG.info("Aborting job " + profile.getJobId());
- JobHistory.Task.logFailed(profile.getJobId(), tip.getTIPId(),
- tip.isMapTask() ? Values.MAP.name():Values.REDUCE.name(),
- System.currentTimeMillis(), status.getDiagnosticInfo());
- JobHistory.JobInfo.logFailed(profile.getJobId(),
- System.currentTimeMillis(), this.finishedMapTasks, this.finishedReduceTasks);
- kill();
+ //
+ // Allow up to 'mapFailuresPercent' of map tasks to fail or
+ // 'reduceFailuresPercent' of reduce tasks to fail
+ //
+ boolean killJob =
+ tip.isMapTask() ?
+ (((++failedMapTIPs*100)/numMapTasks) > mapFailuresPercent) :
+ (((++failedReduceTIPs*100)/numReduceTasks) > reduceFailuresPercent);
+
+ if (killJob) {
+ LOG.info("Aborting job " + profile.getJobId());
+ JobHistory.Task.logFailed(profile.getJobId(), tip.getTIPId(),
+ tip.isMapTask() ?
+ Values.MAP.name() :
+ Values.REDUCE.name(),
+ System.currentTimeMillis(),
+ status.getDiagnosticInfo());
+ JobHistory.JobInfo.logFailed(profile.getJobId(),
+ System.currentTimeMillis(),
+ this.finishedMapTasks,
+ this.finishedReduceTasks
+ );
+ kill();
+ } else {
+ isJobComplete(tip, metrics);
+ }
+
+ //
+ // Update the counters
+ //
+ if (tip.isMapTask()) {
+ jobCounters.incrCounter(Counter.NUM_FAILED_MAPS, 1);
+ } else {
+ jobCounters.incrCounter(Counter.NUM_FAILED_REDUCES, 1);
+ }
}
}
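
To make the abort decision concrete: the killJob expression above uses integer arithmetic, so the job is killed only once the failed-TIP percentage strictly exceeds the configured limit. A standalone sketch of the same computation, with illustrative values:

// Mirrors the killJob expression in failedTask() above.
int numMapTasks = 100;        // illustrative job size
int mapFailuresPercent = 10;  // from mapred.max.map.failures.percent
int failedMapTIPs = 11;       // the 11th map TIP has just failed
boolean killJob =
    ((failedMapTIPs * 100) / numMapTasks) > mapFailuresPercent;
// (11 * 100) / 100 = 11 > 10, so killJob is true; at 10 failed TIPs
// the job would still have been allowed to continue.

Note that the integer division truncates, so task counts that are not multiples of 100 can tolerate slightly more than the nominal percentage: with 3 maps and a limit of 50, one failure gives (1 * 100) / 3 = 33, which is within the limit.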
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=534975&r1=534974&r2=534975
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Thu May 3 12:51:14 2007
@@ -822,43 +822,39 @@
copyProgress.start();
try {
// loop until we get all required outputs
- while (numCopied < numOutputs && mergeThrowable == null) {
+ while (!neededOutputs.isEmpty() && mergeThrowable == null) {
- LOG.info(reduceTask.getTaskId() + " Need " + (numOutputs-numCopied) +
- " map output(s)");
+ LOG.info(reduceTask.getTaskId() + " Need " + neededOutputs.size() +
+ " map output(s)");
- if (!neededOutputs.isEmpty()) {
- LOG.info(reduceTask.getTaskId() + " Need " + neededOutputs.size() +
- " map output location(s)");
- try {
- // Put the hash entries for the failed fetches. Entries here
- // might be replaced by (mapId) hashkeys from new successful
- // Map executions, if the fetch failures were due to lost tasks.
- // The replacements, if at all, will happen when we query the
- // tasktracker and put the mapId hashkeys with new
- // MapOutputLocations as values
- knownOutputs.putAll(retryFetches);
- // The call getSuccessMapEvents will modify fromEventId to a val
- // that it should be for the next call to getSuccessMapEvents
- List <MapOutputLocation> locs = getSuccessMapEvents(fromEventId);
-
- // put discovered them on the known list
- for (int i=0; i < locs.size(); i++) {
- knownOutputs.put(new Integer(locs.get(i).getMapId()),
- locs.get(i));
- }
- LOG.info(reduceTask.getTaskId() +
- " Got " + locs.size() +
- " new map outputs from tasktracker and " + retryFetches.size()
- + " map outputs from previous failures");
- // clear the "failed" fetches hashmap
- retryFetches.clear();
- }
- catch (IOException ie) {
- LOG.warn(reduceTask.getTaskId() +
- " Problem locating map outputs: " +
- StringUtils.stringifyException(ie));
+ try {
+ // Put the hash entries for the failed fetches. Entries here
+ // might be replaced by (mapId) hashkeys from new successful
+ // Map executions, if the fetch failures were due to lost tasks.
+ // The replacements, if at all, will happen when we query the
+ // tasktracker and put the mapId hashkeys with new
+ // MapOutputLocations as values
+ knownOutputs.putAll(retryFetches);
+ // The call to getMapCompletionEvents will modify fromEventId to the value
+ // it should have for the next call to getMapCompletionEvents
+ List <MapOutputLocation> locs = getMapCompletionEvents(fromEventId);
+
+ // put the discovered outputs on the known list
+ for (int i=0; i < locs.size(); i++) {
+ knownOutputs.put(new Integer(locs.get(i).getMapId()),
+ locs.get(i));
}
+ LOG.info(reduceTask.getTaskId() +
+ " Got " + locs.size() +
+ " new map outputs from tasktracker and " + retryFetches.size()
+ + " map outputs from previous failures");
+ // clear the "failed" fetches hashmap
+ retryFetches.clear();
+ }
+ catch (IOException ie) {
+ LOG.warn(reduceTask.getTaskId() +
+ " Problem locating map outputs: " +
+ StringUtils.stringifyException(ie));
}
// now walk through the cache and schedule what we can
@@ -1009,7 +1005,7 @@
if (inMemClosedFiles.length == 0) {
LOG.info(reduceTask.getTaskId() + "Nothing to merge from " +
inMemFileSys.getUri());
- return numCopied == numOutputs;
+ return neededOutputs.isEmpty();
}
//name this output file same as the name of the first file that is
//there in the current list of inmem files (this is guaranteed to be
@@ -1047,7 +1043,7 @@
return false;
}
}
- return mergeThrowable == null && numCopied == numOutputs;
+ return mergeThrowable == null && neededOutputs.isEmpty();
} finally {
inMemFileSys.close();
copyProgress.interrupt();
@@ -1077,7 +1073,7 @@
* @return a set of locations to copy outputs from
* @throws IOException
*/
- private List <MapOutputLocation> getSuccessMapEvents(IntWritable fromEventId)
+ private List <MapOutputLocation> getMapCompletionEvents(IntWritable fromEventId)
throws IOException {
long currentTime = System.currentTimeMillis();
@@ -1097,14 +1093,17 @@
List <MapOutputLocation> mapOutputsList =
new ArrayList<MapOutputLocation>();
- for (int i = 0; i < t.length; i++) {
- if (t[i].getTaskStatus() == TaskCompletionEvent.Status.SUCCEEDED) {
- URI u = URI.create(t[i].getTaskTrackerHttp());
+ for (TaskCompletionEvent event : t) {
+ if (event.getTaskStatus() == TaskCompletionEvent.Status.SUCCEEDED) {
+ URI u = URI.create(event.getTaskTrackerHttp());
String host = u.getHost();
int port = u.getPort();
- String taskId = t[i].getTaskId();
- int mId = t[i].idWithinJob();
+ String taskId = event.getTaskId();
+ int mId = event.idWithinJob();
mapOutputsList.add(new MapOutputLocation(taskId, mId, host, port));
+ } else if (event.getTaskStatus() == TaskCompletionEvent.Status.TIPFAILED) {
+ neededOutputs.remove(event.idWithinJob());
+ LOG.info("Ignoring output of failed map: '" + event.getTaskId() + "'");
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java?view=diff&rev=534975&r1=534974&r2=534975
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java Thu May 3 12:51:14 2007
@@ -12,7 +12,7 @@
*
*/
public class TaskCompletionEvent implements Writable{
- static public enum Status {FAILED, SUCCEEDED, OBSOLETE};
+ static public enum Status {FAILED, SUCCEEDED, OBSOLETE, TIPFAILED};
private int eventId;
private String taskTrackerHttp;
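
With the new value, Status now distinguishes four cases. Here is a sketch (not code from this commit) of how a consumer of these events might interpret them; the handler methods are hypothetical placeholders:

static void dispatch(TaskCompletionEvent event) {
  switch (event.getTaskStatus()) {
  case SUCCEEDED:  // the attempt finished and its output is usable
    fetchOutput(event);   break;
  case FAILED:     // this attempt failed; the TIP may still be retried
    recordRetry(event);   break;
  case OBSOLETE:   // a previously successful attempt was superseded
    discardOutput(event); break;
  case TIPFAILED:  // the whole task-in-progress failed; no output will come
    stopWaiting(event);   break;
  }
}

This is the distinction the reduce-side copier relies on above: FAILED means keep waiting for a retry, while TIPFAILED means the map's output can be struck from neededOutputs.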
Modified: lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp?view=diff&rev=534975&r1=534974&r2=534975
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp Thu May 3 12:51:14 2007
@@ -160,7 +160,7 @@
<%
Counters mapCounters = job.getMapCounters();
Counters reduceCounters = job.getReduceCounters();
- Counters totalCounters = Counters.sum(mapCounters,reduceCounters);
+ Counters totalCounters = job.getCounters();
for (String groupName : totalCounters.getGroupNames()) {
Counters.Group totalGroup = totalCounters.getGroup(groupName);