You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/09/29 12:21:18 UTC

svn commit: r700044 - in /hadoop/core/branches/branch-0.19: CHANGES.txt src/core/org/apache/hadoop/net/ConnTimeoutException.java src/core/org/apache/hadoop/net/ReadTimeoutException.java src/mapred/org/apache/hadoop/mapred/ReduceTask.java

Author: ddas
Date: Mon Sep 29 03:21:18 2008
New Revision: 700044

URL: http://svn.apache.org/viewvc?rev=700044&view=rev
Log:
Merge -r 700042:700043 from trunk onto 0.19 branch. Reverts HADOOP-3327.

Removed:
    hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/net/ConnTimeoutException.java
    hadoop/core/branches/branch-0.19/src/core/org/apache/hadoop/net/ReadTimeoutException.java
Modified:
    hadoop/core/branches/branch-0.19/CHANGES.txt
    hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/ReduceTask.java

Modified: hadoop/core/branches/branch-0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=700044&r1=700043&r2=700044&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.19/CHANGES.txt Mon Sep 29 03:21:18 2008
@@ -268,10 +268,6 @@
     HADOOP-3756. Minor. Remove unused dfs.client.buffer.dir from 
     hadoop-default.xml. (rangadi)
 
-    HADOOP-3327. Treats connection and read timeouts differently in the 
-    shuffle and the backoff logic is dependent on the type of timeout.
-    (Jothi Padmanabhan via ddas)
-
     HADOOP-3747. Adds counter suport for MultipleOutputs. 
     (Alejandro Abdelnur via ddas)
 

Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=700044&r1=700043&r2=700044&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Mon Sep 29 03:21:18 2008
@@ -74,8 +74,6 @@
 import org.apache.hadoop.metrics.MetricsRecord;
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.Updater;
-import org.apache.hadoop.net.ConnTimeoutException;
-import org.apache.hadoop.net.ReadTimeoutException;
 import org.apache.hadoop.util.Progress;
 import org.apache.hadoop.util.Progressable;
 import org.apache.hadoop.util.ReflectionUtils;
@@ -96,12 +94,6 @@
   private int numMaps;
   private ReduceCopier reduceCopier;
 
-  private static enum CopyOutputErrorType {
-	  NO_ERROR,
-	  CONNECTION_ERROR,
-	  READ_ERROR
-  }
-
   private CompressionCodec codec;
 
 
@@ -110,7 +102,6 @@
     setPhase(TaskStatus.Phase.SHUFFLE);        // phase to start with 
   }
 
-
   private Progress copyPhase;
   private Progress sortPhase;
   private Progress reducePhase;
@@ -637,15 +628,9 @@
     Set<TaskID> fetchFailedMaps = new TreeSet<TaskID>(); 
     
     /**
-     * A map of taskId -> no. of failed fetches in connect
-     */
-    Map<TaskAttemptID, Integer> mapTaskToConnectFailedFetchesMap = 
-      new HashMap<TaskAttemptID, Integer>();    
-
-    /**
-     * A map of taskId -> no. of failed fetches in read
+     * A map of taskId -> no. of failed fetches
      */
-    Map<TaskAttemptID, Integer> mapTaskToReadFailedFetchesMap = 
+    Map<TaskAttemptID, Integer> mapTaskToFailedFetchesMap = 
       new HashMap<TaskAttemptID, Integer>();    
 
     /**
@@ -665,7 +650,6 @@
       Collections.synchronizedList(new LinkedList<MapOutput>());
     
 
-
     /**
      * This class contains the methods that should be used for metrics-reporting
      * the specific metrics for shuffle. This class actually reports the
@@ -728,8 +712,7 @@
     /** Represents the result of an attempt to copy a map output */
     private class CopyResult {
       
-
-    	// the map output location against which a copy attempt was made
+      // the map output location against which a copy attempt was made
       private final MapOutputLocation loc;
       
       // the size of the file copied, -1 if the transfer failed
@@ -737,14 +720,10 @@
       
       //a flag signifying whether a copy result is obsolete
       private static final int OBSOLETE = -2;
-
-      CopyOutputErrorType errorType;
-
-      CopyResult(MapOutputLocation loc, long size, 
-    		     CopyOutputErrorType errorType) {
+      
+      CopyResult(MapOutputLocation loc, long size) {
         this.loc = loc;
         this.size = size;
-        this.errorType = errorType;
       }
       
       public boolean getSuccess() { return size >= 0; }
@@ -753,9 +732,6 @@
       }
       public long getSize() { return size; }
       public String getHost() { return loc.getHost(); }
-      public CopyOutputErrorType getErrorType() {
-    	  return ((size < 0) ? errorType: CopyOutputErrorType.NO_ERROR); 
-      }
       public MapOutputLocation getLocation() { return loc; }
     }
     
