You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2010/06/12 11:49:49 UTC

svn commit: r953976 - in /hadoop/mapreduce/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/JobInProgress.java

Author: vinodkv
Date: Sat Jun 12 09:49:48 2010
New Revision: 953976

URL: http://svn.apache.org/viewvc?rev=953976&view=rev
Log:
MAPREDUCE-1829. JobInProgress.findSpeculativeTask should use min() to find the candidate instead of sort(). Contributed by Scott Chen.

Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=953976&r1=953975&r2=953976&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Sat Jun 12 09:49:48 2010
@@ -59,6 +59,9 @@ Trunk (unreleased changes)
     MAPREDUCE-1354. Enhancements to JobTracker for better performance and
     scalability. (Arun C. Murthy & Richard King via acmurthy) 
 
+    MAPREDUCE-1829. JobInProgress.findSpeculativeTask should use min() to
+    find the candidate instead of sort(). (Scott Chen via vinodkv)
+
   BUG FIXES
 
     MAPREDUCE-1707. TaskRunner can get NPE in getting ugi from TaskTracker.

Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=953976&r1=953975&r2=953976&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Sat Jun 12 09:49:48 2010
@@ -2109,36 +2109,45 @@ public class JobInProgress {
       return null;
     }
     long now = JobTracker.getClock().getTime();
+
+    // Don't return anything if either the TaskTracker is slow or we have
+    // already launched enough speculative tasks in the cluster.
     if (isSlowTracker(taskTrackerName) || atSpeculativeCap(list, taskType)) {
       return null;
     }
-    // List of speculatable candidates, start with all, and chop it down
-    ArrayList<TaskInProgress> candidates = new ArrayList<TaskInProgress>(list);
-    
-    Iterator<TaskInProgress> iter = candidates.iterator();
+
+    TaskInProgress slowestTIP = null;
+    Comparator<TaskInProgress> LateComparator =
+      new EstimatedTimeLeftComparator(now);
+
+    Iterator<TaskInProgress> iter = list.iterator();
     while (iter.hasNext()) {
       TaskInProgress tip = iter.next();
+
+      // If this tip has already run on this machine once or it doesn't need any
+      // more speculative attempts, skip it.
       if (tip.hasRunOnMachine(taskTrackerHost, taskTrackerName) ||
           !tip.canBeSpeculated(now)) {
-          //remove it from candidates
-          iter.remove();
+          continue;
+      }
+
+      if (slowestTIP == null) {
+        slowestTIP = tip;
+      } else {
+        slowestTIP =
+            LateComparator.compare(tip, slowestTIP) < 0 ? tip : slowestTIP;
       }
     }
-    //resort according to expected time till completion
-    Comparator<TaskInProgress> LateComparator = 
-      new EstimatedTimeLeftComparator(now);
-    Collections.sort(candidates, LateComparator);
-    if (candidates.size() > 0 ) {
-      TaskInProgress tip = candidates.get(0);
+
+    if (slowestTIP != null) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("Chose task " + tip.getTIPId() + ". Statistics: Task's : " +
-            tip.getCurrentProgressRate(now) + " Job's : " + 
-            (tip.isMapTask() ? runningMapTaskStats : runningReduceTaskStats));
+        LOG.debug("Chose task " + slowestTIP.getTIPId() + ". Statistics: Task's : " +
+            slowestTIP.getCurrentProgressRate(now) + " Job's : " + 
+            (slowestTIP.isMapTask() ? runningMapTaskStats : runningReduceTaskStats));
       }
-      return tip;
-    } else {
-      return null;
     }
+
+  return slowestTIP;
   }
 
   /**