You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this plain-text version.
Posted to mapreduce-commits@hadoop.apache.org by vi...@apache.org on 2010/06/12 11:49:49 UTC
svn commit: r953976 - in /hadoop/mapreduce/trunk: CHANGES.txt
src/java/org/apache/hadoop/mapred/JobInProgress.java
Author: vinodkv
Date: Sat Jun 12 09:49:48 2010
New Revision: 953976
URL: http://svn.apache.org/viewvc?rev=953976&view=rev
Log:
MAPREDUCE-1829. JobInProgress.findSpeculativeTask should use min() to find the candidate instead of sort(). Contributed by Scott Chen.
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=953976&r1=953975&r2=953976&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Sat Jun 12 09:49:48 2010
@@ -59,6 +59,9 @@ Trunk (unreleased changes)
MAPREDUCE-1354. Enhancements to JobTracker for better performance and
scalability. (Arun C. Murthy & Richard King via acmurthy)
+ MAPREDUCE-1829. JobInProgress.findSpeculativeTask should use min() to
+ find the candidate instead of sort(). (Scott Chen via vinodkv)
+
BUG FIXES
MAPREDUCE-1707. TaskRunner can get NPE in getting ugi from TaskTracker.
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=953976&r1=953975&r2=953976&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Sat Jun 12 09:49:48 2010
@@ -2109,36 +2109,45 @@ public class JobInProgress {
return null;
}
long now = JobTracker.getClock().getTime();
+
+ // Don't return anything if either the TaskTracker is slow or we have
+ // already launched enough speculative tasks in the cluster.
if (isSlowTracker(taskTrackerName) || atSpeculativeCap(list, taskType)) {
return null;
}
- // List of speculatable candidates, start with all, and chop it down
- ArrayList<TaskInProgress> candidates = new ArrayList<TaskInProgress>(list);
-
- Iterator<TaskInProgress> iter = candidates.iterator();
+
+ TaskInProgress slowestTIP = null;
+ Comparator<TaskInProgress> LateComparator =
+ new EstimatedTimeLeftComparator(now);
+
+ Iterator<TaskInProgress> iter = list.iterator();
while (iter.hasNext()) {
TaskInProgress tip = iter.next();
+
+ // If this tip has already run on this machine once or it doesn't need any
+ // more speculative attempts, skip it.
if (tip.hasRunOnMachine(taskTrackerHost, taskTrackerName) ||
!tip.canBeSpeculated(now)) {
- //remove it from candidates
- iter.remove();
+ continue;
+ }
+
+ if (slowestTIP == null) {
+ slowestTIP = tip;
+ } else {
+ slowestTIP =
+ LateComparator.compare(tip, slowestTIP) < 0 ? tip : slowestTIP;
}
}
- //resort according to expected time till completion
- Comparator<TaskInProgress> LateComparator =
- new EstimatedTimeLeftComparator(now);
- Collections.sort(candidates, LateComparator);
- if (candidates.size() > 0 ) {
- TaskInProgress tip = candidates.get(0);
+
+ if (slowestTIP != null) {
if (LOG.isDebugEnabled()) {
- LOG.debug("Chose task " + tip.getTIPId() + ". Statistics: Task's : " +
- tip.getCurrentProgressRate(now) + " Job's : " +
- (tip.isMapTask() ? runningMapTaskStats : runningReduceTaskStats));
+ LOG.debug("Chose task " + slowestTIP.getTIPId() + ". Statistics: Task's : " +
+ slowestTIP.getCurrentProgressRate(now) + " Job's : " +
+ (slowestTIP.isMapTask() ? runningMapTaskStats : runningReduceTaskStats));
}
- return tip;
- } else {
- return null;
}
+
+ return slowestTIP;
}
/**