You are viewing a plain-text version of this content. The canonical link for it (originally a hyperlink on the word "here") was not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/08/09 15:48:13 UTC

svn commit: r430052 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/JobInProgress.java src/java/org/apache/hadoop/mapred/JobTracker.java src/java/org/apache/hadoop/mapred/TaskInProgress.java

Author: cutting
Date: Wed Aug  9 06:48:10 2006
New Revision: 430052

URL: http://svn.apache.org/viewvc?rev=430052&view=rev
Log:
HADOOP-400.  Improvements to task assignment.

Modified:
    lucene/hadoop/trunk/CHANGES.txt
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
    lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java

Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=430052&r1=430051&r2=430052&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Aug  9 06:48:10 2006
@@ -11,6 +11,11 @@
     Solaris.  This was causing nightly builds to fail.
     (Michel Tourn via cutting)
 
+ 3. HADOOP-400.  Improvements to task assignment.  Tasks are no longer
+    re-run on nodes where they have failed (unless no other node is
+    available).  Also, tasks are better load-balanced among nodes.
+    (omalley via cutting)
+
 
 Release 0.5.0 - 2006-08-04
 

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=430052&r1=430051&r2=430052&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Wed Aug  9 06:48:10 2006
@@ -311,167 +311,132 @@
     /**
      * Return a MapTask, if appropriate, to run on the given tasktracker
      */
