Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/05/03 21:51:15 UTC

svn commit: r534975 - in /lucene/hadoop/trunk: ./ src/java/org/apache/hadoop/mapred/ src/webapps/job/

Author: cutting
Date: Thu May  3 12:51:14 2007
New Revision: 534975

URL: http://svn.apache.org/viewvc?view=rev&rev=534975
Log:
HADOOP-1144.  Permit one to specify the maximum percentage of tasks that can fail before a job is aborted.  Contributed by Arun.
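
For example, a job that tolerates up to 5% failed map tasks and 1% failed
reduce tasks could opt in like this (a minimal sketch of the new
client-facing API added to JobConf below; the class name and surrounding
job setup are illustrative only):

    JobConf conf = new JobConf(MyAnalysisJob.class);  // hypothetical job class
    conf.setJobName("failure-tolerant-job");
    conf.setMaxMapTaskFailuresPercent(5);     // default is 0: any TIP failure aborts
    conf.setMaxReduceTaskFailuresPercent(1);
    JobClient.runJob(conf);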

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
    lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=534975&r1=534974&r2=534975
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Thu May  3 12:51:14 2007
@@ -325,6 +325,10 @@
 96. HADOOP-1322.  Fix TaskTracker blacklisting to work correctly in
     one- and two-node clusters.  (Arun C Murthy via cutting)
 
+97. HADOOP-1144.  Permit one to specify a maximum percentage of tasks
+    that can fail before a job is aborted.  The default is zero.
+    (Arun C Murthy via cutting)
+
 
 Release 0.12.3 - 2007-04-06
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java?view=diff&rev=534975&r1=534974&r2=534975
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobConf.java Thu May  3 12:51:14 2007
@@ -613,6 +613,50 @@
   public int getMaxTaskFailuresPerTracker() {
     return getInt("mapred.max.tracker.failures", 4); 
   }
+
+  /**
+   * Get the maximum percentage of map tasks that can fail without 
+   * the job being aborted.
+   * 
+   * @return the maximum percentage of map tasks that can fail without
+   *         the job being aborted
+   */
+  public int getMaxMapTaskFailuresPercent() {
+    return getInt("mapred.max.map.failures.percent", 0);
+  }
+
+  /**
+   * Set the maximum percentage of map tasks that can fail without the job
+   * being aborted.
+   * 
+   * @param percent the maximum percentage of map tasks that can fail without 
+   *                the job being aborted
+   */
+  public void setMaxMapTaskFailuresPercent(int percent) {
+    setInt("mapred.max.map.failures.percent", percent);
+  }
+  
+  /**
+   * Get the maximum percentage of reduce tasks that can fail without 
+   * the job being aborted.
+   * 
+   * @return the maximum percentage of reduce tasks that can fail without
+   *         the job being aborted
+   */
+  public int getMaxReduceTaskFailuresPercent() {
+    return getInt("mapred.max.reduce.failures.percent", 0);
+  }
+  
+  /**
+   * Set the maximum percentage of reduce tasks that can fail without the job
+   * being aborted.
+   * 
+   * @param percent the maximum percentage of reduce tasks that can fail without 
+   *                the job being aborted
+   */
+  public void setMaxReduceTaskFailuresPercent(int percent) {
+    setInt("mapred.max.reduce.failures.percent", percent);
+  }
   
   /** Find a jar that contains a class of the same name, if any.
    * It will return a jar file, even if that is not the first thing
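
These accessors are thin wrappers around two job properties, so the same
limits can also be set generically; the following is equivalent to the
typed setters above (shown for illustration only):

    conf.setInt("mapred.max.map.failures.percent", 5);
    conf.setInt("mapred.max.reduce.failures.percent", 1);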

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=534975&r1=534974&r2=534975
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Thu May  3 12:51:14 2007
@@ -63,7 +63,13 @@
   int finishedMapTasks = 0;
   int finishedReduceTasks = 0;
   int failedMapTasks = 0; 
-  int failedReduceTasks = 0; 
+  int failedReduceTasks = 0;
+  
+  int mapFailuresPercent = 0;
+  int reduceFailuresPercent = 0;
+  int failedMapTIPs = 0;
+  int failedReduceTIPs = 0;
+  
   JobTracker jobtracker = null;
   Map<String,List<TaskInProgress>> hostToMaps =
     new HashMap<String,List<TaskInProgress>>();
@@ -91,7 +97,14 @@
 
   private LocalFileSystem localFs;
   private String uniqueString;
-    
+
+  // Per-job counters
+  public static enum Counter { 
+    NUM_FAILED_MAPS, 
+    NUM_FAILED_REDUCES
+  }
+  private Counters jobCounters = new Counters();
+  
   private Counters mapCounters = new Counters();
   private Counters reduceCounters = new Counters();
   private MetricsRecord jobMetrics;
@@ -130,6 +143,9 @@
     this.numReduceTasks = conf.getNumReduceTasks();
     this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>(
                                                                    numMapTasks + numReduceTasks + 10);
+
+    this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
+    this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
         
     JobHistory.JobInfo.logSubmitted(jobid, conf.getJobName(), conf.getUser(), 
                                     System.currentTimeMillis(), jobFile); 
@@ -359,14 +375,6 @@
         tip.setSuccessEventNumber(taskCompletionEventTracker);
       } else if (state == TaskStatus.State.FAILED ||
                  state == TaskStatus.State.KILLED) {
-        taskEvent = new TaskCompletionEvent(
-                                            taskCompletionEventTracker, 
-                                            status.getTaskId(),
-                                            tip.idWithinJob(),
-                                            status.getIsMap(),
-                                            TaskCompletionEvent.Status.FAILED, 
-                                            httpTaskLogLocation
-                                            );
         // Get the event number for the (possibly) previously successful
         // task. If there exists one, then set that status to OBSOLETE 
         int eventNumber;
@@ -376,9 +384,24 @@
           if (t.getTaskId().equals(status.getTaskId()))
             t.setTaskStatus(TaskCompletionEvent.Status.OBSOLETE);
         }
+        
         // Tell the job to fail the relevant task
         failedTask(tip, status.getTaskId(), status, status.getTaskTracker(),
-                   wasRunning, wasComplete);
+                   wasRunning, wasComplete, metrics);
+
+        // Did the task failure lead to tip failure?
+        TaskCompletionEvent.Status taskCompletionStatus = 
+          TaskCompletionEvent.Status.FAILED;
+        if (tip.isFailed()) {
+          taskCompletionStatus = TaskCompletionEvent.Status.TIPFAILED;
+        }
+        taskEvent = new TaskCompletionEvent(taskCompletionEventTracker, 
+                                            status.getTaskId(),
+                                            tip.idWithinJob(),
+                                            status.getIsMap(),
+                                            taskCompletionStatus, 
+                                            httpTaskLogLocation
+                                           );
       }          
 
       // Add the 'complete' task i.e. successful/failed
@@ -411,7 +434,16 @@
       }
     }
   }
-    
+
+  /**
+   * Returns the job-level counters.
+   * 
+   * @return the job-level counters.
+   */
+  public synchronized Counters getJobCounters() {
+    return jobCounters;
+  }
+  
   /**
    *  Returns map phase counters by summing over all map tasks in progress.
    */
@@ -427,11 +459,12 @@
   }
     
   /**
-   *  Returns the total job counters, by adding together the map and the
-   *  reduce counters.
+   *  Returns the total job counters, by adding together the job, 
+   *  the map and the reduce counters.
    */
   public Counters getCounters() {
-    return Counters.sum(getMapCounters(), getReduceCounters());
+    return Counters.sum(getJobCounters(), 
+                        Counters.sum(getMapCounters(), getReduceCounters()));
   }
     
   /**
@@ -741,9 +774,27 @@
     //
     // Figure out whether the Job is done
     //
+    isJobComplete(tip, metrics);
+    
+    if (this.status.getRunState() != JobStatus.RUNNING) {
+      // The job has been killed/failed, 
+      // JobTracker should cleanup this task
+      jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
+    }
+  }
+
+  /**
+   * Check if the job is done, i.e. all its component tasks have either
+   * succeeded or failed.
+   * 
+   * @param tip the current tip, which either succeeded or failed
+   * @param metrics job-tracker metrics
+   * @return true if the job is complete, else false
+   */
+  private boolean isJobComplete(TaskInProgress tip, JobTrackerMetrics metrics) {
     boolean allDone = true;
     for (int i = 0; i < maps.length; i++) {
-      if (!maps[i].isComplete()) {
+      if (!(maps[i].isComplete() || maps[i].isFailed())) {
         allDone = false;
         break;
       }
@@ -753,7 +804,7 @@
         this.status.setMapProgress(1.0f);              
       }
       for (int i = 0; i < reduces.length; i++) {
-        if (!reduces[i].isComplete()) {
+        if (!(reduces[i].isComplete() || reduces[i].isFailed())) {
           allDone = false;
           break;
         }
@@ -773,13 +824,11 @@
       JobHistory.JobInfo.logFinished(this.status.getJobId(), finishTime, 
                                      this.finishedMapTasks, this.finishedReduceTasks, failedMapTasks, failedReduceTasks);
       metrics.completeJob();
-    } else if (this.status.getRunState() != JobStatus.RUNNING) {
-      // The job has been killed/failed, 
-      // JobTracker should cleanup this task
-      jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
+      return true;
     }
+    
+    return false;
   }
-
   /**
    * Kill the job and all its component tasks.
    */
@@ -809,7 +858,7 @@
    * A task assigned to this JobInProgress has reported in as failed.
    * Most of the time, we'll just reschedule execution.  However, after
    * many repeated failures we may instead decide to allow the entire 
-   * job to fail.
+   * job to fail, or to succeed despite a limited number of task failures.
    *
    * Even if a task has reported as completed in the past, it might later
    * be reported as failed.  That's because the TaskTracker that hosts a map
@@ -819,7 +868,8 @@
    */
   private void failedTask(TaskInProgress tip, String taskid, 
                           TaskStatus status, String trackerName,
-                          boolean wasRunning, boolean wasComplete) {
+                          boolean wasRunning, boolean wasComplete,
+                          JobTrackerMetrics metrics) {
     // Mark the taskid as a 'failure'
     tip.incompleteSubTask(taskid, trackerName);
         
@@ -881,16 +931,45 @@
     jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
 
     //
-    // Check if we need to kill the job because of too many failures
+    // Check if we need to kill the job because of too many failures or 
+    // if the job is now complete because all component tasks have finished
     //
     if (tip.isFailed()) {
-      LOG.info("Aborting job " + profile.getJobId());
-      JobHistory.Task.logFailed(profile.getJobId(), tip.getTIPId(), 
-                                tip.isMapTask() ? Values.MAP.name():Values.REDUCE.name(),  
-                                System.currentTimeMillis(), status.getDiagnosticInfo());
-      JobHistory.JobInfo.logFailed(profile.getJobId(), 
-                                   System.currentTimeMillis(), this.finishedMapTasks, this.finishedReduceTasks);
-      kill();
+      //
+      // Allow up to 'mapFailuresPercent' of map tasks to fail or
+      // 'reduceFailuresPercent' of reduce tasks to fail
+      //
+      boolean killJob = 
+        tip.isMapTask() ? 
+            (((++failedMapTIPs*100)/numMapTasks) > mapFailuresPercent) :
+            (((++failedReduceTIPs*100)/numReduceTasks) > reduceFailuresPercent);
+      
+      if (killJob) {
+        LOG.info("Aborting job " + profile.getJobId());
+        JobHistory.Task.logFailed(profile.getJobId(), tip.getTIPId(), 
+                                  tip.isMapTask() ? 
+                                          Values.MAP.name() : 
+                                          Values.REDUCE.name(),  
+                                  System.currentTimeMillis(), 
+                                  status.getDiagnosticInfo());
+        JobHistory.JobInfo.logFailed(profile.getJobId(), 
+                                     System.currentTimeMillis(), 
+                                     this.finishedMapTasks, 
+                                     this.finishedReduceTasks
+                                    );
+        kill();
+      } else {
+        isJobComplete(tip, metrics);
+      }
+      
+      //
+      // Update the counters
+      //
+      if (tip.isMapTask()) {
+        jobCounters.incrCounter(Counter.NUM_FAILED_MAPS, 1);
+      } else {
+        jobCounters.incrCounter(Counter.NUM_FAILED_REDUCES, 1);
+      }
     }
   }
 

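The abort predicate above uses integer arithmetic over failed TIPs: with
numMapTasks = 100 and mapFailuresPercent = 5, the job survives the first
five failed map TIPs and aborts on the sixth. A standalone sketch of the
same test (names mirror the fields above):

    // Sketch of the kill decision in failedTask(); integer division
    // rounds the failure percentage down before comparing.
    static boolean shouldKill(int failedTIPs, int numTasks, int failuresPercent) {
      return (failedTIPs * 100) / numTasks > failuresPercent;
    }
    // shouldKill(5, 100, 5) -> false  (5% is still tolerated)
    // shouldKill(6, 100, 5) -> true   (6% exceeds the limit; abort)
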
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java?view=diff&rev=534975&r1=534974&r2=534975
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/ReduceTask.java Thu May  3 12:51:14 2007
@@ -822,43 +822,39 @@
       copyProgress.start();
       try {
         // loop until we get all required outputs
-        while (numCopied < numOutputs && mergeThrowable == null) {
+        while (!neededOutputs.isEmpty() && mergeThrowable == null) {
           
-          LOG.info(reduceTask.getTaskId() + " Need " + (numOutputs-numCopied) +
-                   " map output(s)");
+          LOG.info(reduceTask.getTaskId() + " Need " + neededOutputs.size() +
+          " map output(s)");
           
-          if (!neededOutputs.isEmpty()) {
-            LOG.info(reduceTask.getTaskId() + " Need " + neededOutputs.size() +
-                     " map output location(s)");
-            try {
-              // Put the hash entries for the failed fetches. Entries here
-              // might be replaced by (mapId) hashkeys from new successful 
-              // Map executions, if the fetch failures were due to lost tasks.
-              // The replacements, if at all, will happen when we query the
-              // tasktracker and put the mapId hashkeys with new 
-              // MapOutputLocations as values
-              knownOutputs.putAll(retryFetches);
-              // The call getSuccessMapEvents will modify fromEventId to a val
-              // that it should be for the next call to getSuccessMapEvents
-              List <MapOutputLocation> locs = getSuccessMapEvents(fromEventId);
-              
-              // put discovered them on the known list
-              for (int i=0; i < locs.size(); i++) {
-                knownOutputs.put(new Integer(locs.get(i).getMapId()), 
-                                 locs.get(i));
-              }
-              LOG.info(reduceTask.getTaskId() +
-                       " Got " + locs.size() + 
-                       " new map outputs from tasktracker and " + retryFetches.size()
-                       + " map outputs from previous failures");
-              // clear the "failed" fetches hashmap
-              retryFetches.clear();
-            }
-            catch (IOException ie) {
-              LOG.warn(reduceTask.getTaskId() +
-                       " Problem locating map outputs: " +
-                       StringUtils.stringifyException(ie));
+          try {
+            // Put the hash entries for the failed fetches. Entries here
+            // might be replaced by (mapId) hashkeys from new successful 
+            // Map executions, if the fetch failures were due to lost tasks.
+            // The replacements, if at all, will happen when we query the
+            // tasktracker and put the mapId hashkeys with new 
+            // MapOutputLocations as values
+            knownOutputs.putAll(retryFetches);
+            // The call getMapCompletionEvents will modify fromEventId to the
+            // value it should have for the next call to getMapCompletionEvents
+            List <MapOutputLocation> locs = getMapCompletionEvents(fromEventId);
+
+            // put the discovered outputs on the known list
+            for (int i=0; i < locs.size(); i++) {
+              knownOutputs.put(new Integer(locs.get(i).getMapId()), 
+                      locs.get(i));
             }
+            LOG.info(reduceTask.getTaskId() +
+                    " Got " + locs.size() + 
+                    " new map outputs from tasktracker and " + retryFetches.size()
+                    + " map outputs from previous failures");
+            // clear the "failed" fetches hashmap
+            retryFetches.clear();
+          }
+          catch (IOException ie) {
+            LOG.warn(reduceTask.getTaskId() +
+                    " Problem locating map outputs: " +
+                    StringUtils.stringifyException(ie));
           }
           
           // now walk through the cache and schedule what we can
@@ -1009,7 +1005,7 @@
             if (inMemClosedFiles.length == 0) {
               LOG.info(reduceTask.getTaskId() + "Nothing to merge from " + 
                        inMemFileSys.getUri());
-              return numCopied == numOutputs;
+              return neededOutputs.isEmpty();
             }
             //name this output file same as the name of the first file that is 
             //there in the current list of inmem files (this is guaranteed to be
@@ -1047,7 +1043,7 @@
             return false;
           }
         }
-        return mergeThrowable == null && numCopied == numOutputs;
+        return mergeThrowable == null && neededOutputs.isEmpty();
       } finally {
         inMemFileSys.close();
         copyProgress.interrupt();
@@ -1077,7 +1073,7 @@
      * @return a set of locations to copy outputs from
      * @throws IOException
      */  
-    private List <MapOutputLocation> getSuccessMapEvents(IntWritable fromEventId)
+    private List <MapOutputLocation> getMapCompletionEvents(IntWritable fromEventId)
       throws IOException {
       
       long currentTime = System.currentTimeMillis();    
@@ -1097,14 +1093,17 @@
       
       List <MapOutputLocation> mapOutputsList = 
         new ArrayList<MapOutputLocation>();
-      for (int i = 0; i < t.length; i++) {
-        if (t[i].getTaskStatus() == TaskCompletionEvent.Status.SUCCEEDED) {
-          URI u = URI.create(t[i].getTaskTrackerHttp());
+      for (TaskCompletionEvent event : t) {
+        if (event.getTaskStatus() == TaskCompletionEvent.Status.SUCCEEDED) {
+          URI u = URI.create(event.getTaskTrackerHttp());
           String host = u.getHost();
           int port = u.getPort();
-          String taskId = t[i].getTaskId();
-          int mId = t[i].idWithinJob();
+          String taskId = event.getTaskId();
+          int mId = event.idWithinJob();
           mapOutputsList.add(new MapOutputLocation(taskId, mId, host, port));
+        } else if (event.getTaskStatus() == TaskCompletionEvent.Status.TIPFAILED) {
+          neededOutputs.remove(event.idWithinJob());
+          LOG.info("Ignoring output of failed map: '" + event.getTaskId() + "'");
         }
       }
     

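Net effect of the copier changes: completion is now judged by the
shrinking neededOutputs set rather than by a copy counter, so a reduce
can finish even though some map outputs will never arrive. A
self-contained sketch of that bookkeeping (types assumed; the real code
tracks richer state than bare map ids):

    import java.util.HashSet;
    import java.util.Set;

    public class NeededOutputsSketch {
      public static void main(String[] args) {
        Set<Integer> neededOutputs = new HashSet<Integer>();
        for (int mapId = 0; mapId < 4; mapId++) {
          neededOutputs.add(mapId);
        }
        neededOutputs.remove(0);  // map 0's output copied successfully
        neededOutputs.remove(1);  // map 1 reported TIPFAILED: stop waiting for it
        // The copy loop exits once the set is empty, whether entries were
        // removed by successful copies or by TIPFAILED events.
        System.out.println("still needed: " + neededOutputs);  // e.g. [2, 3]
      }
    }
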
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java?view=diff&rev=534975&r1=534974&r2=534975
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskCompletionEvent.java Thu May  3 12:51:14 2007
@@ -12,7 +12,7 @@
  *
  */
 public class TaskCompletionEvent implements Writable{
-  static public enum Status {FAILED, SUCCEEDED, OBSOLETE};
+  static public enum Status {FAILED, SUCCEEDED, OBSOLETE, TIPFAILED};
     
   private int eventId; 
   private String taskTrackerHttp;

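Consumers of completion events can now distinguish a whole TIP failing
(its output will never exist) from a single attempt failing (a retry may
still succeed). A sketch of such a consumer, mirroring the ReduceTask
change above (handleEvents and scheduleFetch are hypothetical;
TaskCompletionEvent and its Status enum are the real classes changed here):

    void handleEvents(TaskCompletionEvent[] events, Set<Integer> neededOutputs) {
      for (TaskCompletionEvent event : events) {
        switch (event.getTaskStatus()) {
        case SUCCEEDED:
          scheduleFetch(event);  // hypothetical: queue this output for copying
          break;
        case TIPFAILED:
          // No attempt of this TIP will ever produce output; stop
          // waiting for it instead of retrying the fetch.
          neededOutputs.remove(event.idWithinJob());
          break;
        default:
          break;  // FAILED/OBSOLETE: nothing to fetch
        }
      }
    }
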
Modified: lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp?view=diff&rev=534975&r1=534974&r2=534975
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/jobdetails.jsp Thu May  3 12:51:14 2007
@@ -160,7 +160,7 @@
     <%
     Counters mapCounters = job.getMapCounters();
     Counters reduceCounters = job.getReduceCounters();
-    Counters totalCounters = Counters.sum(mapCounters,reduceCounters);
+    Counters totalCounters = job.getCounters();
     
     for (String groupName : totalCounters.getGroupNames()) {
       Counters.Group totalGroup = totalCounters.getGroup(groupName);