You are viewing a plain text version of this content. The canonical link was not preserved in this plain-text copy.
Posted to mapreduce-commits@hadoop.apache.org by tg...@apache.org on 2012/07/20 22:21:10 UTC
svn commit: r1363936 - in
/hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src...
Author: tgraves
Date: Fri Jul 20 20:21:09 2012
New Revision: 1363936
URL: http://svn.apache.org/viewvc?rev=1363936&view=rev
Log:
svn merge --change -1363455 for reverting MAPREDUCE-4423
Removed:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1363936&r1=1363935&r2=1363936&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Fri Jul 20 20:21:09 2012
@@ -605,9 +605,6 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4448. Fix NM crash during app cleanup if aggregation didn't
init. (Jason Lowe via daryn)
- MAPREDUCE-4423. Potential infinite fetching of map output (Robert Evans
- via tgraves)
-
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1363936&r1=1363935&r2=1363936&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Fri Jul 20 20:21:09 2012
@@ -49,8 +49,7 @@ import org.apache.hadoop.mapreduce.task.
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
-import com.google.common.annotations.VisibleForTesting;
-
+@SuppressWarnings({"deprecation"})
class Fetcher<K,V> extends Thread {
private static final Log LOG = LogFactory.getLog(Fetcher.class);
@@ -176,18 +175,13 @@ class Fetcher<K,V> extends Thread {
}
}
- @VisibleForTesting
- protected HttpURLConnection openConnection(URL url) throws IOException {
- return (HttpURLConnection)url.openConnection();
- }
-
/**
* The crux of the matter...
*
* @param host {@link MapHost} from which we need to
* shuffle available map-outputs.
*/
- protected void copyFromHost(MapHost host) throws IOException {
+ private void copyFromHost(MapHost host) throws IOException {
// Get completed maps on 'host'
List<TaskAttemptID> maps = scheduler.getMapsForHost(host);
@@ -197,11 +191,9 @@ class Fetcher<K,V> extends Thread {
return;
}
- if(LOG.isDebugEnabled()) {
- LOG.debug("Fetcher " + id + " going to fetch from " + host);
- for (TaskAttemptID tmp: maps) {
- LOG.debug(tmp);
- }
+ LOG.debug("Fetcher " + id + " going to fetch from " + host);
+ for (TaskAttemptID tmp: maps) {
+ LOG.debug(tmp);
}
// List of maps to be fetched yet
@@ -213,7 +205,7 @@ class Fetcher<K,V> extends Thread {
try {
URL url = getMapOutputURL(host, maps);
- HttpURLConnection connection = openConnection(url);
+ HttpURLConnection connection = (HttpURLConnection)url.openConnection();
// generate hash of the url
String msgToEncode = SecureShuffleUtils.buildMsgFrom(url);
@@ -274,24 +266,17 @@ class Fetcher<K,V> extends Thread {
try {
// Loop through available map-outputs and fetch them
- // On any error, faildTasks is not null and we exit
- // after putting back the remaining maps to the
- // yet_to_be_fetched list and marking the failed tasks.
- TaskAttemptID[] failedTasks = null;
- while (!remaining.isEmpty() && failedTasks == null) {
- failedTasks = copyMapOutput(host, input, remaining);
- }
-
- if(failedTasks != null) {
- for(TaskAttemptID left: failedTasks) {
- scheduler.copyFailed(left, host, true);
- }
+ // On any error, good becomes false and we exit after putting back
+ // the remaining maps to the yet_to_be_fetched list
+ boolean good = true;
+ while (!remaining.isEmpty() && good) {
+ good = copyMapOutput(host, input, remaining);
}
IOUtils.cleanup(LOG, input);
// Sanity check
- if (failedTasks == null && !remaining.isEmpty()) {
+ if (good && !remaining.isEmpty()) {
throw new IOException("server didn't return all expected map outputs: "
+ remaining.size() + " left.");
}
@@ -300,9 +285,10 @@ class Fetcher<K,V> extends Thread {
scheduler.putBackKnownMapOutput(host, left);
}
}
- }
+
+ }
- private TaskAttemptID[] copyMapOutput(MapHost host,
+ private boolean copyMapOutput(MapHost host,
DataInputStream input,
Set<TaskAttemptID> remaining) {
MapOutput<K,V> mapOutput = null;
@@ -324,15 +310,14 @@ class Fetcher<K,V> extends Thread {
} catch (IllegalArgumentException e) {
badIdErrs.increment(1);
LOG.warn("Invalid map id ", e);
- //Don't know which one was bad, so consider all of them as bad
- return remaining.toArray(new TaskAttemptID[remaining.size()]);
+ return false;
}
// Do some basic sanity verification
if (!verifySanity(compressedLength, decompressedLength, forReduce,
remaining, mapId)) {
- return new TaskAttemptID[] {mapId};
+ return false;
}
LOG.debug("header: " + mapId + ", len: " + compressedLength +
@@ -344,7 +329,7 @@ class Fetcher<K,V> extends Thread {
// Check if we can shuffle *now* ...
if (mapOutput.getType() == Type.WAIT) {
LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
- return new TaskAttemptID[] {mapId};
+ return false;
}
// Go!
@@ -366,18 +351,14 @@ class Fetcher<K,V> extends Thread {
// Note successful shuffle
remaining.remove(mapId);
metrics.successFetch();
- return null;
+ return true;
} catch (IOException ioe) {
ioErrs.increment(1);
if (mapId == null || mapOutput == null) {
LOG.info("fetcher#" + id + " failed to read map header" +
mapId + " decomp: " +
decompressedLength + ", " + compressedLength, ioe);
- if(mapId == null) {
- return remaining.toArray(new TaskAttemptID[remaining.size()]);
- } else {
- return new TaskAttemptID[] {mapId};
- }
+ return false;
}
LOG.info("Failed to shuffle output of " + mapId +
@@ -385,8 +366,9 @@ class Fetcher<K,V> extends Thread {
// Inform the shuffle-scheduler
mapOutput.abort();
+ scheduler.copyFailed(mapId, host, true);
metrics.failedFetch();
- return new TaskAttemptID[] {mapId};
+ return false;
}
}