-    public Task obtainNewMapTask(String taskTracker, TaskTrackerStatus tts) {
+    public Task obtainNewMapTask(TaskTrackerStatus tts, int clusterSize) {
+      if (! tasksInited) {
+        LOG.info("Cannot create task split for " + profile.getJobId());
+        return null;
+      }
+      ArrayList mapCache = (ArrayList)hostToMaps.get(tts.getHost());
+      double avgProgress = status.mapProgress() / maps.length;
+      int target = findNewTask(tts, clusterSize, avgProgress, 
+                                  maps, firstMapToTry, mapCache);
+      if (target == -1) {
+        return null;
+      }
+      boolean wasRunning = maps[target].isRunning();
+      Task result = maps[target].getTaskToRun(tts.getTrackerName());
+      if (!wasRunning) {
+        runningMapTasks += 1;
+      }
+      return result;
+    }    
+
+    /**
+     * Return a ReduceTask, if appropriate, to run on the given tasktracker.
+     * We don't have cache-sensitivity for reduce tasks, as they
+     *  work on temporary MapRed files.  
+     */
+    public Task obtainNewReduceTask(TaskTrackerStatus tts,
+                                    int clusterSize) {
         if (! tasksInited) {
             LOG.info("Cannot create task split for " + profile.getJobId());
             return null;
         }
 
-        Task t = null;
-        int cacheTarget = -1;
-        int stdTarget = -1;
-        int specTarget = -1;
-        int failedTarget = -1;
-
-        //
-        // We end up creating two tasks for the same bucket, because
-        // we call obtainNewMapTask() really fast, twice in a row.
-        // There's not enough time for the "recentTasks"
-        //
-
-        //
-        // Compute avg progress through the map tasks
-        //
-        double avgProgress = status.mapProgress() / maps.length;
-
+        double avgProgress = status.reduceProgress() / reduces.length;
+        int target = findNewTask(tts, clusterSize, avgProgress, 
+                                    reduces, firstReduceToTry, null);
+        if (target == -1) {
+          return null;
+        }
+        boolean wasRunning = reduces[target].isRunning();
+        Task result = reduces[target].getTaskToRun(tts.getTrackerName());
+        if (!wasRunning) {
+          runningReduceTasks += 1;
+        }
+        return result;
+    }
+    
+    /**
+     * Find a new task to run.
+     * @param tts The task tracker that is asking for a task
+     * @param clusterSize The number of task trackers in the cluster
+     * @param avgProgress The average progress of this kind of task in this job
+     * @param tasks The list of potential tasks to try
+     * @param firstTaskToTry The first index in tasks to check
+     * @param cachedTasks A list of tasks that would like to run on this node
+     * @return the index in tasks of the selected task (or -1 for no task)
+     */
+    private int findNewTask(TaskTrackerStatus tts, 
+                            int clusterSize,
+                            double avgProgress,
+                            TaskInProgress[] tasks,
+                            int firstTaskToTry,
+                            List cachedTasks) {
+        String taskTracker = tts.getTrackerName();
         //
         // See if there is a split over a block that is stored on
         // the TaskTracker checking in.  That means the block
         // doesn't have to be transmitted from another node.
         //
-        ArrayList hostMaps = (ArrayList)hostToMaps.get(tts.getHost());
-        if (hostMaps != null) {
-          Iterator i = hostMaps.iterator();
+        if (cachedTasks != null) {
+          Iterator i = cachedTasks.iterator();
           while (i.hasNext()) {
             TaskInProgress tip = (TaskInProgress)i.next();
-            if (tip.hasTask() && !tip.hasFailedOnMachine(taskTracker)) {
-              LOG.info("Found task with local split for "+tts.getHost());
-              cacheTarget = tip.getIdWithinJob();
-              i.remove();
-              break;
+            i.remove();
+            if (tip.isRunnable() && 
+                !tip.isRunning() &&
+                !tip.hasFailedOnMachine(taskTracker)) {
+              LOG.info("Choosing cached task " + tip.getTIPId());
+              int cacheTarget = tip.getIdWithinJob();
+              return cacheTarget;
             }
           }
         }
 
+
         //
         // If there's no cached target, see if there's
         // a std. task to run.
         //
-        if (cacheTarget < 0) {
-            for (int i = 0; i < maps.length; i++) {
-                int realIdx = (i + firstMapToTry) % maps.length; 
-                if (maps[realIdx].hasTask()) {
-                    if (stdTarget < 0) {
-                      if (maps[realIdx].hasFailedOnMachine(taskTracker)) {
-                        if (failedTarget < 0) {
-                          failedTarget = realIdx;
-                        }
-                      } else {
-                        stdTarget = realIdx;
-                        break;
-                      }
-                    }
-                }
-            }
-        }
-
-        //
-        // If no cached-target and no std target, see if
-        // there's a speculative task to run.
-        //
-        if (cacheTarget < 0 && stdTarget < 0) {
-            for (int i = 0; i < maps.length; i++) {        
-                int realIdx = (i + firstMapToTry) % maps.length; 
-                if (maps[realIdx].hasSpeculativeTask(avgProgress)) {
-                      if (!maps[realIdx].hasFailedOnMachine(taskTracker)) {
-                        specTarget = realIdx;
-                        break;
-                      }
-                }
-            }
-        }
-
-        //
-        // Run whatever we found
-        //
-        if (cacheTarget >= 0) {
-            t = maps[cacheTarget].getTaskToRun(taskTracker, tts, avgProgress);
-            runningMapTasks += 1;
-        } else if (stdTarget >= 0) {
-            t = maps[stdTarget].getTaskToRun(taskTracker, tts, avgProgress);
-            runningMapTasks += 1;
-	} else if (specTarget >= 0) {
-	    //should always be true, but being paranoid
-            boolean isRunning = maps[specTarget].isRunning(); 
-            t = maps[specTarget].getTaskToRun(taskTracker, tts, avgProgress);
-            if (!isRunning){
-                runningMapTasks += 1;
-            }
-        } else if (failedTarget >= 0) {
-           //should always be false, but being paranoid again
-            boolean isRunning = maps[failedTarget].isRunning(); 
-            t = maps[failedTarget].getTaskToRun(taskTracker, tts, avgProgress);
-            if (!isRunning) {
-                runningMapTasks += 1;
-	    }
-        }
-        return t;
-    }
-
-    /**
-     * Return a ReduceTask, if appropriate, to run on the given tasktracker.
-     * We don't have cache-sensitivity for reduce tasks, as they
-     *  work on temporary MapRed files.  
-     */
-    public Task obtainNewReduceTask(String taskTracker, TaskTrackerStatus tts) {
-        if (! tasksInited) {
-            LOG.info("Cannot create task split for " + profile.getJobId());
-            return null;
-        }
-
-        Task t = null;
-        int stdTarget = -1;
-        int specTarget = -1;
         int failedTarget = -1;
-        double avgProgress = status.reduceProgress() / reduces.length;
-
-        for (int i = 0; i < reduces.length; i++) {
-            int realIdx = (i + firstReduceToTry) % reduces.length;
-            if (reduces[realIdx].hasTask()) {
-                if (reduces[realIdx].hasFailedOnMachine(taskTracker)) {
-                  if (failedTarget < 0) {
-                    failedTarget = realIdx;
-                  }
-                } else if (stdTarget < 0) {
-                    stdTarget = realIdx;
-                }
-            } else if (reduces[realIdx].hasSpeculativeTask(avgProgress)) {
-                if (specTarget < 0 &&
-                    !reduces[realIdx].hasFailedOnMachine(taskTracker)) {
-                    specTarget = realIdx;
-                }
+        int specTarget = -1;
+        for (int i = 0; i < tasks.length; i++) {
+          int realIdx = (i + firstTaskToTry) % tasks.length; 
+          TaskInProgress task = tasks[realIdx];
+          if (task.isRunnable()) {
+            // if it failed here and we haven't tried every machine, we
+            // don't schedule it here.
+            boolean hasFailed = task.hasFailedOnMachine(taskTracker);
+            if (hasFailed && (task.getNumberOfFailedMachines() < clusterSize)) {
+              continue;
+            }
+            boolean isRunning = task.isRunning();
+            if (hasFailed) {
+              // failed tasks that aren't running can be scheduled as a last
+              // resort
+              if (!isRunning && failedTarget == -1) {
+                failedTarget = realIdx;
+              }
+            } else {
+              if (!isRunning) {
+                LOG.info("Choosing normal task " + tasks[realIdx].getTIPId());
+                return realIdx;
+              } else if (specTarget == -1 &&
+                         task.hasSpeculativeTask(avgProgress)) {
+                specTarget = realIdx;
+              }
             }
+          }
         }
-        
-        if (stdTarget >= 0) {
-            t = reduces[stdTarget].getTaskToRun(taskTracker, tts, avgProgress);
-            runningReduceTasks += 1;
-	} else if (specTarget >= 0) {
-            //should be false
-            boolean isRunning = reduces[specTarget].isRunning();
-            t = reduces[specTarget].getTaskToRun(taskTracker, tts, avgProgress);
-            if (!isRunning){
-               runningReduceTasks += 1;
-            }
-        } else if (failedTarget >= 0) {
-            boolean isRunning = reduces[failedTarget].isRunning();
-            t = reduces[failedTarget].getTaskToRun(taskTracker, tts, 
-                                                   avgProgress);
-            if (!isRunning){
-                runningReduceTasks += 1;
-            }
+        if (specTarget != -1) {
+          LOG.info("Choosing speculative task " + 
+                    tasks[specTarget].getTIPId());
+        } else if (failedTarget != -1) {
+          LOG.info("Choosing failed task " + 
+                    tasks[failedTarget].getTIPId());          
         }
-        return t;
+        return specTarget != -1 ? specTarget : failedTarget;
     }
 
     /**

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=430052&r1=430051&r2=430052&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Wed Aug  9 06:48:10 2006
@@ -779,13 +779,17 @@
         int remainingMapLoad = 0;
         int numTaskTrackers;
         TaskTrackerStatus tts;
-        int avgMapLoad = 0;
-        int avgReduceLoad = 0;
 	
         synchronized (taskTrackers) {
           numTaskTrackers = taskTrackers.size();
           tts = (TaskTrackerStatus) taskTrackers.get(taskTracker);
         }
+        if (tts == null) {
+          LOG.warn("Unknown task tracker polling; ignoring: " + taskTracker);
+          return null;
+        }
+        int totalCapacity = numTaskTrackers * maxCurrentTasks;
+
         synchronized(jobsByArrival){
             for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) {
                     JobInProgress job = (JobInProgress) it.next();
@@ -797,19 +801,23 @@
                     }
             }   
         }
-        
+
+        // find out the maximum number of maps or reduces that we are willing
+        // to run on any node.
+        int maxMapLoad = 0;
+        int maxReduceLoad = 0;
         if (numTaskTrackers > 0) {
-          avgMapLoad = remainingMapLoad / numTaskTrackers;
-          avgReduceLoad = remainingReduceLoad / numTaskTrackers;
+          maxMapLoad = Math.min(maxCurrentTasks,
+                                (int) Math.ceil((double) remainingMapLoad / 
+                                                numTaskTrackers));
+          maxReduceLoad = Math.min(maxCurrentTasks,
+                                   (int) Math.ceil((double) remainingReduceLoad
+                                                   / numTaskTrackers));
         }
-        int totalCapacity = numTaskTrackers * maxCurrentTasks;
+        
         //
         // Get map + reduce counts for the current tracker.
         //
-        if (tts == null) {
-          LOG.warn("Unknown task tracker polling; ignoring: " + taskTracker);
-          return null;
-        }
 
         int numMaps = tts.countMapTasks();
         int numReduces = tts.countReduceTasks();
@@ -823,18 +831,12 @@
 
         //
         // We hand a task to the current taskTracker if the given machine 
-        // has a workload that's equal to or less than the pendingMaps average.
-        // This way the maps are launched if the TaskTracker has running tasks 
-        // less than the pending average 
-        // +/- TASK_ALLOC_EPSILON.  (That epsilon is in place in case
-        // there is an odd machine that is failing for some reason but 
-        // has not yet been removed from the pool, making capacity seem
-        // larger than it really is.)
+        // has a workload that's less than the maximum load of that kind of
+        // task.
         //
        
         synchronized (jobsByArrival) {
-            if ((numMaps < maxCurrentTasks) &&
-                (numMaps <= avgMapLoad + 1 + TASK_ALLOC_EPSILON)) {
+            if (numMaps < maxMapLoad) {
 
                 int totalNeededMaps = 0;
                 for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) {
@@ -843,7 +845,7 @@
                         continue;
                     }
 
-                    Task t = job.obtainNewMapTask(taskTracker, tts);
+                    Task t = job.obtainNewMapTask(tts, numTaskTrackers);
                     if (t != null) {
                       expireLaunchingTasks.addNewTask(t.getTaskId());
                       myMetrics.launchMap();
@@ -870,17 +872,17 @@
             //
             // Same thing, but for reduce tasks
             //
-            if ((numReduces < maxCurrentTasks) &&
-                (numReduces <= avgReduceLoad + 1 + TASK_ALLOC_EPSILON)) {
+            if (numReduces < maxReduceLoad) {
 
                 int totalNeededReduces = 0;
                 for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) {
                     JobInProgress job = (JobInProgress) it.next();
-                    if (job.getStatus().getRunState() != JobStatus.RUNNING) {
+                    if (job.getStatus().getRunState() != JobStatus.RUNNING ||
+                        job.numReduceTasks == 0) {
                         continue;
                     }
 
-                    Task t = job.obtainNewReduceTask(taskTracker, tts);
+                    Task t = job.obtainNewReduceTask(tts, numTaskTrackers);
                     if (t != null) {
                       expireLaunchingTasks.addNewTask(t.getTaskId());
                       myMetrics.launchReduce();

Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=430052&r1=430051&r2=430052&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Wed Aug  9 06:48:10 2006
@@ -16,8 +16,10 @@
 package org.apache.hadoop.mapred;
 
 import org.apache.commons.logging.*;
+import org.apache.hadoop.util.*;
 
 import java.text.NumberFormat;
+import java.io.*;
 import java.util.*;
 
 
@@ -391,21 +393,12 @@
     /////////////////////////////////////////////////
 
     /**
-     * Return whether this TIP has a non-speculative task to run
+     * Return whether this TIP still needs to run
      */
-    boolean hasTask() {
-        if (failed || isComplete() || recentTasks.size() > 0) {
-            return false;
-        } else {
-            for (Iterator it = taskStatuses.values().iterator(); it.hasNext(); ) {
-                TaskStatus ts = (TaskStatus) it.next();
-                if (ts.getRunState() == TaskStatus.RUNNING) {
-                    return false;
-                }
-            }
-            return true;
-        }
+    boolean isRunnable() {
+      return !failed && (completes == 0);
     }
+    
     /**
      * Return whether the TIP has a speculative task to run.  We
      * only launch a speculative task if the current TIP is really
@@ -430,27 +423,24 @@
     /**
      * Return a Task that can be sent to a TaskTracker for execution.
      */
-    public Task getTaskToRun(String taskTracker, TaskTrackerStatus tts, double avgProgress) {
+    public Task getTaskToRun(String taskTracker) {
         Task t = null;
-        if (hasTask() || 
-            hasSpeculativeTask(avgProgress)) {
-
-            String taskid = (String) usableTaskIds.first();
-            usableTaskIds.remove(taskid);
-            String jobId = job.getProfile().getJobId();
-
-            if (isMapTask()) {
-                t = new MapTask(jobId, jobFile, taskid, partition, split);
-            } else {
-                t = new ReduceTask(jobId, jobFile, taskid, partition, numMaps);
-            }
-            t.setConf(conf);
 
-            recentTasks.add(taskid);
+        String taskid = (String) usableTaskIds.first();
+        usableTaskIds.remove(taskid);
+        String jobId = job.getProfile().getJobId();
 
-            // Ask JobTracker to note that the task exists
-            jobtracker.createTaskEntry(taskid, taskTracker, this);
+        if (isMapTask()) {
+          t = new MapTask(jobId, jobFile, taskid, partition, split);
+        } else {
+          t = new ReduceTask(jobId, jobFile, taskid, partition, numMaps);
         }
+        t.setConf(conf);
+
+        recentTasks.add(taskid);
+
+        // Ask JobTracker to note that the task exists
+        jobtracker.createTaskEntry(taskid, taskTracker, this);
         return t;
     }
     
@@ -461,6 +451,14 @@
      */
     public boolean hasFailedOnMachine(String tracker) {
       return machinesWhereFailed.contains(tracker);
+    }
+    
+    /**
+     * Get the number of machines where this task has failed.
+     * @return the size of the failed machine set
+     */
+    public int getNumberOfFailedMachines() {
+      return machinesWhereFailed.size();
     }
     
     /**