@@ -1026,6 +1002,19 @@
       }
       
       /**
+       * Fail the current file that we are fetching
+       * @return true if a fetch was in progress and has been marked failed, false otherwise
+       */
+      public synchronized boolean fail() {
+        if (currentLocation != null) {
+          finish(-1);
+          return true;
+        } else {
+          return false;
+        }
+      }
+      
+      /**
        * Get the current map output location.
        */
       public synchronized MapOutputLocation getLocation() {
@@ -1036,12 +1025,11 @@
         currentLocation = loc;
       }
       
-      private synchronized void finish(long size, 
-    		                           CopyOutputErrorType errorType) {
+      private synchronized void finish(long size) {
         if (currentLocation != null) {
           LOG.debug(getName() + " finishing " + currentLocation + " =" + size);
           synchronized (copyResults) {
-            copyResults.add(new CopyResult(currentLocation, size, errorType));
+            copyResults.add(new CopyResult(currentLocation, size));
             copyResults.notify();
           }
           currentLocation = null;
@@ -1057,7 +1045,6 @@
           try {
             MapOutputLocation loc = null;
             long size = -1;
-            CopyOutputErrorType errorType = CopyOutputErrorType.NO_ERROR;
             
             synchronized (scheduledCopies) {
               while (scheduledCopies.isEmpty()) {
@@ -1079,18 +1066,9 @@
               
               // Reset 
               size = -1;
-
-              // Identify the error type 
-              if (e.getClass() == ConnTimeoutException.class) {
-                errorType = CopyOutputErrorType.CONNECTION_ERROR;
-              }
-              else if (e.getClass() == ReadTimeoutException.class) {
-                errorType = CopyOutputErrorType.READ_ERROR;
-              }
-
             } finally {
               shuffleClientMetrics.threadFree();
-              finish(size, errorType);
+              finish(size);
             }
           } catch (InterruptedException e) { 
             return; // ALL DONE
@@ -1276,17 +1254,26 @@
         connection.setReadTimeout(readTimeout);
         // set the connect timeout to the unit-connect-timeout
         connection.setConnectTimeout(unit);
+        while (true) {
+          try {
+            return connection.getInputStream();
+          } catch (IOException ioe) {
+            // update the total remaining connect-timeout
+            connectionTimeout -= unit;
 
-        try {
-          connection.connect();
-        } catch (IOException ioe) {
-          throw new ConnTimeoutException("Connection Timed out");
-        }
+            // throw an exception if we have waited for timeout amount of time
+            // note that the updated value of timeout is used here
+            if (connectionTimeout == 0) {
+              throw ioe;
+            }
 
-        try {
-          return connection.getInputStream();
-        } catch (IOException ioe) {
-          throw new ReadTimeoutException("Read Timed out");
+            // reset the connect timeout for the last try
+            if (connectionTimeout < unit) {
+              unit = connectionTimeout;
+              // reset the connect time out for the final connect
+              connection.setConnectTimeout(unit);
+            }
+          }
         }
       }
 
@@ -1857,61 +1844,33 @@
                        cr.getHost());
             } else {
               retryFetches.add(cr.getLocation());
-
-              CopyOutputErrorType errorType = cr.getErrorType();
               
               // note the failed-fetch
               TaskAttemptID mapTaskId = cr.getLocation().getTaskAttemptId();
               TaskID mapId = cr.getLocation().getTaskId();
               
               totalFailures++;
-
-              Integer noFailedFetches = 0;
-
-              Integer noReadFailedFetches = 
-                mapTaskToReadFailedFetchesMap.get(mapTaskId);
-
-              if (noReadFailedFetches == null) noReadFailedFetches = 0;
-
-              Integer noConnectFailedFetches = 
-                mapTaskToConnectFailedFetchesMap.get(mapTaskId);
-
-              if (noConnectFailedFetches == null) noConnectFailedFetches = 0;
-
-              if (errorType == CopyOutputErrorType.READ_ERROR) {
-                noReadFailedFetches ++;
-                mapTaskToReadFailedFetchesMap.put (mapTaskId, 
-                                                   noReadFailedFetches);
+              Integer noFailedFetches = 
+                mapTaskToFailedFetchesMap.get(mapTaskId);
+              noFailedFetches = 
+                (noFailedFetches == null) ? 1 : (noFailedFetches + 1);
+              mapTaskToFailedFetchesMap.put(mapTaskId, noFailedFetches);
+              LOG.info("Task " + getTaskID() + ": Failed fetch #" + 
+                       noFailedFetches + " from " + mapTaskId);
+              
+              // did the fetch fail too many times?
+              // using a hybrid technique for notifying the jobtracker.
+              //   a. the first notification is sent after max-retries 
+              //   b. subsequent notifications are sent after 2 retries.   
+              if ((noFailedFetches >= maxFetchRetriesPerMap) 
+                  && ((noFailedFetches - maxFetchRetriesPerMap) % 2) == 0) {
                 synchronized (ReduceTask.this) {
                   taskStatus.addFetchFailedMap(mapTaskId);
                   LOG.info("Failed to fetch map-output from " + mapTaskId + 
-                           " Got a Read Time out," + 
-                           " reporting to the JobTracker");
-                }
-              } else if (errorType == CopyOutputErrorType.CONNECTION_ERROR) {
-                noConnectFailedFetches ++;
-                mapTaskToConnectFailedFetchesMap.put (
-                  mapTaskId, noConnectFailedFetches);
-
-                LOG.info("Task " + getTaskID() + ": Failed fetch #"  
-                  + noConnectFailedFetches + " from " + mapTaskId); 
-
-                if ((noConnectFailedFetches >= maxFetchRetriesPerMap) &&              
-                   ((noConnectFailedFetches - maxFetchRetriesPerMap) % 2)      
-                    == 0) {               
-                  synchronized (ReduceTask.this) {       
-                    taskStatus.addFetchFailedMap(mapTaskId);
-                    LOG.info("Failed to fetch map-output from " + mapTaskId     
-                             + " even after MAX_FETCH_RETRIES_PER_MAP"
-                             + " (connect) retries... "     
-                             + " reporting to the JobTracker");            
-                  }
+                           " even after MAX_FETCH_RETRIES_PER_MAP retries... "
+                           + " reporting to the JobTracker");
                 }
               }
-
-              noFailedFetches = noConnectFailedFetches + 
-                                noReadFailedFetches;
-
               // note unique failed-fetch maps
               if (noFailedFetches == maxFetchRetriesPerMap) {
                 fetchFailedMaps.add(mapId);
@@ -1960,32 +1919,22 @@
                 }
               }
                 
-              if (errorType == CopyOutputErrorType.CONNECTION_ERROR) {
-                // back off exponentially until num_retries <= max_retries
-                // back off by max_backoff/2 on subsequent failed attempts
-                currentTime = System.currentTimeMillis();
-                int currentBackOff = noFailedFetches <= maxFetchRetriesPerMap 
+              // back off exponentially while num_retries <= max_retries
+              // back off by max_backoff/2 on subsequent failed attempts
+              currentTime = System.currentTimeMillis();
+              int currentBackOff = noFailedFetches <= maxFetchRetriesPerMap 
                                    ? BACKOFF_INIT 
                                      * (1 << (noFailedFetches - 1)) 
                                    : (this.maxBackoff * 1000 / 2);
-                penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
-                LOG.warn(reduceTask.getTaskID() + " adding host " +
+              penaltyBox.put(cr.getHost(), currentTime + currentBackOff);
+              LOG.warn(reduceTask.getTaskID() + " adding host " +
                        cr.getHost() + " to penalty box, next contact in " +
                        (currentBackOff/1000) + " seconds");
-              } else if (errorType == CopyOutputErrorType.READ_ERROR) {
-                int backOff = Math.max(maxMapRuntime/2,
-                                    (this.maxBackoff * 1000));    
-                penaltyBox.put(cr.getHost(), currentTime + backOff);
-                LOG.warn(reduceTask.getTaskID() + " adding host " +
-                       cr.getHost() + " to penalty box, next contact in " + 
-                       (backOff/1000) + " seconds");  
-              }  
-
-            } // Fetch Failure
+            }
             uniqueHosts.remove(cr.getHost());
             numInFlight--;
-          } // while (numInFlight > 0)
-        } // while (copiedMaps < numMaps)
+          }
+        }
         
         // all done, inform the copiers to exit
         synchronized (copiers) {