You are viewing a plain text version of this content. The hyperlink to the canonical version was not preserved in this plain text copy.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2008/03/01 10:06:55 UTC
svn commit: r632573 - in /hadoop/core/trunk: CHANGES.txt
src/java/org/apache/hadoop/mapred/JobInProgress.java
src/java/org/apache/hadoop/mapred/TaskInProgress.java
Author: ddas
Date: Sat Mar 1 01:06:54 2008
New Revision: 632573
URL: http://svn.apache.org/viewvc?rev=632573&view=rev
Log:
HADOOP-2790. Reverted patch due to conflicts in 0.16
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=632573&r1=632572&r2=632573&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Sat Mar 1 01:06:54 2008
@@ -191,10 +191,6 @@
HADOOP-2918. Improve error logging so that dfs writes failure with
"No lease on file" can be diagnosed. (dhruba)
- HADOOP-2790. Optimizes hasSpeculativeTask to do with getting the value of
- time (System.getCurrentTimeMillis()). This is now done only once in the
- beginning and is used for all TIPs. (Owen O'Malley via ddas).
-
Release 0.16.0 - 2008-02-07
INCOMPATIBLE CHANGES
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=632573&r1=632572&r2=632573&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Sat Mar 1 01:06:54 2008
@@ -108,8 +108,6 @@
private LocalFileSystem localFs;
private String jobId;
- private boolean hasSpeculativeMaps;
- private boolean hasSpeculativeReduces;
// Per-job counters
public static enum Counter {
@@ -181,8 +179,6 @@
this.jobMetrics.setTag("sessionId", conf.getSessionId());
this.jobMetrics.setTag("jobName", conf.getJobName());
this.jobMetrics.setTag("jobId", jobid);
- hasSpeculativeMaps = conf.getMapSpeculativeExecution();
- hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
}
/**
@@ -636,7 +632,7 @@
int target = findNewTask(tts, clusterSize, status.mapProgress(),
- maps, nodesToMaps, hasSpeculativeMaps);
+ maps, nodesToMaps);
if (target == -1) {
return null;
}
@@ -672,7 +668,7 @@
}
int target = findNewTask(tts, clusterSize, status.reduceProgress() ,
- reduces, null, hasSpeculativeReduces);
+ reduces, null);
if (target == -1) {
return null;
}
@@ -758,11 +754,10 @@
return trackerErrors;
}
- private boolean shouldRunSpeculativeTask(long currentTime,
- TaskInProgress task,
- double avgProgress,
- String taskTracker) {
- return task.hasSpeculativeTask(currentTime, avgProgress) &&
+ private boolean shouldRunSpeculativeTask(TaskInProgress task,
+ double avgProgress,
+ String taskTracker) {
+ return task.hasSpeculativeTask(avgProgress) &&
!task.hasRunOnMachine(taskTracker);
}
@@ -774,15 +769,13 @@
* @param tasks The list of potential tasks to try
* @param firstTaskToTry The first index in tasks to check
* @param cachedTasks A list of tasks that would like to run on this node
- * @param hasSpeculative Should it try to find speculative tasks
* @return the index in tasks of the selected task (or -1 for no task)
*/
private int findNewTask(TaskTrackerStatus tts,
int clusterSize,
double avgProgress,
TaskInProgress[] tasks,
- Map<Node,List<TaskInProgress>> cachedTasks,
- boolean hasSpeculative) {
+ Map<Node,List<TaskInProgress>> cachedTasks) {
String taskTracker = tts.getTrackerName();
int specTarget = -1;
@@ -806,7 +799,6 @@
}
return -1;
}
- long currentTime = System.currentTimeMillis();
//
// See if there is a split over a block that is stored on
@@ -853,9 +845,8 @@
}
return cacheTarget;
}
- if (hasSpeculative && specTarget == -1 &&
- shouldRunSpeculativeTask(currentTime, tip, avgProgress,
- taskTracker)) {
+ if (specTarget == -1 &&
+ shouldRunSpeculativeTask(tip, avgProgress, taskTracker)) {
specTarget = tip.getIdWithinJob();
}
}
@@ -890,9 +881,8 @@
if (!isRunning) {
LOG.info("Choosing normal task " + tasks[i].getTIPId());
return i;
- } else if (hasSpeculative && specTarget == -1 &&
- shouldRunSpeculativeTask(currentTime, task, avgProgress,
- taskTracker)) {
+ } else if (specTarget == -1 &&
+ shouldRunSpeculativeTask(task, avgProgress, taskTracker)) {
specTarget = i;
}
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=632573&r1=632572&r2=632573&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Sat Mar 1 01:06:54 2008
@@ -96,6 +96,7 @@
// currently runnings
private TreeMap<String, String> activeTasks = new TreeMap<String, String>();
private JobConf conf;
+ private boolean runSpeculative;
private Map<String,List<String>> taskDiagnosticData =
new TreeMap<String,List<String>>();
/**
@@ -130,6 +131,7 @@
this.conf = conf;
this.partition = partition;
setMaxTaskAttempts();
+ this.runSpeculative = conf.getMapSpeculativeExecution();
init(JobTracker.getJobUniqueString(jobid));
}
@@ -147,6 +149,7 @@
this.job = job;
this.conf = conf;
setMaxTaskAttempts();
+ this.runSpeculative = conf.getReduceSpeculativeExecution();
init(JobTracker.getJobUniqueString(jobid));
}
@@ -201,6 +204,11 @@
*/
void init(String jobUniqueString) {
this.startTime = System.currentTimeMillis();
+ if ("true".equals(conf.get("mapred.speculative.execution"))) {
+ this.runSpeculative = true;
+ } else if ("false".equals(conf.get("mapred.speculative.execution"))) {
+ this.runSpeculative = false;
+ }
this.taskIdPrefix = makeUniqueString(jobUniqueString);
this.id = "tip_" + this.taskIdPrefix;
}
@@ -683,15 +691,16 @@
* far behind, and has been behind for a non-trivial amount of
* time.
*/
- boolean hasSpeculativeTask(long currentTime, double averageProgress) {
+ boolean hasSpeculativeTask(double averageProgress) {
//
// REMIND - mjc - these constants should be examined
// in more depth eventually...
//
if (activeTasks.size() <= MAX_TASK_EXECS &&
+ runSpeculative &&
(averageProgress - progress >= SPECULATIVE_GAP) &&
- (currentTime - startTime >= SPECULATIVE_LAG)
+ (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG)
&& completes == 0 && !isOnlyCommitPending()) {
return true;
}