You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by tg...@apache.org on 2012/07/27 03:48:54 UTC
svn commit: r1366260 - in
/hadoop/common/branches/branch-2/hadoop-mapreduce-project: ./
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/
hadoop-mapreduce-client/hadoop-mapreduce-client-core/src...
Author: tgraves
Date: Fri Jul 27 01:48:54 2012
New Revision: 1366260
URL: http://svn.apache.org/viewvc?rev=1366260&view=rev
Log:
merge -r 1366257:1366258 from trunk. FIXES: MAPREDUCE-4423
Added:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
- copied unchanged from r1366258, hadoop/common/trunk/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/task/reduce/TestFetcher.java
Modified:
hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt?rev=1366260&r1=1366259&r2=1366260&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/CHANGES.txt Fri Jul 27 01:48:54 2012
@@ -633,6 +633,9 @@ Release 0.23.3 - UNRELEASED
MAPREDUCE-4467. IndexCache failures due to missing synchronization
(Kihwal Lee via tgraves)
+ MAPREDUCE-4423. Potential infinite fetching of map output (Robert Evans
+ via tgraves)
+
Release 0.23.2 - UNRELEASED
INCOMPATIBLE CHANGES
Modified: hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=1366260&r1=1366259&r2=1366260&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/common/branches/branch-2/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Fri Jul 27 01:48:54 2012
@@ -21,11 +21,12 @@ import java.io.DataInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.net.HttpURLConnection;
import java.net.MalformedURLException;
import java.net.URL;
-import java.net.HttpURLConnection;
import java.net.URLConnection;
import java.security.GeneralSecurityException;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
@@ -53,7 +54,8 @@ import org.apache.hadoop.mapreduce.task.
import org.apache.hadoop.util.Progressable;
import org.apache.hadoop.util.ReflectionUtils;
-@SuppressWarnings({"deprecation"})
+import com.google.common.annotations.VisibleForTesting;
+
class Fetcher<K,V> extends Thread {
private static final Log LOG = LogFactory.getLog(Fetcher.class);
@@ -199,6 +201,7 @@ class Fetcher<K,V> extends Thread {
}
}
+ @VisibleForTesting
protected HttpURLConnection openConnection(URL url) throws IOException {
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
if (sslShuffle) {
@@ -209,17 +212,18 @@ class Fetcher<K,V> extends Thread {
throw new IOException(ex);
}
httpsConn.setHostnameVerifier(sslFactory.getHostnameVerifier());
- }
- return conn;
}
-
+ return conn;
+ }
+
/**
* The crux of the matter...
*
* @param host {@link MapHost} from which we need to
* shuffle available map-outputs.
*/
- private void copyFromHost(MapHost host) throws IOException {
+ @VisibleForTesting
+ protected void copyFromHost(MapHost host) throws IOException {
// Get completed maps on 'host'
List<TaskAttemptID> maps = scheduler.getMapsForHost(host);
@@ -229,9 +233,9 @@ class Fetcher<K,V> extends Thread {
return;
}
- LOG.debug("Fetcher " + id + " going to fetch from " + host);
- for (TaskAttemptID tmp: maps) {
- LOG.debug(tmp);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("Fetcher " + id + " going to fetch from " + host + " for: "
+ + maps);
}
// List of maps to be fetched yet
@@ -304,17 +308,25 @@ class Fetcher<K,V> extends Thread {
try {
// Loop through available map-outputs and fetch them
- // On any error, good becomes false and we exit after putting back
- // the remaining maps to the yet_to_be_fetched list
- boolean good = true;
- while (!remaining.isEmpty() && good) {
- good = copyMapOutput(host, input, remaining);
+ // On any error, failedTasks is not null and we exit
+ // after putting back the remaining maps to the
+ // yet_to_be_fetched list and marking the failed tasks.
+ TaskAttemptID[] failedTasks = null;
+ while (!remaining.isEmpty() && failedTasks == null) {
+ failedTasks = copyMapOutput(host, input, remaining);
+ }
+
+ if(failedTasks != null && failedTasks.length > 0) {
+ LOG.warn("copyMapOutput failed for tasks "+Arrays.toString(failedTasks));
+ for(TaskAttemptID left: failedTasks) {
+ scheduler.copyFailed(left, host, true);
+ }
}
IOUtils.cleanup(LOG, input);
// Sanity check
- if (good && !remaining.isEmpty()) {
+ if (failedTasks == null && !remaining.isEmpty()) {
throw new IOException("server didn't return all expected map outputs: "
+ remaining.size() + " left.");
}
@@ -323,10 +335,11 @@ class Fetcher<K,V> extends Thread {
scheduler.putBackKnownMapOutput(host, left);
}
}
-
- }
+ }
- private boolean copyMapOutput(MapHost host,
+ private static TaskAttemptID[] EMPTY_ATTEMPT_ID_ARRAY = new TaskAttemptID[0];
+
+ private TaskAttemptID[] copyMapOutput(MapHost host,
DataInputStream input,
Set<TaskAttemptID> remaining) {
MapOutput<K,V> mapOutput = null;
@@ -348,18 +361,21 @@ class Fetcher<K,V> extends Thread {
} catch (IllegalArgumentException e) {
badIdErrs.increment(1);
LOG.warn("Invalid map id ", e);
- return false;
+ //Don't know which one was bad, so consider all of them as bad
+ return remaining.toArray(new TaskAttemptID[remaining.size()]);
}
// Do some basic sanity verification
if (!verifySanity(compressedLength, decompressedLength, forReduce,
remaining, mapId)) {
- return false;
+ return new TaskAttemptID[] {mapId};
}
- LOG.debug("header: " + mapId + ", len: " + compressedLength +
- ", decomp len: " + decompressedLength);
+ if(LOG.isDebugEnabled()) {
+ LOG.debug("header: " + mapId + ", len: " + compressedLength +
+ ", decomp len: " + decompressedLength);
+ }
// Get the location for the map output - either in-memory or on-disk
mapOutput = merger.reserve(mapId, decompressedLength, id);
@@ -367,7 +383,8 @@ class Fetcher<K,V> extends Thread {
// Check if we can shuffle *now* ...
if (mapOutput.getType() == Type.WAIT) {
LOG.info("fetcher#" + id + " - MergerManager returned Status.WAIT ...");
- return false;
+ //Not an error but wait to process data.
+ return EMPTY_ATTEMPT_ID_ARRAY;
}
// Go!
@@ -389,24 +406,27 @@ class Fetcher<K,V> extends Thread {
// Note successful shuffle
remaining.remove(mapId);
metrics.successFetch();
- return true;
+ return null;
} catch (IOException ioe) {
ioErrs.increment(1);
if (mapId == null || mapOutput == null) {
LOG.info("fetcher#" + id + " failed to read map header" +
mapId + " decomp: " +
decompressedLength + ", " + compressedLength, ioe);
- return false;
+ if(mapId == null) {
+ return remaining.toArray(new TaskAttemptID[remaining.size()]);
+ } else {
+ return new TaskAttemptID[] {mapId};
+ }
}
- LOG.info("Failed to shuffle output of " + mapId +
+ LOG.warn("Failed to shuffle output of " + mapId +
" from " + host.getHostName(), ioe);
// Inform the shuffle-scheduler
mapOutput.abort();
- scheduler.copyFailed(mapId, host, true);
metrics.failedFetch();
- return false;
+ return new TaskAttemptID[] {mapId};
}
}