You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 02:08:16 UTC
svn commit: r1076912 -
/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
Author: omalley
Date: Fri Mar 4 01:08:16 2011
New Revision: 1076912
URL: http://svn.apache.org/viewvc?rev=1076912&view=rev
Log:
commit dc1e47c079f73b87f87e123be15f5988e27ad16b
Author: Lee Tucker <lt...@yahoo-inc.com>
Date: Thu Jul 30 17:40:13 2009 -0700
Applying patch 2358082.3327.patch
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1076912&r1=1076911&r2=1076912&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Fri Mar 4 01:08:16 2011
@@ -564,6 +564,12 @@ class ReduceTask extends Task {
output.close(reducerContext);
}
+ private static enum CopyOutputErrorType {
+ NO_ERROR,
+ READ_ERROR,
+ OTHER_ERROR
+ };
+
class ReduceCopier<K, V> implements MRConstants {
/** Reference to the umbilical object */
@@ -745,6 +751,11 @@ class ReduceTask extends Task {
private static final int MIN_FETCH_RETRIES_PER_MAP = 2;
/**
+ * The fraction of maps yet to be copied (0.25 = 25%) at or below
+ * which the shuffle is considered to be in its end phase
+ */
+ private static final float MIN_PENDING_MAPS_PERCENT = 0.25f;
+ /**
* Maximum no. of unique maps from which we failed to fetch map-outputs
* even after {@link #maxFetchRetriesPerMap} retries; after this the
* reduce task is failed.
@@ -857,11 +868,18 @@ class ReduceTask extends Task {
//a flag signifying whether a copy result is obsolete
private static final int OBSOLETE = -2;
+ private CopyOutputErrorType error = CopyOutputErrorType.NO_ERROR;
CopyResult(MapOutputLocation loc, long size) {
this.loc = loc;
this.size = size;
}
-
+
+ CopyResult(MapOutputLocation loc, long size, CopyOutputErrorType error) {
+ this.loc = loc;
+ this.size = size;
+ this.error = error;
+ }
+
public boolean getSuccess() { return size >= 0; }
public boolean isObsolete() {
return size == OBSOLETE;
@@ -869,6 +887,7 @@ class ReduceTask extends Task {
public long getSize() { return size; }
public String getHost() { return loc.getHost(); }
public MapOutputLocation getLocation() { return loc; }
+ public CopyOutputErrorType getError() { return error; }
}
private int nextMapOutputCopierId = 0;
@@ -1119,6 +1138,7 @@ class ReduceTask extends Task {
private MapOutputLocation currentLocation = null;
private int id = nextMapOutputCopierId++;
private Reporter reporter;
+ private boolean readError = false;
// Decompression of map-outputs
private CompressionCodec codec = null;
@@ -1143,7 +1163,7 @@ class ReduceTask extends Task {
*/
public synchronized boolean fail() {
if (currentLocation != null) {
- finish(-1);
+ finish(-1, CopyOutputErrorType.OTHER_ERROR);
return true;
} else {
return false;
@@ -1161,11 +1181,11 @@ class ReduceTask extends Task {
currentLocation = loc;
}
- private synchronized void finish(long size) {
+ private synchronized void finish(long size, CopyOutputErrorType error) {
if (currentLocation != null) {
LOG.debug(getName() + " finishing " + currentLocation + " =" + size);
synchronized (copyResults) {
- copyResults.add(new CopyResult(currentLocation, size));
+ copyResults.add(new CopyResult(currentLocation, size, error));
copyResults.notify();
}
currentLocation = null;
@@ -1188,23 +1208,27 @@ class ReduceTask extends Task {
}
loc = scheduledCopies.remove(0);
}
-
+ CopyOutputErrorType error = CopyOutputErrorType.OTHER_ERROR;
+ readError = false;
try {
shuffleClientMetrics.threadBusy();
start(loc);
size = copyOutput(loc);
shuffleClientMetrics.successFetch();
+ error = CopyOutputErrorType.NO_ERROR;
} catch (IOException e) {
LOG.warn(reduceTask.getTaskID() + " copy failed: " +
loc.getTaskAttemptId() + " from " + loc.getHost());
LOG.warn(StringUtils.stringifyException(e));
shuffleClientMetrics.failedFetch();
-
+ if (readError) {
+ error = CopyOutputErrorType.READ_ERROR;
+ }
// Reset
size = -1;
} finally {
shuffleClientMetrics.threadFree();
- finish(size);
+ finish(size, error);
}
} catch (InterruptedException e) {
break; // ALL DONE
@@ -1444,7 +1468,8 @@ class ReduceTask extends Task {
connection.setConnectTimeout(unit);
while (true) {
try {
- return connection.getInputStream();
+ connection.connect();
+ break;
} catch (IOException ioe) {
// update the total remaining connect-timeout
connectionTimeout -= unit;
@@ -1463,6 +1488,12 @@ class ReduceTask extends Task {
}
}
}
+ try {
+ return connection.getInputStream();
+ } catch (IOException ioe) {
+ readError = true;
+ throw ioe;
+ }
}
private MapOutput shuffleInMemory(MapOutputLocation mapOutputLoc,
@@ -1548,6 +1579,7 @@ class ReduceTask extends Task {
IOUtils.cleanup(LOG, input);
// Re-throw
+ readError = true;
throw ioe;
}
@@ -1613,7 +1645,13 @@ class ReduceTask extends Task {
output = rfs.create(localFilename);
byte[] buf = new byte[64 * 1024];
- int n = input.read(buf, 0, buf.length);
+ int n = -1;
+ try {
+ n = input.read(buf, 0, buf.length);
+ } catch (IOException ioe) {
+ readError = true;
+ throw ioe;
+ }
while (n > 0) {
bytesRead += n;
shuffleClientMetrics.inputBytes(n);
@@ -1621,7 +1659,12 @@ class ReduceTask extends Task {
// indicate we're making progress
reporter.progress();
- n = input.read(buf, 0, buf.length);
+ try {
+ n = input.read(buf, 0, buf.length);
+ } catch (IOException ioe) {
+ readError = true;
+ throw ioe;
+ }
}
LOG.info("Read " + bytesRead + " bytes from map-output for " +
@@ -2037,17 +2080,38 @@ class ReduceTask extends Task {
mapTaskToFailedFetchesMap.put(mapTaskId, noFailedFetches);
LOG.info("Task " + getTaskID() + ": Failed fetch #" +
noFailedFetches + " from " + mapTaskId);
+
+ // halve the max fetch retries per map once the shuffle is
+ // near its end (few maps left to copy)
+ int fetchRetriesPerMap = maxFetchRetriesPerMap;
+ int pendingCopies = numMaps - numCopied;
+
+ // The check noFailedFetches != maxFetchRetriesPerMap guards a
+ // corner case: if noFailedFetches has just reached
+ // maxFetchRetriesPerMap while the reducer is at the end of
+ // shuffle, halving fetchRetriesPerMap could make the difference
+ // between noFailedFetches and fetchRetriesPerMap odd, and the
+ // notification below would be skipped. Skipping the halving in
+ // that case guarantees the notification is sent.
+ if (pendingCopies <= numMaps * MIN_PENDING_MAPS_PERCENT &&
+ noFailedFetches != maxFetchRetriesPerMap) {
+ fetchRetriesPerMap = fetchRetriesPerMap >> 1;
+ }
// did the fetch fail too many times?
// using a hybrid technique for notifying the jobtracker.
// a. the first notification is sent after max-retries
// b. subsequent notifications are sent after 2 retries.
- if ((noFailedFetches >= maxFetchRetriesPerMap)
- && ((noFailedFetches - maxFetchRetriesPerMap) % 2) == 0) {
+ // c. send notification immediately if it is a read error.
+ if (cr.getError().equals(CopyOutputErrorType.READ_ERROR) ||
+ ((noFailedFetches >= fetchRetriesPerMap)
+ && ((noFailedFetches - fetchRetriesPerMap) % 2) == 0)) {
synchronized (ReduceTask.this) {
taskStatus.addFetchFailedMap(mapTaskId);
+ reporter.progress();
LOG.info("Failed to fetch map-output from " + mapTaskId +
" even after MAX_FETCH_RETRIES_PER_MAP retries... "
+ + " or it is a read error, "
+ " reporting to the JobTracker");
}
}
@@ -2103,10 +2167,22 @@ class ReduceTask extends Task {
// back off exponentially until num_retries <= max_retries
// back off by max_backoff/2 on subsequent failed attempts
currentTime = System.currentTimeMillis();
- int currentBackOff = noFailedFetches <= maxFetchRetriesPerMap
+ int currentBackOff = noFailedFetches <= fetchRetriesPerMap
? BACKOFF_INIT
* (1 << (noFailedFetches - 1))
: (this.maxBackoff * 1000 / 2);
+ // On a read error, back off for maxMapRuntime/2.
+ // During the end of shuffle, cap that at the
+ // regular back-off instead, i.e. use
+ // min(maxMapRuntime/2, currentBackOff).
+ if (cr.getError().equals(CopyOutputErrorType.READ_ERROR)) {
+ int backOff = maxMapRuntime >> 1;
+ if (pendingCopies <= numMaps * MIN_PENDING_MAPS_PERCENT) {
+ backOff = Math.min(backOff, currentBackOff);
+ }
+ currentBackOff = backOff;
+ }
+
penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
LOG.warn(reduceTask.getTaskID() + " adding host " +
cr.getHost() + " to penalty box, next contact in " +