Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 02:08:16 UTC

svn commit: r1076912 - /hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java

Author: omalley
Date: Fri Mar  4 01:08:16 2011
New Revision: 1076912

URL: http://svn.apache.org/viewvc?rev=1076912&view=rev
Log:
commit dc1e47c079f73b87f87e123be15f5988e27ad16b
Author: Lee Tucker <lt...@yahoo-inc.com>
Date:   Thu Jul 30 17:40:13 2009 -0700

    Applying patch 2358082.3327.patch

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=1076912&r1=1076911&r2=1076912&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Fri Mar  4 01:08:16 2011
@@ -564,6 +564,12 @@ class ReduceTask extends Task {
     output.close(reducerContext);
   }
 
+  private static enum CopyOutputErrorType {
+    NO_ERROR,
+    READ_ERROR,
+    OTHER_ERROR
+  };
+
   class ReduceCopier<K, V> implements MRConstants {
 
     /** Reference to the umbilical object */
@@ -745,6 +751,11 @@ class ReduceTask extends Task {
     private static final int MIN_FETCH_RETRIES_PER_MAP = 2;
 
     /**
+     * The threshold fraction of maps yet to be copied at or below
+     * which the shuffle is considered to be ending
+     */
+    private static final float MIN_PENDING_MAPS_PERCENT = 0.25f;
+    /**
      * Maximum no. of unique maps from which we failed to fetch map-outputs
      * even after {@link #maxFetchRetriesPerMap} retries; after this the
      * reduce task is failed.
@@ -857,11 +868,18 @@ class ReduceTask extends Task {
       //a flag signifying whether a copy result is obsolete
       private static final int OBSOLETE = -2;
       
+      private CopyOutputErrorType error = CopyOutputErrorType.NO_ERROR;
       CopyResult(MapOutputLocation loc, long size) {
         this.loc = loc;
         this.size = size;
       }
-      
+
+      CopyResult(MapOutputLocation loc, long size, CopyOutputErrorType error) {
+        this.loc = loc;
+        this.size = size;
+        this.error = error;
+      }
+
       public boolean getSuccess() { return size >= 0; }
       public boolean isObsolete() { 
         return size == OBSOLETE;
@@ -869,6 +887,7 @@ class ReduceTask extends Task {
       public long getSize() { return size; }
       public String getHost() { return loc.getHost(); }
       public MapOutputLocation getLocation() { return loc; }
+      public CopyOutputErrorType getError() { return error; }
     }
     
     private int nextMapOutputCopierId = 0;
@@ -1119,6 +1138,7 @@ class ReduceTask extends Task {
       private MapOutputLocation currentLocation = null;
       private int id = nextMapOutputCopierId++;
       private Reporter reporter;
+      private boolean readError = false;
       
       // Decompression of map-outputs
       private CompressionCodec codec = null;
@@ -1143,7 +1163,7 @@ class ReduceTask extends Task {
        */
       public synchronized boolean fail() {
         if (currentLocation != null) {
-          finish(-1);
+          finish(-1, CopyOutputErrorType.OTHER_ERROR);
           return true;
         } else {
           return false;
@@ -1161,11 +1181,11 @@ class ReduceTask extends Task {
         currentLocation = loc;
       }
       
-      private synchronized void finish(long size) {
+      private synchronized void finish(long size, CopyOutputErrorType error) {
         if (currentLocation != null) {
           LOG.debug(getName() + " finishing " + currentLocation + " =" + size);
           synchronized (copyResults) {
-            copyResults.add(new CopyResult(currentLocation, size));
+            copyResults.add(new CopyResult(currentLocation, size, error));
             copyResults.notify();
           }
           currentLocation = null;
@@ -1188,23 +1208,27 @@ class ReduceTask extends Task {
               }
               loc = scheduledCopies.remove(0);
             }
-            
+            CopyOutputErrorType error = CopyOutputErrorType.OTHER_ERROR;
+            readError = false;
             try {
               shuffleClientMetrics.threadBusy();
               start(loc);
               size = copyOutput(loc);
               shuffleClientMetrics.successFetch();
+              error = CopyOutputErrorType.NO_ERROR;
             } catch (IOException e) {
               LOG.warn(reduceTask.getTaskID() + " copy failed: " +
                        loc.getTaskAttemptId() + " from " + loc.getHost());
               LOG.warn(StringUtils.stringifyException(e));
               shuffleClientMetrics.failedFetch();
-              
+              if (readError) {
+                error = CopyOutputErrorType.READ_ERROR;
+              }
               // Reset 
               size = -1;
             } finally {
               shuffleClientMetrics.threadFree();
-              finish(size);
+              finish(size, error);
             }
           } catch (InterruptedException e) { 
             break; // ALL DONE
@@ -1444,7 +1468,8 @@ class ReduceTask extends Task {
         connection.setConnectTimeout(unit);
         while (true) {
           try {
-            return connection.getInputStream();
+            connection.connect();
+            break;
           } catch (IOException ioe) {
             // update the total remaining connect-timeout
             connectionTimeout -= unit;
@@ -1463,6 +1488,12 @@ class ReduceTask extends Task {
             }
           }
         }
+        try {
+          return connection.getInputStream();
+        } catch (IOException ioe) {
+          readError = true;
+          throw ioe;
+        }
       }
 
       private MapOutput shuffleInMemory(MapOutputLocation mapOutputLoc,
@@ -1548,6 +1579,7 @@ class ReduceTask extends Task {
           IOUtils.cleanup(LOG, input);
 
           // Re-throw
+          readError = true;
           throw ioe;
         }
 
@@ -1613,7 +1645,13 @@ class ReduceTask extends Task {
           output = rfs.create(localFilename);
           
           byte[] buf = new byte[64 * 1024];
-          int n = input.read(buf, 0, buf.length);
+          int n = -1;
+          try {
+            n = input.read(buf, 0, buf.length);
+          } catch (IOException ioe) {
+            readError = true;
+            throw ioe;
+          }
           while (n > 0) {
             bytesRead += n;
             shuffleClientMetrics.inputBytes(n);
@@ -1621,7 +1659,12 @@ class ReduceTask extends Task {
 
             // indicate we're making progress
             reporter.progress();
-            n = input.read(buf, 0, buf.length);
+            try {
+              n = input.read(buf, 0, buf.length);
+            } catch (IOException ioe) {
+              readError = true;
+              throw ioe;
+            }
           }
 
           LOG.info("Read " + bytesRead + " bytes from map-output for " +
@@ -2037,17 +2080,38 @@ class ReduceTask extends Task {
               mapTaskToFailedFetchesMap.put(mapTaskId, noFailedFetches);
               LOG.info("Task " + getTaskID() + ": Failed fetch #" + 
                        noFailedFetches + " from " + mapTaskId);
+
+              // halve the max number of fetch retries per map near
+              // the end of the shuffle
+              int fetchRetriesPerMap = maxFetchRetriesPerMap;
+              int pendingCopies = numMaps - numCopied;
+              
+              // The check noFailedFetches != maxFetchRetriesPerMap is
+              // required to guarantee the notification in a corner case:
+              // if noFailedFetches has already reached
+              // maxFetchRetriesPerMap when the reducer reaches the end
+              // of the shuffle, halving fetchRetriesPerMap could leave
+              // the difference between noFailedFetches and
+              // fetchRetriesPerMap odd, and no notification would be sent
+              if (pendingCopies <= numMaps * MIN_PENDING_MAPS_PERCENT &&
+                  noFailedFetches != maxFetchRetriesPerMap) {
+                fetchRetriesPerMap = fetchRetriesPerMap >> 1;
+              }
               
               // did the fetch fail too many times?
               // using a hybrid technique for notifying the jobtracker.
               //   a. the first notification is sent after max-retries 
               //   b. subsequent notifications are sent after 2 retries.   
-              if ((noFailedFetches >= maxFetchRetriesPerMap) 
-                  && ((noFailedFetches - maxFetchRetriesPerMap) % 2) == 0) {
+              //   c. a notification is sent immediately on a read error.
+              if (cr.getError().equals(CopyOutputErrorType.READ_ERROR) ||
+                 ((noFailedFetches >= fetchRetriesPerMap) 
+                  && ((noFailedFetches - fetchRetriesPerMap) % 2) == 0)) {
                 synchronized (ReduceTask.this) {
                   taskStatus.addFetchFailedMap(mapTaskId);
+                  reporter.progress();
                   LOG.info("Failed to fetch map-output from " + mapTaskId + 
                            " even after MAX_FETCH_RETRIES_PER_MAP retries... "
+                           + " or it is a read error, "
                            + " reporting to the JobTracker");
                 }
               }
@@ -2103,10 +2167,22 @@ class ReduceTask extends Task {
               // back off exponentially until num_retries <= max_retries
               // back off by max_backoff/2 on subsequent failed attempts
               currentTime = System.currentTimeMillis();
-              int currentBackOff = noFailedFetches <= maxFetchRetriesPerMap 
+              int currentBackOff = noFailedFetches <= fetchRetriesPerMap 
                                    ? BACKOFF_INIT 
                                      * (1 << (noFailedFetches - 1)) 
                                    : (this.maxBackoff * 1000 / 2);
+              // On a read error, back off for maxMapRuntime/2;
+              // near the end of the shuffle, cap that at the
+              // normal backoff, i.e. back off for
+              // min(maxMapRuntime/2, currentBackOff)
+              if (cr.getError().equals(CopyOutputErrorType.READ_ERROR)) {
+                int backOff = maxMapRuntime >> 1;
+                if (pendingCopies <= numMaps * MIN_PENDING_MAPS_PERCENT) {
+                  backOff = Math.min(backOff, currentBackOff); 
+                } 
+                currentBackOff = backOff;
+              }
+
               penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
               LOG.warn(reduceTask.getTaskID() + " adding host " +
                        cr.getHost() + " to penalty box, next contact in " +
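
Taken together, the hunks above implement three related policies in the
shuffle: the per-map fetch-retry budget is halved once at most 25% of the
map outputs remain to be copied, the JobTracker is notified immediately on
a read error (instead of only after the usual retry cadence), and a read
error puts the host in the penalty box for maxMapRuntime/2, capped by the
normal backoff near the end of the shuffle. The standalone Java sketch
below restates that logic for readability; the class and method names
(ShufflePolicy, effectiveRetries, shouldNotifyJobTracker, backOffMs) are
illustrative only, and the BACKOFF_INIT value is an assumption, not taken
from this patch.

    // Hypothetical restatement of the policy in the patch above; not part
    // of ReduceTask.java.
    public class ShufflePolicy {

      enum CopyOutputErrorType { NO_ERROR, READ_ERROR, OTHER_ERROR }

      // Pending-maps fraction at or below which shuffle is "ending".
      static final float MIN_PENDING_MAPS_PERCENT = 0.25f;
      // Assumed initial backoff in ms; the real constant lives elsewhere
      // in ReduceTask.java.
      static final int BACKOFF_INIT = 4000;

      // Halve the retry budget near the end of the shuffle, except when
      // noFailedFetches equals the full budget (halving there could make
      // the "% 2 == 0" cadence below skip a notification).
      static int effectiveRetries(int maxFetchRetriesPerMap,
                                  int noFailedFetches,
                                  int numMaps, int numCopied) {
        int pending = numMaps - numCopied;
        if (pending <= numMaps * MIN_PENDING_MAPS_PERCENT
            && noFailedFetches != maxFetchRetriesPerMap) {
          return maxFetchRetriesPerMap >> 1;
        }
        return maxFetchRetriesPerMap;
      }

      // Hybrid notification: immediately on a read error; otherwise first
      // when the retry budget is exhausted, then on every second failure.
      static boolean shouldNotifyJobTracker(CopyOutputErrorType error,
                                            int noFailedFetches,
                                            int retries) {
        return error == CopyOutputErrorType.READ_ERROR
            || (noFailedFetches >= retries
                && (noFailedFetches - retries) % 2 == 0);
      }

      // Penalty-box backoff in ms (noFailedFetches is assumed >= 1):
      // exponential while within the budget, then maxBackoff/2; a read
      // error backs off maxMapRuntime/2, capped by the normal backoff
      // near the end of the shuffle.
      static int backOffMs(CopyOutputErrorType error, int noFailedFetches,
                           int retries, int maxBackoffSec,
                           int maxMapRuntimeMs, int numMaps, int numCopied) {
        int backOff = noFailedFetches <= retries
            ? BACKOFF_INIT * (1 << (noFailedFetches - 1))
            : maxBackoffSec * 1000 / 2;
        if (error == CopyOutputErrorType.READ_ERROR) {
          int readBackOff = maxMapRuntimeMs >> 1;
          if (numMaps - numCopied <= numMaps * MIN_PENDING_MAPS_PERCENT) {
            readBackOff = Math.min(readBackOff, backOff);
          }
          backOff = readBackOff;
        }
        return backOff;
      }
    }

For example, with maxFetchRetriesPerMap = 4 and the shuffle not yet in its
end phase, shouldNotifyJobTracker fires at 4, 6, 8, ... ordinary failed
fetches from the same map output, while a single READ_ERROR fires right
away.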