Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:30:45 UTC

svn commit: r1077575 - in /hadoop/common/branches/branch-0.20-security-patches/src: core/org/apache/hadoop/security/RefreshUserMappingsProtocol.java mapred/org/apache/hadoop/mapred/JobHistory.java mapred/org/apache/hadoop/mapred/JobInProgress.java

Author: omalley
Date: Fri Mar  4 04:30:45 2011
New Revision: 1077575

URL: http://svn.apache.org/viewvc?rev=1077575&view=rev
Log:
commit 2eacb066e6fb867db147ea2d488a2b78d9a7817e
Author: Chris Douglas <cd...@apache.org>
Date:   Tue Jul 20 23:53:23 2010 -0700

    MAPREDUCE:339 from https://issues.apache.org/jira/secure/attachment/12450028/M339-0y20s.patch
    
    +++ b/YAHOO-CHANGES.txt
    +    MAPREDUCE-339. Greedily schedule failed tasks to cause early job failure.
    +    (cdouglas)
    +

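    The patch below keeps failed, non-running map TIPs in a TreeSet ordered by
    failure count (the failComparator added to JobInProgress), so the scheduler
    offers the most-failed task first and ignores locality for it, letting a
    doomed job fail early. The following is a minimal standalone sketch of that
    ordering only; SimpleTask, FAIL_COMPARATOR and the class name are
    illustrative stand-ins, not part of the patch, which operates on Hadoop's
    TaskInProgress.

    import java.util.Comparator;
    import java.util.TreeSet;

    // Standalone sketch of the fail-first ordering introduced by this patch.
    // SimpleTask stands in for Hadoop's TaskInProgress; only the recorded
    // failure count matters here.
    public class FailFirstOrderingSketch {

      static class SimpleTask {
        final String id;
        final int failures;
        SimpleTask(String id, int failures) {
          this.id = id;
          this.failures = failures;
        }
      }

      // Mirrors failComparator in JobInProgress: the task with more failures
      // sorts first.
      static final Comparator<SimpleTask> FAIL_COMPARATOR =
          new Comparator<SimpleTask>() {
            @Override
            public int compare(SimpleTask t1, SimpleTask t2) {
              return t2.failures - t1.failures;
            }
          };

      public static void main(String[] args) {
        // failedMaps in the patch is a TreeSet built on this comparator, so
        // iteration order is "most failures first". Tasks with equal failure
        // counts compare as equal in a TreeSet, so distinct counts are used
        // here to keep the sketch simple.
        TreeSet<SimpleTask> failedMaps = new TreeSet<SimpleTask>(FAIL_COMPARATOR);
        failedMaps.add(new SimpleTask("tip_a", 1));
        failedMaps.add(new SimpleTask("tip_b", 3));
        failedMaps.add(new SimpleTask("tip_c", 2));

        // The scheduler consults this set before any locality-aware cache, so
        // the task closest to exhausting its retries is rescheduled first.
        SimpleTask next = failedMaps.first();
        System.out.println("Schedule next: " + next.id
            + " (failures=" + next.failures + ")");
      }
    }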
Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/RefreshUserMappingsProtocol.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/RefreshUserMappingsProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/RefreshUserMappingsProtocol.java?rev=1077575&r1=1077574&r2=1077575&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/RefreshUserMappingsProtocol.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/core/org/apache/hadoop/security/RefreshUserMappingsProtocol.java Fri Mar  4 04:30:45 2011
@@ -38,7 +38,6 @@ public interface RefreshUserMappingsProt
 
   /**
    * Refresh user to group mappings.
-   * @param conf
    * @throws IOException
    */
   public void refreshUserToGroupsMappings() throws IOException;

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=1077575&r1=1077574&r2=1077575&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobHistory.java Fri Mar  4 04:30:45 2011
@@ -486,8 +486,6 @@ public class JobHistory {
    * @param conf Jobconf of the job tracker.
    * @param hostname jobtracker's hostname
    * @param jobTrackerStartTime jobtracker's start time
-   * @return true if intialized properly
-   *         false otherwise
    */
   public static void init(JobTracker jobTracker, JobConf conf,
              String hostname, long jobTrackerStartTime) throws IOException {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077575&r1=1077574&r2=1077575&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar  4 04:30:45 2011
@@ -21,6 +21,7 @@ import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Comparator;
 import java.util.EnumMap;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -31,6 +32,7 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.SortedSet;
 import java.util.TreeMap;
 import java.util.TreeSet;
 import java.util.Vector;
@@ -79,8 +81,8 @@ public class JobInProgress {
   /**
    * Used when the a kill is issued to a job which is initializing.
    */
+  @SuppressWarnings("serial")
   static class KillInterruptedException extends InterruptedException {
-   private static final long serialVersionUID = 1L;
     public KillInterruptedException(String msg) {
       super(msg);
     }
@@ -122,8 +124,8 @@ public class JobInProgress {
   int speculativeMapTasks = 0;
   int speculativeReduceTasks = 0;
   
-  int mapFailuresPercent = 0;
-  int reduceFailuresPercent = 0;
+  final int mapFailuresPercent;
+  final int reduceFailuresPercent;
   int failedMapTIPs = 0;
   int failedReduceTIPs = 0;
   private volatile boolean launchedCleanup = false;
@@ -142,14 +144,17 @@ public class JobInProgress {
   // Map of NetworkTopology Node to set of running TIPs
   Map<Node, Set<TaskInProgress>> runningMapCache;
 
-  // A list of non-local non-running maps
-  List<TaskInProgress> nonLocalMaps;
+  // A list of non-local, non-running maps
+  final List<TaskInProgress> nonLocalMaps;
+
+  // Set of failed, non-running maps sorted by #failures
+  final SortedSet<TaskInProgress> failedMaps;
 
   // A set of non-local running maps
   Set<TaskInProgress> nonLocalRunningMaps;
 
   // A list of non-running reduce TIPs
-  List<TaskInProgress> nonRunningReduces;
+  Set<TaskInProgress> nonRunningReduces;
 
   // A set of running reduce TIPs
   Set<TaskInProgress> runningReduces;
@@ -160,6 +165,16 @@ public class JobInProgress {
   // A list of cleanup tasks for the reduce task attempts, to be launched
   List<TaskAttemptID> reduceCleanupTasks = new LinkedList<TaskAttemptID>();
 
+  // keep failedMaps, nonRunningReduces ordered by failure count to bias
+  // scheduling toward failing tasks
+  private static final Comparator<TaskInProgress> failComparator =
+    new Comparator<TaskInProgress>() {
+      @Override
+      public int compare(TaskInProgress t1, TaskInProgress t2) {
+        return t2.numTaskFailures() - t1.numTaskFailures();
+      }
+    };
+
   private final int maxLevel;
 
   /**
@@ -223,7 +238,7 @@ public class JobInProgress {
   private final int restartCount;
 
   private JobConf conf;
-  AtomicBoolean tasksInited = new AtomicBoolean(false);
+  volatile boolean tasksInited = false;
   private JobInitKillStatus jobInitKillStatus = new JobInitKillStatus();
 
   private LocalFileSystem localFs;
@@ -315,9 +330,10 @@ public class JobInProgress {
     hasSpeculativeMaps = conf.getMapSpeculativeExecution();
     hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
     this.nonLocalMaps = new LinkedList<TaskInProgress>();
+    this.failedMaps = new TreeSet<TaskInProgress>(failComparator);
     this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
     this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
-    this.nonRunningReduces = new LinkedList<TaskInProgress>();
+    this.nonRunningReduces = new TreeSet<TaskInProgress>(failComparator);
     this.runningReduces = new LinkedHashSet<TaskInProgress>();
     this.resourceEstimator = new ResourceEstimator(this);
     this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
@@ -327,6 +343,8 @@ public class JobInProgress {
     this.memoryPerMap = conf.getMemoryForMapTask();
     this.memoryPerReduce = conf.getMemoryForReduceTask();
     this.maxTaskFailuresPerTracker = conf.getMaxTaskFailuresPerTracker();
+    this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
+    this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
 
     this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
       (numMapTasks + numReduceTasks + 10);
@@ -427,9 +445,10 @@ public class JobInProgress {
       this.maxLevel = jobtracker.getNumTaskCacheLevels();
       this.anyCacheLevel = this.maxLevel+1;
       this.nonLocalMaps = new LinkedList<TaskInProgress>();
+      this.failedMaps = new TreeSet<TaskInProgress>(failComparator);
       this.nonLocalRunningMaps = new LinkedHashSet<TaskInProgress>();
       this.runningMapCache = new IdentityHashMap<Node, Set<TaskInProgress>>();
-      this.nonRunningReduces = new LinkedList<TaskInProgress>();    
+      this.nonRunningReduces = new TreeSet<TaskInProgress>(failComparator);
       this.runningReduces = new LinkedHashSet<TaskInProgress>();
       this.resourceEstimator = new ResourceEstimator(this);
       this.reduce_input_limit = conf.getLong("mapreduce.reduce.input.limit", 
@@ -541,7 +560,7 @@ public class JobInProgress {
    *         <code>false</code> otherwise
    */
   public boolean inited() {
-    return tasksInited.get();
+    return tasksInited;
   }
  
   /**
@@ -612,7 +631,7 @@ public class JobInProgress {
    */
   public synchronized void initTasks() 
   throws IOException, KillInterruptedException {
-    if (tasksInited.get() || isComplete()) {
+    if (tasksInited || isComplete()) {
       return;
     }
     synchronized(jobInitKillStatus){
@@ -740,7 +759,7 @@ public class JobInProgress {
       }
     }
     
-    tasksInited.set(true);
+    tasksInited = true;
     JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime, 
                                  numMapTasks, numReduceTasks);
     
@@ -748,7 +767,7 @@ public class JobInProgress {
    LOG.info("Job " + jobId + " initialized successfully with " + numMapTasks
             + " map tasks and " + numReduceTasks + " reduce tasks.");
   }
-  
+
   TaskSplitMetaInfo[] createSplits(org.apache.hadoop.mapreduce.JobID jobId)
   throws IOException {
     TaskSplitMetaInfo[] allTaskSplitMetaInfo =
@@ -1267,7 +1286,7 @@ public class JobInProgress {
   public Task obtainTaskCleanupTask(TaskTrackerStatus tts, 
                                                  boolean isMapSlot)
   throws IOException {
-    if (!tasksInited.get()) {
+    if (!tasksInited) {
       return null;
     }
     synchronized (this) {
@@ -1303,7 +1322,7 @@ public class JobInProgress {
                                                      int clusterSize, 
                                                      int numUniqueHosts)
   throws IOException {
-    if (!tasksInited.get()) {
+    if (!tasksInited) {
       LOG.info("Cannot create task split for " + profile.getJobID());
       try { throw new IOException("state = " + status.getRunState()); }
       catch (IOException ioe) {ioe.printStackTrace();}
@@ -1329,7 +1348,7 @@ public class JobInProgress {
                                                     int clusterSize, 
                                                     int numUniqueHosts)
   throws IOException {
-    if (!tasksInited.get()) {
+    if (!tasksInited) {
       LOG.info("Cannot create task split for " + profile.getJobID());
       try { throw new IOException("state = " + status.getRunState()); }
       catch (IOException ioe) {ioe.printStackTrace();}
@@ -1370,13 +1389,12 @@ public class JobInProgress {
   
   /**
    * Check if we can schedule an off-switch task for this job.
+   * @param numTaskTrackers TaskTrackers in the cluster.
    * @param numTaskTrackers number of tasktrackers
-   * 
-   * We check the number of missed opportunities for the job. 
-   * If it has 'waited' long enough we go ahead and schedule.
-   * 
    * @return <code>true</code> if we can schedule off-switch, 
    *         <code>false</code> otherwise
+   * We check the number of missed opportunities for the job. 
+   * If it has 'waited' long enough we go ahead and schedule.
    */
   public boolean scheduleOffSwitch(int numTaskTrackers) {
     long missedTaskTrackers = getNumSchedulingOpportunities();
@@ -1395,7 +1413,7 @@ public class JobInProgress {
                                              int numUniqueHosts,
                                              boolean isMapSlot
                                             ) throws IOException {
-    if(!tasksInited.get()) {
+    if(!tasksInited) {
       return null;
     }
     
@@ -1487,7 +1505,7 @@ public class JobInProgress {
                                              int numUniqueHosts,
                                              boolean isMapSlot
                                             ) throws IOException {
-    if(!tasksInited.get()) {
+    if(!tasksInited) {
       return null;
     }
     
@@ -1537,7 +1555,7 @@ public class JobInProgress {
    * @return true/false
    */
   private synchronized boolean canLaunchSetupTask() {
-    return (tasksInited.get() && status.getRunState() == JobStatus.PREP && 
+    return (tasksInited && status.getRunState() == JobStatus.PREP && 
            !launchedSetup && !jobKilled && !jobFailed);
   }
   
@@ -2002,37 +2020,14 @@ public class JobInProgress {
    * @param tip the tip that needs to be failed
    */
   private synchronized void failMap(TaskInProgress tip) {
-    if (nonRunningMapCache == null) {
-      LOG.warn("Non-running cache for maps missing!! "
-               + "Job details are missing.");
-      return;
-    }
-
-    // 1. Its added everywhere since other nodes (having this split local)
-    //    might have removed this tip from their local cache
-    // 2. Give high priority to failed tip - fail early
-
-    String[] splitLocations = tip.getSplitLocations();
-
-    // Add the TIP in the front of the list for non-local non-running maps
-    if (splitLocations == null || splitLocations.length == 0) {
-      nonLocalMaps.add(0, tip);
+    if (failedMaps == null) {
+      LOG.warn("Failed cache for maps is missing! Job details are missing.");
       return;
     }
 
-    for(String host: splitLocations) {
-      Node node = jobtracker.getNode(host);
-      
-      for (int j = 0; j < maxLevel; ++j) {
-        List<TaskInProgress> hostMaps = nonRunningMapCache.get(node);
-        if (hostMaps == null) {
-          hostMaps = new LinkedList<TaskInProgress>();
-          nonRunningMapCache.put(node, hostMaps);
-        }
-        hostMaps.add(0, tip);
-        node = node.getParent();
-      }
-    }
+    // Ignore locality for subsequent scheduling on this TIP. Always schedule
+    // it ahead of other tasks.
+    failedMaps.add(tip);
   }
   
   /**
@@ -2045,7 +2040,7 @@ public class JobInProgress {
                + "Job details are missing.");
       return;
     }
-    nonRunningReduces.add(0, tip);
+    nonRunningReduces.add(tip);
   }
   
   /**
@@ -2189,24 +2184,33 @@ public class JobInProgress {
     }
     
     
-    // For scheduling a map task, we have two caches and a list (optional)
-    //  I)   one for non-running task
-    //  II)  one for running task (this is for handling speculation)
-    //  III) a list of TIPs that have empty locations (e.g., dummy splits),
-    //       the list is empty if all TIPs have associated locations
+    // When scheduling a map task:
+    //  0) Schedule a failed task without considering locality
+    //  1) Schedule non-running tasks
+    //  2) Schedule speculative tasks
+    //  3) Schedule tasks with no location information
 
     // First a look up is done on the non-running cache and on a miss, a look 
     // up is done on the running cache. The order for lookup within the cache:
     //   1. from local node to root [bottom up]
     //   2. breadth wise for all the parent nodes at max level
-
-    // We fall to linear scan of the list (III above) if we have misses in the 
+    // We fall to linear scan of the list ((3) above) if we have misses in the 
     // above caches
 
+    // 0) Schedule the task with the most failures, unless failure was on this
+    //    machine
+    tip = findTaskFromList(failedMaps, tts, numUniqueHosts, false);
+    if (tip != null) {
+      // Add to the running list
+      scheduleMap(tip);
+      LOG.info("Choosing a failed task " + tip.getTIPId());
+      return tip.getIdWithinJob();
+    }
+
     Node node = jobtracker.getNode(tts.getHost());
     
     //
-    // I) Non-running TIP :
+    // 1) Non-running TIP :
     // 
 
     // 1. check from local node to the root [bottom up cache lookup]
@@ -2216,10 +2220,10 @@ public class JobInProgress {
       Node key = node;
       int level = 0;
       // maxCacheLevel might be greater than this.maxLevel if findNewMapTask is
-      // called to schedule any task (local, rack-local, off-switch or speculative)
-      // tasks or it might be NON_LOCAL_CACHE_LEVEL (i.e. -1) if findNewMapTask is
-      //  (i.e. -1) if findNewMapTask is to only schedule off-switch/speculative
-      // tasks
+      // called to schedule any task (local, rack-local, off-switch or
+      // speculative) tasks or it might be NON_LOCAL_CACHE_LEVEL (i.e. -1) if
+      // findNewMapTask is (i.e. -1) if findNewMapTask is to only schedule
+      // off-switch/speculative tasks
       int maxLevelToSchedule = Math.min(maxCacheLevel, maxLevel);
       for (level = 0;level < maxLevelToSchedule; ++level) {
         List <TaskInProgress> cacheForLevel = nonRunningMapCache.get(key);
@@ -2295,7 +2299,7 @@ public class JobInProgress {
     }
 
     //
-    // II) Running TIP :
+    // 2) Running TIP :
     // 
  
     if (hasSpeculativeMaps) {
@@ -2702,7 +2706,7 @@ public class JobInProgress {
    * @param jobTerminationState job termination state
    */
   private synchronized void terminate(int jobTerminationState) {
-    if(!tasksInited.get()) {
+    if(!tasksInited) {
     	//init could not be done, we just terminate directly.
       terminateJob(jobTerminationState);
       return;
@@ -3088,6 +3092,7 @@ public class JobInProgress {
 
       cleanUpMetrics();
       // free up the memory used by the data structures
+      this.failedMaps.clear();
       this.nonRunningMapCache = null;
       this.runningMapCache = null;
       this.nonRunningReduces = null;
@@ -3184,9 +3189,7 @@ public class JobInProgress {
     
     float failureRate = (float)fetchFailures / runningReduceTasks;
     // declare faulty if fetch-failures >= max-allowed-failures
-    boolean isMapFaulty = (failureRate >= MAX_ALLOWED_FETCH_FAILURES_PERCENT) 
-                          ? true
-                          : false;
+    boolean isMapFaulty = failureRate >= MAX_ALLOWED_FETCH_FAILURES_PERCENT;
     if (fetchFailures >= MAX_FETCH_FAILURES_NOTIFICATIONS
         && isMapFaulty) {
       LOG.info("Too many fetch-failures for output of task: " + mapTaskId