You are viewing a plain-text version of this content. The canonical link to it was a hyperlink on the original page and is not preserved in this copy.
Posted to mapreduce-commits@hadoop.apache.org by dd...@apache.org on 2009/07/21 08:13:50 UTC
svn commit: r796154 - in /hadoop/mapreduce/trunk: ./ src/java/org/apache/hadoop/mapred/ src/test/mapred/org/apache/hadoop/mapred/
Author: ddas
Date: Tue Jul 21 06:13:50 2009
New Revision: 796154
URL: http://svn.apache.org/viewvc?rev=796154&view=rev
Log:
MAPREDUCE-717. Fixes some corner case issues in speculative execution heuristics. Contributed by Devaraj Das.
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=796154&r1=796153&r2=796154&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Tue Jul 21 06:13:50 2009
@@ -239,3 +239,6 @@
MAPREDUCE-430. Fix bug related to Task getting stuck due to
OutOfMemoryErrors. (Amar Kamat via sharad)
+ MAPREDUCE-717. Fixes some corner case issues in speculative
+ execution heuristics. (Devaraj Das)
+
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=796154&r1=796153&r2=796154&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Tue Jul 21 06:13:50 2009
@@ -363,8 +363,10 @@
this.jobMetrics.setTag("sessionId", conf.getSessionId());
this.jobMetrics.setTag("jobName", conf.getJobName());
this.jobMetrics.setTag("jobId", jobid.toString());
- hasSpeculativeMaps = conf.getMapSpeculativeExecution();
- hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
+ if (!hasRestarted()) { //This is temporary until we fix the restart model
+ hasSpeculativeMaps = conf.getMapSpeculativeExecution();
+ hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
+ }
this.maxLevel = jobtracker.getNumTaskCacheLevels();
this.anyCacheLevel = this.maxLevel+1;
this.nonLocalMaps = new LinkedList<TaskInProgress>();
@@ -1913,16 +1915,17 @@
* @param list pool of tasks to choose from
* @param taskTrackerName the name of the TaskTracker asking for a task
* @param taskTrackerHost the hostname of the TaskTracker asking for a task
+ * @param taskType the type of task (MAP/REDUCE) that we are considering
* @return the TIP to speculatively re-execute
*/
protected synchronized TaskInProgress findSpeculativeTask(
Collection<TaskInProgress> list, String taskTrackerName,
- String taskTrackerHost) {
+ String taskTrackerHost, TaskType taskType) {
if (list.isEmpty()) {
return null;
}
long now = JobTracker.getClock().getTime();
- if (isSlowTracker(taskTrackerName) || atSpeculativeCap(list)) {
+ if (isSlowTracker(taskTrackerName) || atSpeculativeCap(list, taskType)) {
return null;
}
// List of speculatable candidates, start with all, and chop it down
@@ -2141,7 +2144,7 @@
///////// Select a TIP to run on
TaskInProgress tip = findSpeculativeTask(allTips, taskTrackerName,
- taskTrackerHost);
+ taskTrackerHost, TaskType.MAP);
if (tip != null) {
LOG.info("Choosing map task " + tip.getTIPId() +
@@ -2209,7 +2212,7 @@
private synchronized TaskInProgress getSpeculativeReduce(
String taskTrackerName, String taskTrackerHost) {
TaskInProgress tip = findSpeculativeTask(
- runningReduces, taskTrackerName, taskTrackerHost);
+ runningReduces, taskTrackerName, taskTrackerHost, TaskType.REDUCE);
if (tip != null) {
LOG.info("Choosing reduce task " + tip.getTIPId() +
" for speculative execution");
@@ -2223,31 +2226,32 @@
* Check to see if the maximum number of speculative tasks are
* already being executed currently.
* @param tasks the set of tasks to test
+ * @param type the type of task (MAP/REDUCE) that we are considering
* @return has the cap been reached?
*/
- private boolean atSpeculativeCap(Collection<TaskInProgress> tasks) {
+ private boolean atSpeculativeCap(Collection<TaskInProgress> tasks,
+ TaskType type) {
float numTasks = tasks.size();
if (numTasks == 0){
return true; // avoid divide by zero
}
-
+ int speculativeTaskCount = type == TaskType.MAP ? speculativeMapTasks
+ : speculativeReduceTasks;
//return true if totalSpecTask < max(10, 0.01 * total-slots,
// 0.1 * total-running-tasks)
- if (speculativeMapTasks + speculativeReduceTasks < MIN_SPEC_CAP) {
+ if (speculativeTaskCount < MIN_SPEC_CAP) {
return false; // at least one slow tracker's worth of slots(default=10)
}
ClusterStatus c = jobtracker.getClusterStatus(false);
- int numSlots = c.getMaxMapTasks() + c.getMaxReduceTasks();
- if ((float)(speculativeMapTasks + speculativeReduceTasks) <
- numSlots * MIN_SLOTS_CAP) {
+ int numSlots = (type == TaskType.MAP ? c.getMaxMapTasks() : c.getMaxReduceTasks());
+ if ((float)speculativeTaskCount < numSlots * MIN_SLOTS_CAP) {
return false;
}
- boolean atCap = (((float)(speculativeMapTasks+
- speculativeReduceTasks)/numTasks) >= speculativeCap);
+ boolean atCap = (((float)(speculativeTaskCount)/numTasks) >= speculativeCap);
if (LOG.isDebugEnabled()) {
LOG.debug("SpeculativeCap is "+speculativeCap+", specTasks/numTasks is " +
- ((float)(speculativeMapTasks+speculativeReduceTasks)/numTasks)+
+ ((float)(speculativeTaskCount)/numTasks)+
", so atSpecCap() is returning "+atCap);
}
return atCap;
@@ -2355,8 +2359,8 @@
}
private void sub(double oldNum) {
this.count--;
- this.sum -= oldNum;
- this.sumSquares -= oldNum * oldNum;
+ this.sum = Math.max(this.sum -= oldNum, 0.0d);
+ this.sumSquares = Math.max(this.sumSquares -= oldNum * oldNum, 0.0d);
}
public double mean() {
@@ -2365,7 +2369,7 @@
public double var() {
// E(X^2) - E(X)^2
- return (sumSquares/count) - mean() * mean();
+ return Math.max((sumSquares/count) - mean() * mean(), 0.0d);
}
public double std() {
@@ -2511,7 +2515,7 @@
runningMapTasks -= 1;
finishedMapTasks += 1;
metrics.completeMap(taskid);
- if (hasSpeculativeMaps) {
+ if (!tip.isJobSetupTask() && hasSpeculativeMaps) {
updateTaskTrackerStats(tip,ttStatus,trackerMapStats,mapTaskStats);
}
// remove the completed map from the resp running caches
@@ -2523,7 +2527,7 @@
runningReduceTasks -= 1;
finishedReduceTasks += 1;
metrics.completeReduce(taskid);
- if (hasSpeculativeReduces) {
+ if (!tip.isJobSetupTask() && hasSpeculativeReduces) {
updateTaskTrackerStats(tip,ttStatus,trackerReduceStats,reduceTaskStats);
}
// remove the completed reduces from the running reducers set
@@ -2542,7 +2546,7 @@
private void updateTaskTrackerStats(TaskInProgress tip, TaskTrackerStatus ttStatus,
Map<String,DataStatistics> trackerStats, DataStatistics overallStats) {
- float tipDuration = tip.getExecFinishTime()-tip.getDispatchTime();
+ float tipDuration = tip.getExecFinishTime()-tip.getDispatchTime(tip.getSuccessfulTaskid());
DataStatistics ttStats =
trackerStats.get(ttStatus.getTrackerName());
double oldMean = 0.0d;
@@ -2773,12 +2777,14 @@
if (wasSpeculating) {
if (tip.isMapTask()) {
speculativeMapTasks--;
- LOG.debug("Decrement count. Current speculativeMap task count: " +
- speculativeMapTasks);
+ LOG.debug("Decremented count for " +
+ tip.getTIPId()+"/"+tip.getJob().getJobID() +
+ ". Current speculativeMap task count: " + speculativeMapTasks);
} else {
speculativeReduceTasks--;
- LOG.debug("Decremented count. Current speculativeReduce task count: " +
- speculativeReduceTasks);
+ LOG.debug("Decremented count for " +
+ tip.getTIPId()+"/"+tip.getJob().getJobID() +
+ ". Current speculativeReduce task count: " + speculativeReduceTasks);
}
}
}
Modified: hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=796154&r1=796153&r2=796154&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/mapreduce/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Tue Jul 21 06:13:50 2009
@@ -22,6 +22,7 @@
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
+import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -77,7 +78,7 @@
private double progress = 0;
private double oldProgressRate;
private String state = "";
- private long dispatchTime = 0; // most recent time task attempt given to TT
+ private long lastDispatchTime = 0; // most recent time task attempt given to TT
private long execStartTime = 0; // when we started first task-attempt
private long execFinishTime = 0;
private int completes = 0;
@@ -128,6 +129,9 @@
private Counters counters = new Counters();
+ private HashMap<TaskAttemptID, Long> dispatchTimeMap =
+ new HashMap<TaskAttemptID, Long>();
+
/**
* Constructor for MapTask
@@ -237,15 +241,24 @@
/**
* Return the dispatch time
*/
- public long getDispatchTime(){
- return this.dispatchTime;
+ public long getDispatchTime(TaskAttemptID taskid){
+ Long l = dispatchTimeMap.get(taskid);
+ if (l != null) {
+ return l.longValue();
+ }
+ return 0;
+ }
+
+ public long getLastDispatchTime(){
+ return this.lastDispatchTime;
}
/**
* Set the dispatch time
*/
- public void setDispatchTime(long disTime){
- this.dispatchTime = disTime;
+ public void setDispatchTime(TaskAttemptID taskid, long disTime){
+ dispatchTimeMap.put(taskid, disTime);
+ this.lastDispatchTime = disTime;
}
/**
@@ -581,8 +594,9 @@
// but finishTime has to be updated.
if (!isCleanupAttempt(taskid)) {
taskStatuses.put(taskid, status);
- if ((isMapTask() && job.hasSpeculativeMaps()) ||
- (!isMapTask() && job.hasSpeculativeReduces())) {
+ //we don't want to include setup tasks in the task execution stats
+ if (!isJobSetupTask() && ((isMapTask() && job.hasSpeculativeMaps()) ||
+ (!isMapTask() && job.hasSpeculativeReduces()))) {
long now = JobTracker.getClock().getTime();
double oldProgRate = getOldProgressRate();
double currProgRate = getCurrentProgressRate(now);
@@ -906,7 +920,7 @@
}
return (!skipping && isRunnable() && isRunning() &&
activeTasks.size() <= MAX_TASK_EXECS &&
- currentTime - dispatchTime >= SPECULATIVE_LAG &&
+ currentTime - lastDispatchTime >= SPECULATIVE_LAG &&
completes == 0 && !isOnlyCommitPending() &&
(taskStats.mean() - getCurrentProgressRate(currentTime) >
taskStats.std() * job.getSlowTaskThreshold()));
@@ -939,13 +953,13 @@
//keep track of the last time we started an attempt at this TIP
//used to calculate the progress rate of this TIP
- setDispatchTime(JobTracker.getClock().getTime());
+ setDispatchTime(taskid, JobTracker.getClock().getTime());
//set this the first time we run a taskAttempt in this TIP
//each Task attempt has its own TaskStatus, which tracks that
//attempts execStartTime, thus this startTime is TIP wide.
if (0 == execStartTime){
- setExecStartTime(dispatchTime);
+ setExecStartTime(lastDispatchTime);
}
return addRunningTask(taskid, taskTracker);
}
@@ -1139,12 +1153,14 @@
public double getCurrentProgressRate(long currentTime) {
double bestProgressRate = 0;
for (TaskStatus ts : taskStatuses.values()){
- double progressRate = ts.getProgress()/Math.max(1,
- currentTime - dispatchTime);
- if ((ts.getRunState() == TaskStatus.State.RUNNING ||
- ts.getRunState() == TaskStatus.State.SUCCEEDED) &&
- progressRate > bestProgressRate){
- bestProgressRate = progressRate;
+ if (ts.getRunState() == TaskStatus.State.RUNNING ||
+ ts.getRunState() == TaskStatus.State.SUCCEEDED ||
+ ts.getRunState() == TaskStatus.State.COMMIT_PENDING) {
+ double progressRate = ts.getProgress()/Math.max(1,
+ currentTime - getDispatchTime(ts.getTaskID()));
+ if (progressRate > bestProgressRate){
+ bestProgressRate = progressRate;
+ }
}
}
return bestProgressRate;
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java?rev=796154&r1=796153&r2=796154&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/FakeObjectUtilities.java Tue Jul 21 06:13:50 2009
@@ -25,6 +25,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.TaskStatus.Phase;
+import org.apache.hadoop.mapreduce.TaskType;
/**
* Utilities used in unit test.
@@ -94,7 +95,7 @@
private TaskAttemptID findTask(String trackerName, String trackerHost,
Collection<TaskInProgress> nonRunningTasks,
- Collection<TaskInProgress> runningTasks)
+ Collection<TaskInProgress> runningTasks, TaskType taskType)
throws IOException {
TaskInProgress tip = null;
Iterator<TaskInProgress> iter = nonRunningTasks.iterator();
@@ -110,7 +111,8 @@
}
if (tip == null) {
if (getJobConf().getSpeculativeExecution()) {
- tip = findSpeculativeTask(runningTasks, trackerName, trackerHost);
+ tip = findSpeculativeTask(runningTasks, trackerName, trackerHost,
+ taskType);
}
}
if (tip != null) {
@@ -131,14 +133,14 @@
throws IOException {
return findTask(trackerName,
JobInProgress.convertTrackerNameToHostName(trackerName),
- nonLocalMaps, nonLocalRunningMaps);
+ nonLocalMaps, nonLocalRunningMaps, TaskType.MAP);
}
public TaskAttemptID findReduceTask(String trackerName)
throws IOException {
return findTask(trackerName,
JobInProgress.convertTrackerNameToHostName(trackerName),
- nonRunningReduces, runningReduces);
+ nonRunningReduces, runningReduces, TaskType.REDUCE);
}
public void finishTask(TaskAttemptID taskId) {
Modified: hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java?rev=796154&r1=796153&r2=796154&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java (original)
+++ hadoop/mapreduce/trunk/src/test/mapred/org/apache/hadoop/mapred/TestSpeculativeExecution.java Tue Jul 21 06:13:50 2009
@@ -34,9 +34,8 @@
static FakeJobTracker jobTracker;
static class SpecFakeClock extends FakeClock {
long SPECULATIVE_LAG = TaskInProgress.SPECULATIVE_LAG;
- @Override
- public void advance(long millis) {
- time += millis + SPECULATIVE_LAG;
+ public void advanceBySpeculativeLag() {
+ time += SPECULATIVE_LAG;
}
};
static SpecFakeClock clock;
@@ -124,6 +123,7 @@
clock.advance(1000);
job.finishTask(taskAttemptID[1]);
clock.advance(20000);
+ clock.advanceBySpeculativeLag();
//we should get a speculative task now
taskAttemptID[5] = job.findReduceTask(trackers[4]);
assertEquals(taskAttemptID[5].getTaskID().getId(),2);
@@ -157,12 +157,12 @@
job.finishTask(taskAttemptID[0]);
job.finishTask(taskAttemptID[1]);
job.finishTask(taskAttemptID[2]);
- clock.advance(28000);
+ clock.advance(250000);
taskAttemptID[4] = job.findMapTask(trackers[3]);
- clock.advance(5000);
+ clock.advanceBySpeculativeLag();
//by doing the above clock adjustments, we bring the progress rate of
- //taskID 3 lower than 4. For taskID 3, the rate is 85/35000
- //and for taskID 4, the rate is 20/5000. But when we ask for a spec task
+ //taskID 3 lower than 4. For taskID 3, the rate is 85/317000
+ //and for taskID 4, the rate is 20/65000. But when we ask for a spec task
//now, we should get back taskID 4 (since that is expected to complete
//later than taskID 3).
job.progressMade(taskAttemptID[3], 0.85f);
@@ -186,7 +186,7 @@
//Tests the fact that the max tasks launched is 10
assertEquals(speculativeCap(1200,1150,20), 10);
//Tests the fact that the max tasks launched is 0.01 * #slots
- assertEquals(speculativeCap(1200,1150,2000), 20);
+ assertEquals(speculativeCap(1200,1150,4000), 20);
}
private int speculativeCap(int totalTasks, int numEarlyComplete, int slots)
@@ -207,6 +207,9 @@
for (i = 0; i < numEarlyComplete; i++) {
job.finishTask(taskAttemptID[i]);
}
+
+ clock.advanceBySpeculativeLag();
+
for (i = numEarlyComplete; i < totalTasks; i++) {
job.progressMade(taskAttemptID[i], 0.85f);
}