Posted to mapreduce-commits@hadoop.apache.org by cd...@apache.org on 2010/05/20 11:32:41 UTC

svn commit: r946580 - in /hadoop/mapreduce/branches/branch-0.21: ./ src/java/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapreduce/task/reduce/

Author: cdouglas
Date: Thu May 20 09:32:41 2010
New Revision: 946580

URL: http://svn.apache.org/viewvc?rev=946580&view=rev
Log:
MAPREDUCE-1276. Correct flaws in the shuffle related to connection setup and
failure attribution. Contributed by Amareshwari Sriramadasu

Modified:
    hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskTracker.java
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java

Modified: hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/CHANGES.txt?rev=946580&r1=946579&r2=946580&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/branch-0.21/CHANGES.txt Thu May 20 09:32:41 2010
@@ -1559,3 +1559,6 @@ Release 0.21.0 - Unreleased
 
     MAPREDUCE-1611. Refresh nodes and refresh queues doesnt work with service
     authorization enabled. (Amar Kamat via vinodkv)
+
+    MAPREDUCE-1276. Correct flaws in the shuffle related to connection setup
+    and failure attribution. (Amareshwari Sriramadasu via cdouglas)

Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=946580&r1=946579&r2=946580&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/TaskTracker.java Thu May 20 09:32:41 2010
@@ -3643,8 +3643,12 @@ public class TaskTracker 
           len =
             mapOutputIn.read(buffer, 0, (int)Math.min(rem, MAX_BYTES_TO_READ));
         }
-        
-        mapOutputIn.close();
+        try {
+          outStream.flush();
+        } catch (IOException ie) {
+          isInputException = false;
+          throw ie;
+        }
       } catch (IOException ie) {
         String errorMsg = "error on sending map " + mapId + " to reduce " + 
                           reduce;
@@ -3652,6 +3656,8 @@ public class TaskTracker 
           tracker.mapOutputLost(TaskAttemptID.forName(mapId), errorMsg + 
                                 StringUtils.stringifyException(ie));
         }
+        throw new IOException(errorMsg, ie);
+      } finally {
         if (mapOutputIn != null) {
           try {
             mapOutputIn.close();
@@ -3659,7 +3665,6 @@ public class TaskTracker 
             LOG.info("problem closing map output file", ioe);
           }
         }
-        throw new IOException(errorMsg, ie);
       }
       
       LOG.info("Sent out " + totalRead + " bytes to reduce " + reduce + 

Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java?rev=946580&r1=946579&r2=946580&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/task/reduce/EventFetcher.java Thu May 20 09:32:41 2010
@@ -142,7 +142,6 @@ class EventFetcher<K,V> extends Thread {
           break;
         case FAILED:
         case KILLED:
-          break;
         case OBSOLETE:
           scheduler.obsoleteMapOutput(event.getTaskAttemptId());
           LOG.info("Ignoring obsolete output of " + event.getTaskStatus() + 

Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java?rev=946580&r1=946579&r2=946580&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapreduce/task/reduce/Fetcher.java Thu May 20 09:32:41 2010
@@ -200,10 +200,11 @@ class Fetcher<K,V> extends Thread {
       // put url hash into http header
       connection.addRequestProperty(
           SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
+      // set the read timeout
+      connection.setReadTimeout(readTimeout);
+      connect(connection, connectionTimeout);
       connectSucceeded = true;
-      input = 
-        new DataInputStream(getInputStream(connection, connectionTimeout,
-                                           readTimeout));
+      input = new DataInputStream(connection.getInputStream());
       
       // get the replyHash which is HMac of the encHash we sent to the server
       String replyHash = connection.getHeaderField(SecureShuffleUtils.HTTP_HEADER_REPLY_URL_HASH);
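
This hunk separates connecting from opening the stream: the read timeout is applied to the URLConnection up front, connect() (refactored further down) performs the actual connect with retries, and only then is connectSucceeded set, so a later getInputStream() failure is charged to the serving host rather than treated as a connection-setup error. A condensed sketch of the new ordering, assuming the names visible in the diff (encHash, readTimeout, connectionTimeout, connectSucceeded):

  URLConnection connection = url.openConnection();
  // put the url hash into the http header, as in the original code
  connection.addRequestProperty(SecureShuffleUtils.HTTP_HEADER_URL_HASH, encHash);
  connection.setReadTimeout(readTimeout);     // bounds each read on the shuffle stream
  connect(connection, connectionTimeout);     // retried connect; see the last hunk below
  connectSucceeded = true;                    // failures after this point blame the host
  DataInputStream input = new DataInputStream(connection.getInputStream());
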
@@ -250,14 +251,7 @@ class Fetcher<K,V> extends Thread {
         good = copyMapOutput(host, input, remaining);
       }
       
-      // Drain the buffer, just in case
-      final int DRAIN_BUF_SIZE = 4096;
-      byte[] drainBuf = new byte[DRAIN_BUF_SIZE];
-      int retVal = 0;
-      while ( retVal != -1) {
-        retVal = input.read(drainBuf, 0, DRAIN_BUF_SIZE);
-      }
-      input.close();
+      IOUtils.cleanup(LOG, input);
       
       // Sanity check
       if (good && !remaining.isEmpty()) {
@@ -425,9 +419,7 @@ class Fetcher<K,V> extends Thread {
    * only on the last failure. Instead of connecting with a timeout of 
    * X, we try connecting with a timeout of x < X but multiple times. 
    */
-  private InputStream getInputStream(URLConnection connection, 
-                                     int connectionTimeout, 
-                                     int readTimeout) 
+  private void connect(URLConnection connection, int connectionTimeout)
   throws IOException {
     int unit = 0;
     if (connectionTimeout < 0) {
@@ -436,13 +428,12 @@ class Fetcher<K,V> extends Thread {
     } else if (connectionTimeout > 0) {
       unit = Math.min(UNIT_CONNECT_TIMEOUT, connectionTimeout);
     }
-    // set the read timeout to the total timeout
-    connection.setReadTimeout(readTimeout);
     // set the connect timeout to the unit-connect-timeout
     connection.setConnectTimeout(unit);
     while (true) {
       try {
-        return connection.getInputStream();
+        connection.connect();
+        break;
       } catch (IOException ioe) {
         // update the total remaining connect-timeout
         connectionTimeout -= unit;