Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/06/06 08:26:58 UTC
svn commit: r663837 - in /hadoop/core/trunk: CHANGES.txt
src/java/org/apache/hadoop/mapred/ReduceTask.java
Author: ddas
Date: Thu Jun 5 23:26:58 2008
New Revision: 663837
URL: http://svn.apache.org/viewvc?rev=663837&view=rev
Log:
HADOOP-3427. Improves the shuffle scheduler. It now waits for notifications from shuffle threads once it has scheduled enough fetches, before scheduling more. Contributed by Devaraj Das.
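
Before the diffs, here is a minimal, self-contained sketch of the scheduling pattern this change introduces, for readers who want the idea without reading the full patch. The names (BoundedScheduler, reportResult, nextResult) are hypothetical stand-ins, not the committed ReduceTask code; only the wait/notify contract mirrors the patch: fetcher threads notify on completion, and the scheduler blocks only once enough work is in flight.

    import java.util.ArrayList;
    import java.util.List;

    // Illustrative sketch only -- not the committed ReduceTask code.
    public class BoundedScheduler<R> {
      private final List<R> results = new ArrayList<R>();
      private final int maxInFlight; // e.g. 4 * numCopiers, as in the patch

      public BoundedScheduler(int maxInFlight) {
        this.maxInFlight = maxInFlight;
      }

      // Called by a fetcher thread when one copy completes.
      public void reportResult(R result) {
        synchronized (results) {
          results.add(result);
          results.notifyAll(); // wake the scheduler if it is waiting
        }
      }

      // Called by the scheduler. Blocks only when enough copies are
      // already in flight; otherwise returns null so the caller can go
      // and schedule more.
      public R nextResult(int numInFlight) {
        synchronized (results) {
          while (results.isEmpty()) {
            if (numInFlight <= maxInFlight) {
              return null; // not busy enough: go schedule more
            }
            try {
              results.wait(); // busy enough: wait for a fetcher
            } catch (InterruptedException e) { }
          }
          return results.remove(0);
        }
      }
    }

The design point is in nextResult: blocking indefinitely is safe because every worker notification wakes the scheduler, so there is no need for the timed polling the old code relied on.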
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=663837&r1=663836&r2=663837&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Jun 5 23:26:58 2008
@@ -238,6 +238,9 @@
HADOOP-236. JobTracker now refuses connection from a task tracker with a
different version number. (Sharad Agarwal via ddas)
+ HADOOP-3427. Improves the shuffle scheduler. It now waits for notifications
+ from shuffle threads once it has scheduled enough fetches, before scheduling
+ more. (ddas)
OPTIMIZATIONS
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=663837&r1=663836&r2=663837&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Thu Jun 5 23:26:58 2008
@@ -369,6 +369,12 @@
private int numCopiers;
/**
+ * the maximum number of fetches we'd schedule before pausing the
+ * scheduling
+ */
+ private int maxInFlight;
+
+ /**
* the amount of time spent on fetching one map output before considering
* it as failed and notifying the jobtracker about it.
*/
@@ -1114,6 +1120,7 @@
this.scheduledCopies = new ArrayList<MapOutputLocation>(100);
this.copyResults = new ArrayList<CopyResult>(100);
this.numCopiers = conf.getInt("mapred.reduce.parallel.copies", 5);
+ this.maxInFlight = 4 * numCopiers;
this.maxBackoff = conf.getInt("mapred.reduce.copy.backoff", 300);
this.combinerClass = conf.getCombinerClass();
combineCollector = (null != combinerClass)
@@ -1160,6 +1167,10 @@
this.maxMapRuntime = 0;
}
+ private boolean busyEnough(int numInFlight) {
+ return numInFlight > maxInFlight;
+ }
+
@SuppressWarnings("unchecked")
public boolean fetchOutputs() throws IOException {
final int numOutputs = reduceTask.getNumMaps();
@@ -1335,151 +1346,151 @@
while (numInFlight > 0 && mergeThrowable == null) {
LOG.debug(reduceTask.getTaskID() + " numInFlight = " +
numInFlight);
- CopyResult cr = getCopyResult();
+ //the call to getCopyResult will either
+ //1) return immediately with a null or a valid CopyResult object,
+ // or
+ //2) if numInFlight is above maxInFlight, return with a
+ // CopyResult object after getting a notification from a
+ // fetcher thread.
+ //So, when getCopyResult returns null, we can be sure that
+ //we aren't busy enough and should go and get more map-completion
+ //events from the tasktracker
+ CopyResult cr = getCopyResult(numInFlight);
+
+ if (cr == null) {
+ break;
+ }
- if (cr != null) {
- if (cr.getSuccess()) { // a successful copy
- numCopied++;
- lastProgressTime = System.currentTimeMillis();
- bytesTransferred += cr.getSize();
+ if (cr.getSuccess()) { // a successful copy
+ numCopied++;
+ lastProgressTime = System.currentTimeMillis();
+ bytesTransferred += cr.getSize();
- long secsSinceStart =
- (System.currentTimeMillis()-startTime)/1000+1;
- float mbs = ((float)bytesTransferred)/(1024*1024);
- float transferRate = mbs/secsSinceStart;
+ long secsSinceStart =
+ (System.currentTimeMillis()-startTime)/1000+1;
+ float mbs = ((float)bytesTransferred)/(1024*1024);
+ float transferRate = mbs/secsSinceStart;
- copyPhase.startNextPhase();
- copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs
- + " at " +
- mbpsFormat.format(transferRate) + " MB/s)");
+ copyPhase.startNextPhase();
+ copyPhase.setStatus("copy (" + numCopied + " of " + numOutputs
+ + " at " +
+ mbpsFormat.format(transferRate) + " MB/s)");
- // Note successfull fetch for this mapId to invalidate
- // (possibly) old fetch-failures
- fetchFailedMaps.remove(cr.getLocation().getTaskId());
- } else if (cr.isObsolete()) {
- //ignore
- LOG.info(reduceTask.getTaskID() +
- " Ignoring obsolete copy result for Map Task: " +
- cr.getLocation().getTaskAttemptId() + " from host: " +
- cr.getHost());
- } else {
- retryFetches.add(cr.getLocation());
-
- // note the failed-fetch
- TaskAttemptID mapTaskId = cr.getLocation().getTaskAttemptId();
- TaskID mapId = cr.getLocation().getTaskId();
-
- totalFailures++;
- Integer noFailedFetches =
- mapTaskToFailedFetchesMap.get(mapTaskId);
- noFailedFetches =
- (noFailedFetches == null) ? 1 : (noFailedFetches + 1);
- mapTaskToFailedFetchesMap.put(mapTaskId, noFailedFetches);
- LOG.info("Task " + getTaskID() + ": Failed fetch #" +
- noFailedFetches + " from " + mapTaskId);
-
- // did the fetch fail too many times?
- // using a hybrid technique for notifying the jobtracker.
- // a. the first notification is sent after max-retries
- // b. subsequent notifications are sent after 2 retries.
- if ((noFailedFetches >= maxFetchRetriesPerMap)
- && ((noFailedFetches - maxFetchRetriesPerMap) % 2) == 0) {
- synchronized (ReduceTask.this) {
- taskStatus.addFetchFailedMap(mapTaskId);
- LOG.info("Failed to fetch map-output from " + mapTaskId +
- " even after MAX_FETCH_RETRIES_PER_MAP retries... "
- + " reporting to the JobTracker");
- }
+ // Note successful fetch for this mapId to invalidate
+ // (possibly) old fetch-failures
+ fetchFailedMaps.remove(cr.getLocation().getTaskId());
+ } else if (cr.isObsolete()) {
+ //ignore
+ LOG.info(reduceTask.getTaskID() +
+ " Ignoring obsolete copy result for Map Task: " +
+ cr.getLocation().getTaskAttemptId() + " from host: " +
+ cr.getHost());
+ } else {
+ retryFetches.add(cr.getLocation());
+
+ // note the failed-fetch
+ TaskAttemptID mapTaskId = cr.getLocation().getTaskAttemptId();
+ TaskID mapId = cr.getLocation().getTaskId();
+
+ totalFailures++;
+ Integer noFailedFetches =
+ mapTaskToFailedFetchesMap.get(mapTaskId);
+ noFailedFetches =
+ (noFailedFetches == null) ? 1 : (noFailedFetches + 1);
+ mapTaskToFailedFetchesMap.put(mapTaskId, noFailedFetches);
+ LOG.info("Task " + getTaskID() + ": Failed fetch #" +
+ noFailedFetches + " from " + mapTaskId);
+
+ // did the fetch fail too many times?
+ // using a hybrid technique for notifying the jobtracker.
+ // a. the first notification is sent after max-retries
+ // b. subsequent notifications are sent after 2 retries.
+ if ((noFailedFetches >= maxFetchRetriesPerMap)
+ && ((noFailedFetches - maxFetchRetriesPerMap) % 2) == 0) {
+ synchronized (ReduceTask.this) {
+ taskStatus.addFetchFailedMap(mapTaskId);
+ LOG.info("Failed to fetch map-output from " + mapTaskId +
+ " even after MAX_FETCH_RETRIES_PER_MAP retries... "
+ + " reporting to the JobTracker");
}
-
- // note unique failed-fetch maps
- if (noFailedFetches == maxFetchRetriesPerMap) {
- fetchFailedMaps.add(mapId);
-
- // did we have too many unique failed-fetch maps?
- // and did we fail on too many fetch attempts?
- // and did we progress enough
- // or did we wait for too long without any progress?
-
- // check if the reducer is healthy
- boolean reducerHealthy =
- (((float)totalFailures / (totalFailures + numCopied))
- < MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT);
-
- // check if the reducer has progressed enough
- boolean reducerProgressedEnough =
- (((float)numCopied / numMaps)
- >= MIN_REQUIRED_PROGRESS_PERCENT);
-
- // check if the reducer is stalled for a long time
-
- // duration for which the reducer is stalled
- int stallDuration =
- (int)(System.currentTimeMillis() - lastProgressTime);
- // duration for which the reducer ran with progress
- int shuffleProgressDuration =
- (int)(lastProgressTime - startTime);
- // min time the reducer should run without getting killed
- int minShuffleRunDuration =
- (shuffleProgressDuration > maxMapRuntime)
- ? shuffleProgressDuration
- : maxMapRuntime;
- boolean reducerStalled =
- (((float)stallDuration / minShuffleRunDuration)
- >= MAX_ALLOWED_STALL_TIME_PERCENT);
+ }
+ // note unique failed-fetch maps
+ if (noFailedFetches == maxFetchRetriesPerMap) {
+ fetchFailedMaps.add(mapId);
- // kill if not healthy and has insufficient progress
- if ((fetchFailedMaps.size() >= MAX_FAILED_UNIQUE_FETCHES)
- && !reducerHealthy
- && (!reducerProgressedEnough || reducerStalled)) {
- LOG.fatal("Shuffle failed with too many fetch failures " +
- "and insufficient progress!" +
- "Killing task " + getTaskID() + ".");
- umbilical.shuffleError(getTaskID(),
- "Exceeded MAX_FAILED_UNIQUE_FETCHES;"
- + " bailing-out.");
- }
- }
+ // did we have too many unique failed-fetch maps?
+ // and did we fail on too many fetch attempts?
+ // and did we progress enough
+ // or did we wait for too long without any progress?
+
+ // check if the reducer is healthy
+ boolean reducerHealthy =
+ (((float)totalFailures / (totalFailures + numCopied))
+ < MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT);
- // back off exponentially until num_retries <= max_retries
- // back off by max_backoff/2 on subsequent failed attempts
- currentTime = System.currentTimeMillis();
- int currentBackOff = noFailedFetches <= maxFetchRetriesPerMap
- ? BACKOFF_INIT
- * (1 << (noFailedFetches - 1))
- : (this.maxBackoff * 1000 / 2);
- penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
- LOG.warn(reduceTask.getTaskID() + " adding host " +
- cr.getHost() + " to penalty box, next contact in " +
- (currentBackOff/1000) + " seconds");
+ // check if the reducer has progressed enough
+ boolean reducerProgressedEnough =
+ (((float)numCopied / numMaps)
+ >= MIN_REQUIRED_PROGRESS_PERCENT);
- // other outputs from the failed host may be present in the
- // knownOutputs cache, purge them. This is important in case
- // the failure is due to a lost tasktracker (causes many
- // unnecessary backoffs). If not, we only take a small hit
- // polling the tasktracker a few more times
- Iterator<MapOutputLocation> locIt = knownOutputs.iterator();
- while (locIt.hasNext()) {
- MapOutputLocation loc = locIt.next();
- if (cr.getHost().equals(loc.getHost())) {
- retryFetches.add(loc);
- locIt.remove();
- }
+ // check if the reducer is stalled for a long time
+ // duration for which the reducer is stalled
+ int stallDuration =
+ (int)(System.currentTimeMillis() - lastProgressTime);
+ // duration for which the reducer ran with progress
+ int shuffleProgressDuration =
+ (int)(lastProgressTime - startTime);
+ // min time the reducer should run without getting killed
+ int minShuffleRunDuration =
+ (shuffleProgressDuration > maxMapRuntime)
+ ? shuffleProgressDuration
+ : maxMapRuntime;
+ boolean reducerStalled =
+ (((float)stallDuration / minShuffleRunDuration)
+ >= MAX_ALLOWED_STALL_TIME_PERCENT);
+
+ // kill if not healthy and has insufficient progress
+ if ((fetchFailedMaps.size() >= MAX_FAILED_UNIQUE_FETCHES)
+ && !reducerHealthy
+ && (!reducerProgressedEnough || reducerStalled)) {
+ LOG.fatal("Shuffle failed with too many fetch failures " +
+ "and insufficient progress!" +
+ "Killing task " + getTaskID() + ".");
+ umbilical.shuffleError(getTaskID(),
+ "Exceeded MAX_FAILED_UNIQUE_FETCHES;"
+ + " bailing-out.");
}
}
- uniqueHosts.remove(cr.getHost());
- numInFlight--;
- }
-
- //Check whether we have more CopyResult to check. If there is none,
- //break
- synchronized (copyResults) {
- if (copyResults.size() == 0) {
- break;
+
+ // back off exponentially until num_retries <= max_retries
+ // back off by max_backoff/2 on subsequent failed attempts
+ currentTime = System.currentTimeMillis();
+ int currentBackOff = noFailedFetches <= maxFetchRetriesPerMap
+ ? BACKOFF_INIT
+ * (1 << (noFailedFetches - 1))
+ : (this.maxBackoff * 1000 / 2);
+ penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
+ LOG.warn(reduceTask.getTaskID() + " adding host " +
+ cr.getHost() + " to penalty box, next contact in " +
+ (currentBackOff/1000) + " seconds");
+
+ // other outputs from the failed host may be present in the
+ // knownOutputs cache, purge them. This is important in case
+ // the failure is due to a lost tasktracker (causes many
+ // unnecessary backoffs). If not, we only take a small hit
+ // polling the tasktracker a few more times
+ Iterator<MapOutputLocation> locIt = knownOutputs.iterator();
+ while (locIt.hasNext()) {
+ MapOutputLocation loc = locIt.next();
+ if (cr.getHost().equals(loc.getHost())) {
+ retryFetches.add(loc);
+ locIt.remove();
+ }
}
}
+ uniqueHosts.remove(cr.getHost());
+ numInFlight--;
}
-
}
// all done, inform the copiers to exit
@@ -1542,18 +1553,20 @@
return inMemorySegments;
}
- private CopyResult getCopyResult() {
+ private CopyResult getCopyResult(int numInFlight) {
synchronized (copyResults) {
- if (copyResults.isEmpty()) {
+ while (copyResults.isEmpty()) {
try {
- copyResults.wait(2000); // wait for 2 sec
+ //The idea is that if we have scheduled enough, we can wait until
+ //we hear from one of the copiers.
+ if (busyEnough(numInFlight)) {
+ copyResults.wait();
+ } else {
+ return null;
+ }
} catch (InterruptedException e) { }
}
- if (copyResults.isEmpty()) {
- return null;
- } else {
- return copyResults.remove(0);
- }
+ return copyResults.remove(0);
}
}
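
Taken together with the fetchOutputs() hunk above, the effect of this last change is to replace the old 2-second polling wait with event-driven blocking: getCopyResult(numInFlight) now blocks indefinitely when the scheduler is busy enough (numInFlight > maxInFlight) and returns null immediately when it is not. A condensed, illustrative view of the resulting caller contract (handleCopyResult is a hypothetical stand-in for the success/obsolete/failure handling shown in the diff):

    while (numInFlight > 0 && mergeThrowable == null) {
      // Blocks only when numInFlight > maxInFlight; otherwise returns
      // null right away so we can fetch more map-completion events
      // from the tasktracker and schedule more copies.
      CopyResult cr = getCopyResult(numInFlight);
      if (cr == null) {
        break; // not busy enough: go schedule more fetches
      }
      handleCopyResult(cr); // hypothetical helper, see the diff above
      uniqueHosts.remove(cr.getHost());
      numInFlight--;
    }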