You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2010/05/20 11:32:41 UTC
svn commit: r946580 - in /hadoop/mapreduce/branches/branch-0.21: ./
src/java/org/apache/hadoop/mapred/
src/java/org/apache/hadoop/mapreduce/task/reduce/
Author: cdouglas
Date: Thu May 20 09:32:41 2010
New Revision: 946580
URL: http://svn.apache.org/viewvc?rev=946580&view=rev
Log:
MAPREDUCE-1276. Correct flaws in the shuffle related to connection setup and
failure attribution. Contributed by Amareshwari Sriramadasu
Modified:
hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskTracker.java
hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java
hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
Modified: hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/CHANGES.txt?rev=946580&r1=946579&r2=946580&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/branch-0.21/CHANGES.txt Thu May 20 09:32:41 2010
@@ -1559,3 +1559,6 @@ Release 0.21.0 - Unreleased
MAPREDUCE-1611. Refresh nodes and refresh queues doesn't work with service
authorization enabled. (Amar Kamat via vinodkv)
+
+ MAPREDUCE-1276. Correct flaws in the shuffle related to connection setup
+ and failure attribution. (Amareshwari Sriramadasu via cdouglas)
Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=946580&r1=946579&r2=946580&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu May 20 09:32:41 2010
@@ -3643,8 +3643,12 @@ public class TaskTracker
len =
mapOutputIn.read(buffer, 0, (int)Math.min(rem, MAX_BYTES_TO_READ));
}
-
- mapOutputIn.close();
+ try {
+ outStream.flush();
+ } catch (IOException ie) {
+ isInputException = false;
+ throw ie;
+ }
} catch (IOException ie) {
String errorMsg = "error on sending map " + mapId + " to reduce " +
reduce;
@@ -3652,6 +3656,8 @@ public class TaskTracker
tracker.mapOutputLost(TaskAttemptID.forName(mapId), errorMsg +
StringUtils.stringifyException(ie));
}
+ throw new IOException(errorMsg, ie);
+ } finally {
if (mapOutputIn != null) {
try {
mapOutputIn.close();
@@ -3659,7 +3665,6 @@ public class TaskTracker
LOG.info("problem closing map output file", ioe);
}
}
- throw new IOException(errorMsg, ie);
}
LOG.info("Sent out " + totalRead + " bytes to reduce " + reduce +
Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java?rev=946580&r1=946579&r2=946580&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java Thu May 20 09:32:41 2010
@@ -142,7 +142,6 @@ class EventFetcher<K,V> extends Thread {
break;
case FAILED:
case KILLED:
- break;
case OBSOLETE:
scheduler.obsoleteMapOutput(event.getTaskAttemptId());
LOG.info("Ignoring obsolete output of " + event.getTaskStatus() +
Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=946580&r1=946579&r2=946580&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Thu May 20 09:32:41 2010
@@ -200,10 +200,11 @@ class Fetcher<K,V> extends Thread {
// put url hash into http header
connection.addRequestProperty(
SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
+ // set the read timeout
+ connection.setReadTimeout(readTimeout);
+ connect(connection, connectionTimeout);
connectSucceeded = true;
- input =
- new DataInputStream(getInputStream(connection, connectionTimeout,
- readTimeout));
+ input = new DataInputStream(connection.getInputStream());
// get the replyHash which is HMac of the encHash we sent to the server
String replyHash = connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
@@ -250,14 +251,7 @@ class Fetcher<K,V> extends Thread {
good = copyMapOutput(host, input, remaining);
}
- // Drain the buffer, just in case
- final int DRAIN_BUF_SIZE = 4096;
- byte[] drainBuf = new byte[DRAIN_BUF_SIZE];
- int retVal = 0;
- while ( retVal != -1) {
- retVal = input.read(drainBuf, 0, DRAIN_BUF_SIZE);
- }
- input.close();
+ IOUtils.cleanup(LOG, input);
// Sanity check
if (good && !remaining.isEmpty()) {
@@ -425,9 +419,7 @@ class Fetcher<K,V> extends Thread {
* only on the last failure. Instead of connecting with a timeout of
* X, we try connecting with a timeout of x < X but multiple times.
*/
- private InputStream getInputStream(URLConnection connection,
- int connectionTimeout,
- int readTimeout)
+ private void connect(URLConnection connection, int connectionTimeout)
throws IOException {
int unit = 0;
if (connectionTimeout < 0) {
@@ -436,13 +428,12 @@ class Fetcher<K,V> extends Thread {
} else if (connectionTimeout > 0) {
unit = Math.min(UNIT_CONNECT_TIMEOUT, connectionTimeout);
}
- // set the read timeout to the total timeout
- connection.setReadTimeout(readTimeout);
// set the connect timeout to the unit-connect-timeout
connection.setConnectTimeout(unit);
while (true) {
try {
- return connection.getInputStream();
+ connection.connect();
+ break;
} catch (IOException ioe) {
// update the total remaining connect-timeout
connectionTimeout -= unit;