You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/03/02 09:38:31 UTC
svn commit: r632722 - in /hadoop/core/trunk: CHANGES.txt
src/java/org/apache/hadoop/mapred/JobInProgress.java
src/java/org/apache/hadoop/mapred/TaskInProgress.java
Author: omalley
Date: Sun Mar 2 00:38:28 2008
New Revision: 632722
URL: http://svn.apache.org/viewvc?rev=632722&view=rev
Log:
HADOOP-2790. Fixed the inefficient method hasSpeculativeTask by removing
its repeated calls to get the current time and its late check of whether
speculation is enabled at all; both are now done once by the caller.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=632722&r1=632721&r2=632722&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Sun Mar 2 00:38:28 2008
@@ -52,6 +52,10 @@
OPTIMIZATIONS
+ HADOOP-2790. Fixed inefficient method hasSpeculativeTask by removing
+ repetitive calls to get the current time and late checking to see if
+ we want speculation on at all. (omalley)
+
BUG FIXES
HADOOP-2195. '-mkdir' behaviour is now closer to Linux shell in case of
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=632722&r1=632721&r2=632722&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Sun Mar 2 00:38:28 2008
@@ -108,6 +108,8 @@
private LocalFileSystem localFs;
private String jobId;
+ private boolean hasSpeculativeMaps;
+ private boolean hasSpeculativeReduces;
// Per-job counters
public static enum Counter {
@@ -179,6 +181,8 @@
this.jobMetrics.setTag("sessionId", conf.getSessionId());
this.jobMetrics.setTag("jobName", conf.getJobName());
this.jobMetrics.setTag("jobId", jobid);
+ hasSpeculativeMaps = conf.getMapSpeculativeExecution();
+ hasSpeculativeReduces = conf.getReduceSpeculativeExecution();
}
/**
@@ -632,7 +636,7 @@
int target = findNewTask(tts, clusterSize, status.mapProgress(),
- maps, nodesToMaps);
+ maps, nodesToMaps, hasSpeculativeMaps);
if (target == -1) {
return null;
}
@@ -668,7 +672,7 @@
}
int target = findNewTask(tts, clusterSize, status.reduceProgress() ,
- reduces, null);
+ reduces, null, hasSpeculativeReduces);
if (target == -1) {
return null;
}
@@ -754,10 +758,11 @@
return trackerErrors;
}
- private boolean shouldRunSpeculativeTask(TaskInProgress task,
- double avgProgress,
- String taskTracker) {
- return task.hasSpeculativeTask(avgProgress) &&
+ private boolean shouldRunSpeculativeTask(long currentTime,
+ TaskInProgress task,
+ double avgProgress,
+ String taskTracker) {
+ return task.hasSpeculativeTask(currentTime, avgProgress) &&
!task.hasRunOnMachine(taskTracker);
}
@@ -769,13 +774,15 @@
* @param tasks The list of potential tasks to try
* @param firstTaskToTry The first index in tasks to check
* @param cachedTasks A list of tasks that would like to run on this node
+ * @param hasSpeculative Should it try to find speculative tasks
* @return the index in tasks of the selected task (or -1 for no task)
*/
private int findNewTask(TaskTrackerStatus tts,
int clusterSize,
double avgProgress,
TaskInProgress[] tasks,
- Map<Node,List<TaskInProgress>> cachedTasks) {
+ Map<Node,List<TaskInProgress>> cachedTasks,
+ boolean hasSpeculative) {
String taskTracker = tts.getTrackerName();
int specTarget = -1;
@@ -799,6 +806,7 @@
}
return -1;
}
+ long currentTime = System.currentTimeMillis();
//
// See if there is a split over a block that is stored on
@@ -845,8 +853,9 @@
}
return cacheTarget;
}
- if (specTarget == -1 &&
- shouldRunSpeculativeTask(tip, avgProgress, taskTracker)) {
+ if (hasSpeculative && specTarget == -1 &&
+ shouldRunSpeculativeTask(currentTime, tip, avgProgress,
+ taskTracker)) {
specTarget = tip.getIdWithinJob();
}
}
@@ -881,8 +890,9 @@
if (!isRunning) {
LOG.info("Choosing normal task " + tasks[i].getTIPId());
return i;
- } else if (specTarget == -1 &&
- shouldRunSpeculativeTask(task, avgProgress, taskTracker)) {
+ } else if (hasSpeculative && specTarget == -1 &&
+ shouldRunSpeculativeTask(currentTime, task, avgProgress,
+ taskTracker)) {
specTarget = i;
}
}
Modified: hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=632722&r1=632721&r2=632722&view=diff
==============================================================================
--- hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Sun Mar 2 00:38:28 2008
@@ -96,7 +96,6 @@
// currently runnings
private TreeMap<String, String> activeTasks = new TreeMap<String, String>();
private JobConf conf;
- private boolean runSpeculative;
private Map<String,List<String>> taskDiagnosticData =
new TreeMap<String,List<String>>();
/**
@@ -131,7 +130,6 @@
this.conf = conf;
this.partition = partition;
setMaxTaskAttempts();
- this.runSpeculative = conf.getMapSpeculativeExecution();
init(JobTracker.getJobUniqueString(jobid));
}
@@ -149,7 +147,6 @@
this.job = job;
this.conf = conf;
setMaxTaskAttempts();
- this.runSpeculative = conf.getReduceSpeculativeExecution();
init(JobTracker.getJobUniqueString(jobid));
}
@@ -204,11 +201,6 @@
*/
void init(String jobUniqueString) {
this.startTime = System.currentTimeMillis();
- if ("true".equals(conf.get("mapred.speculative.execution"))) {
- this.runSpeculative = true;
- } else if ("false".equals(conf.get("mapred.speculative.execution"))) {
- this.runSpeculative = false;
- }
this.taskIdPrefix = makeUniqueString(jobUniqueString);
this.id = "tip_" + this.taskIdPrefix;
}
@@ -691,16 +683,15 @@
* far behind, and has been behind for a non-trivial amount of
* time.
*/
- boolean hasSpeculativeTask(double averageProgress) {
+ boolean hasSpeculativeTask(long currentTime, double averageProgress) {
//
// REMIND - mjc - these constants should be examined
// in more depth eventually...
//
if (activeTasks.size() <= MAX_TASK_EXECS &&
- runSpeculative &&
(averageProgress - progress >= SPECULATIVE_GAP) &&
- (System.currentTimeMillis() - startTime >= SPECULATIVE_LAG)
+ (currentTime - startTime >= SPECULATIVE_LAG)
&& completes == 0 && !isOnlyCommitPending()) {
return true;
}