You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2007/12/21 19:51:22 UTC

svn commit: r606267 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/JobInProgress.java src/java/org/apache/hadoop/mapred/ReduceTask.java src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java

Author: acmurthy
Date: Fri Dec 21 10:51:22 2007
New Revision: 606267

URL: http://svn.apache.org/viewvc?rev=606267&view=rev
Log:
HADOOP-2247.  Fine-tune the strategies for killing mappers and reducers due to failures while fetching map-outputs. Now the map-completion times and number of currently running reduces are taken into account by the JobTracker before killing the mappers, while the progress made by the reducer and the number of fetch-failures vis-a-vis total number of fetch-attempts are taken into account before the reducer kills itself. Contributed by Amar Kamat.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=606267&r1=606266&r2=606267&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Fri Dec 21 10:51:22 2007
@@ -318,6 +318,14 @@
     round-robin disk selections randomly. This helps in spreading data across
     multiple partitions much better. (acmurhty)
 
+    HADOOP-2247.  Fine-tune the strategies for killing mappers and reducers
+    due to failures while fetching map-outputs. Now the map-completion times
+    and number of currently running reduces are taken into account by the
+    JobTracker before  killing the mappers, while the progress made by the
+    reducer and the number of fetch-failures vis-a-vis total number of
+    fetch-attempts are taken into account before the reducer kills itself.
+    (Amar Kamat via acmurthy)
+
   IMPROVEMENTS
 
     HADOOP-2160.  Remove project-level, non-user documentation from

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=606267&r1=606266&r2=606267&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Fri Dec 21 10:51:22 2007
@@ -79,6 +79,9 @@
   // The maximum percentage of trackers in cluster added to the 'blacklist'.
   private static final double CLUSTER_BLACKLIST_PERCENT = 0.25;
   
+  // The maximum percentage of fetch failures allowed for a map 
+  private static final double MAX_ALLOWED_FETCH_FAILURES_PERCENT = 0.5;
+  
   // No. of tasktrackers in the cluster
   private volatile int clusterSize = 0;
   
@@ -407,6 +410,8 @@
                                             TaskCompletionEvent.Status.SUCCEEDED,
                                             httpTaskLogLocation 
                                            );
+        taskEvent.setTaskRunTime((int)(status.getFinishTime() 
+                                       - status.getStartTime()));
         tip.setSuccessEventNumber(taskCompletionEventTracker); 
       }
       //For a failed task update the JT datastructures.For the task state where
@@ -1174,7 +1179,13 @@
     LOG.info("Failed fetch notification #" + fetchFailures + " for task " + 
             mapTaskId);
     
