Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2007/12/21 19:57:56 UTC

svn commit: r606268 - in /lucene/hadoop/branches/branch-0.15: CHANGES.txt src/java/org/apache/hadoop/mapred/JobInProgress.java src/java/org/apache/hadoop/mapred/ReduceTask.java src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java

Author: acmurthy
Date: Fri Dec 21 10:57:55 2007
New Revision: 606268

URL: http://svn.apache.org/viewvc?rev=606268&view=rev
Log:
Merge -r 606266:606267 from trunk to branch-0.15 to fix HADOOP-2247.

Modified:
    lucene/hadoop/branches/branch-0.15/CHANGES.txt
    lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/ReduceTask.java
    lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java

Modified: lucene/hadoop/branches/branch-0.15/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/CHANGES.txt?rev=606268&r1=606267&r2=606268&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/CHANGES.txt (original)
+++ lucene/hadoop/branches/branch-0.15/CHANGES.txt Fri Dec 21 10:57:55 2007
@@ -39,6 +39,14 @@
     round-robin disk selections randomly. This helps in spreading data across
     multiple partitions much better. (acmurthy)
 
+    HADOOP-2247.  Fine-tune the strategies for killing mappers and reducers
+    due to failures while fetching map-outputs. The JobTracker now takes the
+    map-completion times and the number of currently running reduces into
+    account before killing a mapper, while the reducer considers its own
+    progress and the number of fetch-failures vis-a-vis the total number of
+    fetch-attempts before killing itself.
+    (Amar Kamat via acmurthy)
+
   IMPROVEMENTS
 
     HADOOP-2160.  Remove project-level, non-user documentation from

Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=606268&r1=606267&r2=606268&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri Dec 21 10:57:55 2007
@@ -79,6 +79,9 @@
   // The maximum percentage of trackers in cluster added to the 'blacklist'.
   private static final double CLUSTER_BLACKLIST_PERCENT = 0.25;
   
+  // The maximum percentage of fetch failures allowed for a map 
+  private static final double MAX_ALLOWED_FETCH_FAILURES_PERCENT = 0.5;
+  
   // No. of tasktrackers in the cluster
   private volatile int clusterSize = 0;
   
@@ -405,6 +408,8 @@
                                             TaskCompletionEvent.Status.SUCCEEDED,
                                             httpTaskLogLocation 
                                            );
+        taskEvent.setTaskRunTime((int)(status.getFinishTime() 
+                                       - status.getStartTime()));
         tip.setSuccessEventNumber(taskCompletionEventTracker); 
       }
       //For a failed task update the JT datastructures.For the task state where
@@ -1164,7 +1169,12 @@
     LOG.info("Failed fetch notification #" + fetchFailures + " for task " + 
             mapTaskId);
     
-    if (fetchFailures == MAX_FETCH_FAILURES_NOTIFICATIONS) {
+    float failureRate = (float)fetchFailures / runningReduceTasks;
+    // declare the map faulty if its fetch-failure rate crosses the threshold
+    boolean isMapFaulty =
+        (failureRate >= MAX_ALLOWED_FETCH_FAILURES_PERCENT);
+    if (fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS
+        && isMapFaulty) {
       LOG.info("Too many fetch-failures for output of task: " + mapTaskId 
                + " ... killing it");
       

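Restated outside the diff, the new JobTracker-side check amounts to the
sketch below. The wrapper class and method are hypothetical; the constants
mirror the patch, and the value of MAX_FETCH_FAILURES_NOTIFICATIONS is an
assumption, since it is defined elsewhere in JobInProgress.

class MapKillHeuristic {
  private static final int MAX_FETCH_FAILURES_NOTIFICATIONS = 3; // assumed
  private static final double MAX_ALLOWED_FETCH_FAILURES_PERCENT = 0.5;

  // A map is declared faulty only when enough fetch-failure notifications
  // have arrived AND at least half of the currently running reduces have
  // failed to fetch its output, so one flaky reducer can no longer get a
  // healthy map re-executed.
  static boolean shouldKillMap(int fetchFailures, int runningReduceTasks) {
    float failureRate = (float) fetchFailures / runningReduceTasks;
    return fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS
        && failureRate >= MAX_ALLOWED_FETCH_FAILURES_PERCENT;
  }
}

For example, with 10 running reduces and a notification threshold of 3, a
map is now killed only after 5 distinct fetch-failure notifications rather
than a fixed 3.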
Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=606268&r1=606267&r2=606268&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/ReduceTask.java Fri Dec 21 10:57:55 2007
@@ -476,11 +476,31 @@
     private long ramfsMergeOutputSize;
     
     /**
-     * Maximum no. of fetch-retries per-map.
+     * The maximum of all the map completion times.
+     */
+    private int maxMapRuntime;
+    
+    /**
+     * Maximum number of fetch-retries per-map.
      */
     private static final int MAX_FETCH_RETRIES_PER_MAP = 5;
     
     /**
+     * Maximum percentage of failed fetch attempts before the reduce task is killed.
+     */
+    private static final float MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT = 0.5f;
+
+    /**
+     * Minimum percentage of progress required to keep the reducer alive.
+     */
+    private static final float MIN_REQUIRED_PROGRESS_PERCENT = 0.5f;
+
+    /**
+     * Maximum stall time (as a percentage of the shuffle run time) before the reducer is considered stalled.
+     */
+    private static final float MAX_ALLOWED_STALL_TIME_PERCENT = 0.5f;
+
+    /**
      * Maximum no. of unique maps from which we failed to fetch map-outputs
      * even after {@link #MAX_FETCH_RETRIES_PER_MAP} retries; after this the
      * reduce task is failed.
@@ -862,12 +882,14 @@
                                        (this.reduceTask.getPartition()%10)
                                       );
       this.random = new Random(randomSeed);
+      this.maxMapRuntime = 0;
     }
     
     public boolean fetchOutputs() throws IOException {
       final int      numOutputs = reduceTask.getNumMaps();
       List<MapOutputLocation> knownOutputs = 
         new ArrayList<MapOutputLocation>(numCopiers);
+      int totalFailures = 0;
       int            numInFlight = 0, numCopied = 0;
       int            lowThreshold = numCopiers*2;
       long           bytesTransferred = 0;
@@ -896,6 +918,7 @@
       // start the clock for bandwidth measurement
       long startTime = System.currentTimeMillis();
       long currentTime = startTime;
+      long lastProgressTime = System.currentTimeMillis();
       IntWritable fromEventId = new IntWritable(0);
       
       try {
@@ -1005,6 +1028,7 @@
             if (cr != null) {
               if (cr.getSuccess()) {  // a successful copy
                 numCopied++;
+                lastProgressTime = System.currentTimeMillis();
                 bytesTransferred += cr.getSize();
                 
                 long secsSinceStart = 
@@ -1033,6 +1057,7 @@
                 String mapTaskId = cr.getLocation().getMapTaskId();
                 Integer mapId = cr.getLocation().getMapId();
                 
+                totalFailures++;
                 Integer noFailedFetches = 
                   mapTaskToFailedFetchesMap.get(mapTaskId);
                 noFailedFetches = 
@@ -1056,8 +1081,43 @@
                   fetchFailedMaps.add(mapId);
                   
                   // did we have too many unique failed-fetch maps?
-                  if (fetchFailedMaps.size() >= MAX_FAILED_UNIQUE_FETCHES) {
-                    LOG.fatal("Shuffle failed with too many fetch failures! " +
+                  // and did we fail on too many fetch attempts?
+                  // and did we progress enough
+                  //     or did we wait for too long without any progress?
+                  
+                  // check if the reducer is healthy
+                  boolean reducerHealthy = 
+                      (((float)totalFailures / (totalFailures + numCopied)) 
+                       < MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT);
+                  
+                  // check if the reducer has progressed enough
+                  boolean reducerProgressedEnough = 
+                      (((float)numCopied / numMaps) 
+                       >= MIN_REQUIRED_PROGRESS_PERCENT);
+                  
+                  // check if the reducer is stalled for a long time
+                  
+                  // duration for which the reducer is stalled
+                  int stallDuration = 
+                      (int)(System.currentTimeMillis() - lastProgressTime);
+                  // duration for which the reducer ran with progress
+                  int shuffleProgressDuration = 
+                      (int)(lastProgressTime - startTime);
+                  // min time the reducer should run without getting killed
+                  int minShuffleRunDuration = 
+                      (shuffleProgressDuration > maxMapRuntime) 
+                      ? shuffleProgressDuration 
+                      : maxMapRuntime;
+                  boolean reducerStalled = 
+                      (((float)stallDuration / minShuffleRunDuration) 
+                       >= MAX_ALLOWED_STALL_TIME_PERCENT);
+                  
+                  // kill if not healthy and has insufficient progress
+                  if ((fetchFailedMaps.size() >= MAX_FAILED_UNIQUE_FETCHES)
+                      && !reducerHealthy 
+                      && (!reducerProgressedEnough || reducerStalled)) { 
+                    LOG.fatal("Shuffle failed with too many fetch failures " + 
+                              "and insufficient progress!" +
                               "Killing task " + getTaskId() + ".");
                     umbilical.shuffleError(getTaskId(), 
                                            "Exceeded MAX_FAILED_UNIQUE_FETCHES;"
@@ -1251,6 +1311,13 @@
             int port = u.getPort();
             String taskId = event.getTaskId();
             int mId = event.idWithinJob();
+            int duration = event.getTaskRunTime();
+            if (duration > maxMapRuntime) {
+              maxMapRuntime = duration; 
+              // adjust max-fetch-retries based on max-map-run-time
+              maxFetchRetriesPerMap = 
+                  getClosestPowerOf2((maxMapRuntime / BACKOFF_INIT) + 1);
+            }
             knownOutputs.add(new MapOutputLocation(taskId, mId, host, port));
           }
           break;

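Pulled out of the diff, the reducer's new self-kill decision combines the
three signals above; a minimal sketch, with a hypothetical wrapper class
and the variable and threshold names taken from the patch:

class ReduceKillDecision {
  private static final float MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT = 0.5f;
  private static final float MIN_REQUIRED_PROGRESS_PERCENT = 0.5f;
  private static final float MAX_ALLOWED_STALL_TIME_PERCENT = 0.5f;

  static boolean shouldDie(int uniqueFailedMaps, int maxFailedUniqueFetches,
                           int totalFailures, int numCopied, int numMaps,
                           long now, long lastProgressTime, long startTime,
                           int maxMapRuntime) {
    // healthy: fewer than half of all fetch attempts have failed
    boolean reducerHealthy =
        ((float) totalFailures / (totalFailures + numCopied))
        < MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT;
    // progressed enough: at least half of the map outputs already copied
    boolean reducerProgressedEnough =
        ((float) numCopied / numMaps) >= MIN_REQUIRED_PROGRESS_PERCENT;
    // stalled: no successful copy for too large a fraction of the
    // effective shuffle runtime (never less than the slowest map's runtime)
    int stallDuration = (int) (now - lastProgressTime);
    int minShuffleRunDuration =
        Math.max((int) (lastProgressTime - startTime), maxMapRuntime);
    boolean reducerStalled =
        ((float) stallDuration / minShuffleRunDuration)
        >= MAX_ALLOWED_STALL_TIME_PERCENT;
    // die only when unique-map failures are widespread, the reducer is
    // unhealthy, and it is neither progressing nor just waiting on slow maps
    return (uniqueFailedMaps >= maxFailedUniqueFetches)
        && !reducerHealthy
        && (!reducerProgressedEnough || reducerStalled);
  }
}

Note also that the last hunk above scales maxFetchRetriesPerMap with the
longest observed map runtime, via
getClosestPowerOf2((maxMapRuntime / BACKOFF_INIT) + 1), so outputs of
long-running maps get proportionally more fetch retries before a failure
is reported.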
Modified: lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java?rev=606268&r1=606267&r2=606268&view=diff
==============================================================================
--- lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java (original)
+++ lucene/hadoop/branches/branch-0.15/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java Fri Dec 21 10:57:55 2007
@@ -33,6 +33,7 @@
     
   private int eventId; 
   private String taskTrackerHttp;
+  private int taskRunTime; // an int suffices since the runtime is a time difference
   private String taskId;
   Status status; 
   boolean isMap = false;
@@ -95,6 +96,22 @@
   public String getTaskTrackerHttp() {
     return taskTrackerHttp;
   }
+
+  /**
+   * Returns the time (in milliseconds) the task took to complete.
+   */
+  public int getTaskRunTime() {
+    return taskRunTime;
+  }
+
+  /**
+   * Set the task run time.
+   * @param taskCompletionTime the time (in milliseconds) the task took to complete
+   */
+  public void setTaskRunTime(int taskCompletionTime) {
+    this.taskRunTime = taskCompletionTime;
+  }
+
   /**
   * Set the event Id. Should be assigned incrementally, starting from 0.
    * @param eventId
@@ -153,6 +170,7 @@
     out.writeBoolean(isMap);
     WritableUtils.writeEnum(out, status); 
     WritableUtils.writeString(out, taskTrackerHttp);
+    WritableUtils.writeVInt(out, taskRunTime);
   }
   
   public void readFields(DataInput in) throws IOException {
@@ -161,5 +179,6 @@
     this.isMap = in.readBoolean();
     this.status = WritableUtils.readEnum(in, Status.class);
     this.taskTrackerHttp = WritableUtils.readString(in);
+    this.taskRunTime = WritableUtils.readVInt(in);
   }
 }
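Since taskRunTime rides at the end of the Writable payload as a VInt, the
encoding round-trips as in the sketch below. The scaffolding is
hypothetical; only the WritableUtils.writeVInt/readVInt calls come from
the patch.

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.WritableUtils;

public class TaskRunTimeWireSketch {
  public static void main(String[] args) throws IOException {
    ByteArrayOutputStream bytes = new ByteArrayOutputStream();
    DataOutputStream out = new DataOutputStream(bytes);
    int taskRunTime = 42000;                   // 42 seconds, in milliseconds
    WritableUtils.writeVInt(out, taskRunTime); // as in write()

    DataInputStream in = new DataInputStream(
        new ByteArrayInputStream(bytes.toByteArray()));
    int decoded = WritableUtils.readVInt(in);  // as in readFields()
    System.out.println(decoded == taskRunTime); // prints true
  }
}

An int is wide enough here: Integer.MAX_VALUE milliseconds is roughly 24.8
days, far beyond any plausible single-task runtime.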