You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2007/12/21 19:57:56 UTC
svn commit: r606268 - in /lucene/hadoop/branches/branch-0.15: CHANGES.txt
src/java/org/apache/hadoop/mapred/JobInProgress.java
src/java/org/apache/hadoop/mapred/ReduceTask.java
src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
Author: acmurthy
Date: Fri Dec 21 10:57:55 2007
New Revision: 606268
URL: http://svn.apache.org/viewvc?rev=606268&view=rev
Log:
Merge -r 606266:606267 from trunk to branch-0.15 to fix HADOOP-2247.
Modified:
lucene/hadoop/branches/branch-0.15/CHANGES.txt
lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/JobInProgress.java
lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/ReduceTask.java
lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
Modified: lucene/hadoop/branches/branch-0.15/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/CHANGES.txt?rev=606268&r1=606267&r2=606268&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/CHANGES.txt (original)
+++ lucene/hadoop/branches/branch-0.15/CHANGES.txt Fri Dec 21 10:57:55 2007
@@ -39,6 +39,14 @@
round-robin disk selections randomly. This helps in spreading data across
multiple partitions much better. (acmurthy)
+ HADOOP-2247. Fine-tune the strategies for killing mappers and reducers
+ due to failures while fetching map-outputs. Now the map-completion times
+ and number of currently running reduces are taken into account by the
+ JobTracker before killing the mappers, while the progress made by the
+ reducer and the number of fetch-failures vis-a-vis total number of
+ fetch-attempts are taken into account before the reducer kills itself.
+ (Amar Kamat via acmurthy)
+
IMPROVEMENTS
HADOOP-2160. Remove project-level, non-user documentation from
Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=606268&r1=606267&r2=606268&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri Dec 21 10:57:55 2007
@@ -79,6 +79,9 @@
// The maximum percentage of trackers in cluster added to the 'blacklist'.
private static final double CLUSTER_BLACKLIST_PERCENT = 0.25;
+ // The maximum percentage of fetch failures allowed for a map
+ private static final double MAX_ALLOWED_FETCH_FAILURES_PERCENT = 0.5;
+
// No. of tasktrackers in the cluster
private volatile int clusterSize = 0;
@@ -405,6 +408,8 @@
TaskCompletionEvent.Status.SUCCEEDED,
httpTaskLogLocation
);
+ taskEvent.setTaskRunTime((int)(status.getFinishTime()
+ - status.getStartTime()));
tip.setSuccessEventNumber(taskCompletionEventTracker);
}
//For a failed task update the JT datastructures.For the task state where
@@ -1164,7 +1169,13 @@
LOG.info("Failed fetch notification #" + fetchFailures + " for task " +
mapTaskId);
- if (fetchFailures == MAX_FETCH_FAILURES_NOTIFICATIONS) {
+ float failureRate = (float)fetchFailures / runningReduceTasks;
+ // declare faulty if fetch-failures >= max-allowed-failures
+ boolean isMapFaulty = (failureRate >= MAX_ALLOWED_FETCH_FAILURES_PERCENT)
+ ? true
+ : false;
+ if (fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS
+ && isMapFaulty) {
LOG.info("Too many fetch-failures for output of task: " + mapTaskId
+ " ... killing it");
Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=606268&r1=606267&r2=606268&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/ReduceTask.java Fri Dec 21 10:57:55 2007
@@ -476,11 +476,31 @@
private long ramfsMergeOutputSize;
/**
- * Maximum no. of fetch-retries per-map.
+ * the max of all the map completion times
+ */
+ private int maxMapRuntime;
+
+ /**
+ * Maximum number of fetch-retries per-map.
*/
private static final int MAX_FETCH_RETRIES_PER_MAP = 5;
/**
+ * Maximum percent of failed fetch attempts before killing the reduce task.
+ */
+ private static final float MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT = 0.5f;
+
+ /**
+ * Minimum percent of progress required to keep the reduce alive.
+ */
+ private static final float MIN_REQUIRED_PROGRESS_PERCENT = 0.5f;
+
+ /**
+ * Maximum stall time, as a fraction of the shuffle run duration, tolerated before the reducer is considered stalled.
+ */
+ private static final float MAX_ALLOWED_STALL_TIME_PERCENT = 0.5f;
+
+ /**
* Maximum no. of unique maps from which we failed to fetch map-outputs
* even after {@link #MAX_FETCH_RETRIES_PER_MAP} retries; after this the
* reduce task is failed.
@@ -862,12 +882,14 @@
(this.reduceTask.getPartition()%10)
);
this.random = new Random(randomSeed);
+ this.maxMapRuntime = 0;
}
public boolean fetchOutputs() throws IOException {
final int numOutputs = reduceTask.getNumMaps();
List<MapOutputLocation> knownOutputs =
new ArrayList<MapOutputLocation>(numCopiers);
+ int totalFailures = 0;
int numInFlight = 0, numCopied = 0;
int lowThreshold = numCopiers*2;
long bytesTransferred = 0;
@@ -896,6 +918,7 @@
// start the clock for bandwidth measurement
long startTime = System.currentTimeMillis();
long currentTime = startTime;
+ long lastProgressTime = System.currentTimeMillis();
IntWritable fromEventId = new IntWritable(0);
try {
@@ -1005,6 +1028,7 @@
if (cr != null) {
if (cr.getSuccess()) { // a successful copy
numCopied++;
+ lastProgressTime = System.currentTimeMillis();
bytesTransferred += cr.getSize();
long secsSinceStart =
@@ -1033,6 +1057,7 @@
String mapTaskId = cr.getLocation().getMapTaskId();
Integer mapId = cr.getLocation().getMapId();
+ totalFailures++;
Integer noFailedFetches =
mapTaskToFailedFetchesMap.get(mapTaskId);
noFailedFetches =
@@ -1056,8 +1081,43 @@
fetchFailedMaps.add(mapId);
// did we have too many unique failed-fetch maps?
- if (fetchFailedMaps.size() >= MAX_FAILED_UNIQUE_FETCHES) {
- LOG.fatal("Shuffle failed with too many fetch failures! " +
+ // and did we fail on too many fetch attempts?
+ // and did we progress enough
+ // or did we wait for too long without any progress?
+
+ // check if the reducer is healthy
+ boolean reducerHealthy =
+ (((float)totalFailures / (totalFailures + numCopied))
+ < MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT);
+
+ // check if the reducer has progressed enough
+ boolean reducerProgressedEnough =
+ (((float)numCopied / numMaps)
+ >= MIN_REQUIRED_PROGRESS_PERCENT);
+
+ // check if the reducer is stalled for a long time
+
+ // duration for which the reducer is stalled
+ int stallDuration =
+ (int)(System.currentTimeMillis() - lastProgressTime);
+ // duration for which the reducer ran with progress
+ int shuffleProgressDuration =
+ (int)(lastProgressTime - startTime);
+ // min time the reducer should run without getting killed
+ int minShuffleRunDuration =
+ (shuffleProgressDuration > maxMapRuntime)
+ ? shuffleProgressDuration
+ : maxMapRuntime;
+ boolean reducerStalled =
+ (((float)stallDuration / minShuffleRunDuration)
+ >= MAX_ALLOWED_STALL_TIME_PERCENT);
+
+ // kill if not healthy and has insufficient progress
+ if ((fetchFailedMaps.size() >= MAX_FAILED_UNIQUE_FETCHES)
+ && !reducerHealthy
+ && (!reducerProgressedEnough || reducerStalled)) {
+ LOG.fatal("Shuffle failed with too many fetch failures " +
+ "and insufficient progress!" +
"Killing task " + getTaskId() + ".");
umbilical.shuffleError(getTaskId(),
"Exceeded MAX_FAILED_UNIQUE_FETCHES;"
@@ -1251,6 +1311,13 @@
int port = u.getPort();
String taskId = event.getTaskId();
int mId = event.idWithinJob();
+ int duration = event.getTaskRunTime();
+ if (duration > maxMapRuntime) {
+ maxMapRuntime = duration;
+ // adjust max-fetch-retries based on max-map-run-time
+ maxFetchRetriesPerMap =
+ getClosestPowerOf2((maxMapRuntime / BACKOFF_INIT) + 1);
+ }
knownOutputs.add(new MapOutputLocation(taskId, mId, host, port));
}
break;
Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java?rev=606268&r1=606267&r2=606268&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java Fri Dec 21 10:57:55 2007
@@ -33,6 +33,7 @@
private int eventId;
private String taskTrackerHttp;
+ private int taskRunTime; // using int since runtime is the time difference
private String taskId;
Status status;
boolean isMap = false;
@@ -95,6 +96,22 @@
public String getTaskTrackerHttp() {
return taskTrackerHttp;
}
+
+ /**
+ * Returns time (in millisec) the task took to complete.
+ */
+ public int getTaskRunTime() {
+ return taskRunTime;
+ }
+
+ /**
+ * Sets the task run time, i.e. the time the task took to complete.
+ * @param taskCompletionTime time (in millisec) the task took to complete
+ */
+ public void setTaskRunTime(int taskCompletionTime) {
+ this.taskRunTime = taskCompletionTime;
+ }
+
/**
* set event Id. should be assigned incrementally starting from 0.
* @param eventId
@@ -153,6 +170,7 @@
out.writeBoolean(isMap);
WritableUtils.writeEnum(out, status);
WritableUtils.writeString(out, taskTrackerHttp);
+ WritableUtils.writeVInt(out, taskRunTime);
}
public void readFields(DataInput in) throws IOException {
@@ -161,5 +179,6 @@
this.isMap = in.readBoolean();
this.status = WritableUtils.readEnum(in, Status.class);
this.taskTrackerHttp = WritableUtils.readString(in);
+ this.taskRunTime = WritableUtils.readVInt(in);
}
}