-    if (fetchFailures == MAX_FETCH_FAILURES_NOTIFICATIONS) {
+    float failureRate = (float)fetchFailures / runningReduceTasks;
+    // declare faulty if fetch-failures >= max-allowed-failures
+    boolean isMapFaulty = (failureRate >= MAX_ALLOWED_FETCH_FAILURES_PERCENT) 
+                          ? true
+                          : false;
+    if (fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS
+        && isMapFaulty) {
       LOG.info("Too many fetch-failures for output of task: " + mapTaskId 
                + " ... killing it");
       

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?rev=606267&r1=606266&r2=606267&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Fri Dec 21 10:51:22 2007
@@ -481,11 +481,31 @@
     private long ramfsMergeOutputSize;
     
     /**
+     * the max of all the map completion times
+     */
+    private int maxMapRuntime;
+    
+    /**
      * Maximum number of fetch-retries per-map.
      */
     private int maxFetchRetriesPerMap;
     
     /**
+     * Maximum percent of failed fetch attempt before killing the reduce task.
+     */
+    private static final float MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT = 0.5f;
+
+    /**
+     * Minimum percent of progress required to keep the reduce alive.
+     */
+    private static final float MIN_REQUIRED_PROGRESS_PERCENT = 0.5f;
+
+    /**
+     * Maximum percent of shuffle execution time required to keep the reducer alive.
+     */
+    private static final float MAX_ALLOWED_STALL_TIME_PERCENT = 0.5f;
+
+    /**
      * Maximum no. of unique maps from which we failed to fetch map-outputs
      * even after {@link #maxFetchRetriesPerMap} retries; after this the
      * reduce task is failed.
@@ -885,12 +905,14 @@
                                        (this.reduceTask.getPartition()%10)
                                       );
       this.random = new Random(randomSeed);
+      this.maxMapRuntime = 0;
     }
     
     public boolean fetchOutputs() throws IOException {
       final int      numOutputs = reduceTask.getNumMaps();
       List<MapOutputLocation> knownOutputs = 
         new ArrayList<MapOutputLocation>(numCopiers);
+      int totalFailures = 0;
       int            numInFlight = 0, numCopied = 0;
       int            lowThreshold = numCopiers*2;
       long           bytesTransferred = 0;
@@ -918,6 +940,7 @@
       // start the clock for bandwidth measurement
       long startTime = System.currentTimeMillis();
       long currentTime = startTime;
+      long lastProgressTime = System.currentTimeMillis();
       IntWritable fromEventId = new IntWritable(0);
       
       try {
@@ -1027,6 +1050,7 @@
             if (cr != null) {
               if (cr.getSuccess()) {  // a successful copy
                 numCopied++;
+                lastProgressTime = System.currentTimeMillis();
                 bytesTransferred += cr.getSize();
                 
                 long secsSinceStart = 
@@ -1055,6 +1079,7 @@
                 String mapTaskId = cr.getLocation().getMapTaskId();
                 Integer mapId = cr.getLocation().getMapId();
                 
+                totalFailures++;
                 Integer noFailedFetches = 
                   mapTaskToFailedFetchesMap.get(mapTaskId);
                 noFailedFetches = 
@@ -1082,8 +1107,43 @@
                   fetchFailedMaps.add(mapId);
                   
                   // did we have too many unique failed-fetch maps?
-                  if (fetchFailedMaps.size() >= MAX_FAILED_UNIQUE_FETCHES) {
-                    LOG.fatal("Shuffle failed with too many fetch failures! " +
+                  // and did we fail on too many fetch attempts?
+                  // and did we progress enough
+                  //     or did we wait for too long without any progress?
+                  
+                  // check if the reducer is healthy
+                  boolean reducerHealthy = 
+                      (((float)totalFailures / (totalFailures + numCopied)) 
+                       < MAX_ALLOWED_FAILED_FETCH_ATTEMPT_PERCENT);
+                  
+                  // check if the reducer has progressed enough
+                  boolean reducerProgressedEnough = 
+                      (((float)numCopied / numMaps) 
+                       >= MIN_REQUIRED_PROGRESS_PERCENT);
+                  
+                  // check if the reducer is stalled for a long time
+                  
+                  // duration for which the reducer is stalled
+                  int stallDuration = 
+                      (int)(System.currentTimeMillis() - lastProgressTime);
+                  // duration for which the reducer ran with progress
+                  int shuffleProgressDuration = 
+                      (int)(lastProgressTime - startTime);
+                  // min time the reducer should run without getting killed
+                  int minShuffleRunDuration = 
+                      (shuffleProgressDuration > maxMapRuntime) 
+                      ? shuffleProgressDuration 
+                      : maxMapRuntime;
+                  boolean reducerStalled = 
+                      (((float)stallDuration / minShuffleRunDuration) 
+                       >= MAX_ALLOWED_STALL_TIME_PERCENT);
+                  
+                  // kill if not healthy and has insufficient progress
+                  if ((fetchFailedMaps.size() >= MAX_FAILED_UNIQUE_FETCHES)
+                      && !reducerHealthy 
+                      && (!reducerProgressedEnough || reducerStalled)) { 
+                    LOG.fatal("Shuffle failed with too many fetch failures " + 
+                              "and insufficient progress!" +
                               "Killing task " + getTaskId() + ".");
                     umbilical.shuffleError(getTaskId(), 
                                            "Exceeded MAX_FAILED_UNIQUE_FETCHES;"
@@ -1280,6 +1340,13 @@
             int port = u.getPort();
             String taskId = event.getTaskId();
             int mId = event.idWithinJob();
+            int duration = event.getTaskRunTime();
+            if (duration > maxMapRuntime) {
+              maxMapRuntime = duration; 
+              // adjust max-fetch-retries based on max-map-run-time
+              maxFetchRetriesPerMap = 
+                  getClosestPowerOf2((maxMapRuntime / BACKOFF_INIT) + 1);
+            }
             knownOutputs.add(new MapOutputLocation(taskId, mId, host, port));
           }
           break;

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java?rev=606267&r1=606266&r2=606267&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java Fri Dec 21 10:51:22 2007
@@ -33,6 +33,7 @@
     
   private int eventId; 
   private String taskTrackerHttp;
+  private int taskRunTime; // using int since runtime is the time difference
   private String taskId;
   Status status; 
   boolean isMap = false;
@@ -95,6 +96,22 @@
   public String getTaskTrackerHttp() {
     return taskTrackerHttp;
   }
+
+  /**
+   * Returns the run time of the task, i.e. how long (in millisec) it took to complete.
+   */
+  public int getTaskRunTime() {
+    return taskRunTime;
+  }
+
+  /**
+   * Sets the run time of the task (a duration, not a point in time).
+   * @param taskCompletionTime time (in millisec) the task took to complete
+   */
+  public void setTaskRunTime(int taskCompletionTime) {
+    this.taskRunTime = taskCompletionTime;
+  }
+
   /**
    * set event Id. should be assigned incrementally starting from 0. 
    * @param eventId
@@ -153,6 +170,7 @@
     out.writeBoolean(isMap);
     WritableUtils.writeEnum(out, status); 
     WritableUtils.writeString(out, taskTrackerHttp);
+    WritableUtils.writeVInt(out, taskRunTime);
   }
   
   public void readFields(DataInput in) throws IOException {
@@ -161,5 +179,6 @@
     this.isMap = in.readBoolean();
     this.status = WritableUtils.readEnum(in, Status.class);
     this.taskTrackerHttp = WritableUtils.readString(in);
+    this.taskRunTime = WritableUtils.readVInt(in);
   }
 }