Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/08/09 15:48:13 UTC
svn commit: r430052 - in /lucene/hadoop/trunk: CHANGES.txt src/java/org/apache/hadoop/mapred/JobInProgress.java src/java/org/apache/hadoop/mapred/JobTracker.java src/java/org/apache/hadoop/mapred/TaskInProgress.java
Author: cutting
Date: Wed Aug 9 06:48:10 2006
New Revision: 430052
URL: http://svn.apache.org/viewvc?rev=430052&view=rev
Log:
HADOOP-400. Improvements to task assignment.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=430052&r1=430051&r2=430052&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Aug 9 06:48:10 2006
@@ -11,6 +11,11 @@
Solaris. This was causing nightly builds to fail.
(Michel Tourn via cutting)
+ 3. HADOOP-400. Improvements to task assignment. Tasks are no longer
+ re-run on nodes where they have failed (unless no other node is
+ available). Also, tasks are better load-balanced among nodes.
+ (omalley via cutting)
+
Release 0.5.0 - 2006-08-04
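
The rule described in the CHANGES entry boils down to: a tracker where a task
has already failed is skipped unless the task has failed on every tracker in
the cluster. A minimal sketch of that test, built on the two TaskInProgress
methods this commit uses (the helper name mayRunOn is illustrative, not part
of the commit):

    // Sketch only: whether `tip` may be scheduled on `tracker`.
    // Relies on TaskInProgress.hasFailedOnMachine() and on the
    // getNumberOfFailedMachines() accessor added by this commit.
    boolean mayRunOn(TaskInProgress tip, String tracker, int clusterSize) {
      if (tip.hasFailedOnMachine(tracker)) {
        // fall back to a known-bad node only when every node has failed it
        return tip.getNumberOfFailedMachines() >= clusterSize;
      }
      return true;
    }
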
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=430052&r1=430051&r2=430052&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Wed Aug 9 06:48:10 2006
@@ -311,167 +311,132 @@
/**
* Return a MapTask, if appropriate, to run on the given tasktracker
*/
- public Task obtainNewMapTask(String taskTracker, TaskTrackerStatus tts) {
+ public Task obtainNewMapTask(TaskTrackerStatus tts, int clusterSize) {
+ if (! tasksInited) {
+ LOG.info("Cannot create task split for " + profile.getJobId());
+ return null;
+ }
+ ArrayList mapCache = (ArrayList)hostToMaps.get(tts.getHost());
+ double avgProgress = status.mapProgress() / maps.length;
+ int target = findNewTask(tts, clusterSize, avgProgress,
+ maps, firstMapToTry, mapCache);
+ if (target == -1) {
+ return null;
+ }
+ boolean wasRunning = maps[target].isRunning();
+ Task result = maps[target].getTaskToRun(tts.getTrackerName());
+ if (!wasRunning) {
+ runningMapTasks += 1;
+ }
+ return result;
+ }
+
+ /**
+ * Return a ReduceTask, if appropriate, to run on the given tasktracker.
+ * We don't have cache-sensitivity for reduce tasks, as they
+ * work on temporary MapRed files.
+ */
+ public Task obtainNewReduceTask(TaskTrackerStatus tts,
+ int clusterSize) {
if (! tasksInited) {
LOG.info("Cannot create task split for " + profile.getJobId());
return null;
}
- Task t = null;
- int cacheTarget = -1;
- int stdTarget = -1;
- int specTarget = -1;
- int failedTarget = -1;
-
- //
- // We end up creating two tasks for the same bucket, because
- // we call obtainNewMapTask() really fast, twice in a row.
- // There's not enough time for the "recentTasks"
- //
-
- //
- // Compute avg progress through the map tasks
- //
- double avgProgress = status.mapProgress() / maps.length;
-
+ double avgProgress = status.reduceProgress() / reduces.length;
+ int target = findNewTask(tts, clusterSize, avgProgress,
+ reduces, firstReduceToTry, null);
+ if (target == -1) {
+ return null;
+ }
+ boolean wasRunning = reduces[target].isRunning();
+ Task result = reduces[target].getTaskToRun(tts.getTrackerName());
+ if (!wasRunning) {
+ runningReduceTasks += 1;
+ }
+ return result;
+ }
+
+ /**
+ * Find a new task to run.
+ * @param tts The task tracker that is asking for a task
+ * @param clusterSize The number of task trackers in the cluster
+ * @param avgProgress The average progress of this kind of task in this job
+ * @param tasks The list of potential tasks to try
+ * @param firstTaskToTry The first index in tasks to check
+ * @param cachedTasks A list of tasks that would like to run on this node
+ * @return the index in tasks of the selected task (or -1 for no task)
+ */
+ private int findNewTask(TaskTrackerStatus tts,
+ int clusterSize,
+ double avgProgress,
+ TaskInProgress[] tasks,
+ int firstTaskToTry,
+ List cachedTasks) {
+ String taskTracker = tts.getTrackerName();
//
// See if there is a split over a block that is stored on
// the TaskTracker checking in. That means the block
// doesn't have to be transmitted from another node.
//
- ArrayList hostMaps = (ArrayList)hostToMaps.get(tts.getHost());
- if (hostMaps != null) {
- Iterator i = hostMaps.iterator();
+ if (cachedTasks != null) {
+ Iterator i = cachedTasks.iterator();
while (i.hasNext()) {
TaskInProgress tip = (TaskInProgress)i.next();
- if (tip.hasTask() && !tip.hasFailedOnMachine(taskTracker)) {
- LOG.info("Found task with local split for "+tts.getHost());
- cacheTarget = tip.getIdWithinJob();
- i.remove();
- break;
+ i.remove();
+ if (tip.isRunnable() &&
+ !tip.isRunning() &&
+ !tip.hasFailedOnMachine(taskTracker)) {
+ LOG.info("Choosing cached task " + tip.getTIPId());
+ int cacheTarget = tip.getIdWithinJob();
+ return cacheTarget;
}
}
}
+
//
// If there's no cached target, see if there's
// a std. task to run.
//
- if (cacheTarget < 0) {
- for (int i = 0; i < maps.length; i++) {
- int realIdx = (i + firstMapToTry) % maps.length;
- if (maps[realIdx].hasTask()) {
- if (stdTarget < 0) {
- if (maps[realIdx].hasFailedOnMachine(taskTracker)) {
- if (failedTarget < 0) {
- failedTarget = realIdx;
- }
- } else {
- stdTarget = realIdx;
- break;
- }
- }
- }
- }
- }
-
- //
- // If no cached-target and no std target, see if
- // there's a speculative task to run.
- //
- if (cacheTarget < 0 && stdTarget < 0) {
- for (int i = 0; i < maps.length; i++) {
- int realIdx = (i + firstMapToTry) % maps.length;
- if (maps[realIdx].hasSpeculativeTask(avgProgress)) {
- if (!maps[realIdx].hasFailedOnMachine(taskTracker)) {
- specTarget = realIdx;
- break;
- }
- }
- }
- }
-
- //
- // Run whatever we found
- //
- if (cacheTarget >= 0) {
- t = maps[cacheTarget].getTaskToRun(taskTracker, tts, avgProgress);
- runningMapTasks += 1;
- } else if (stdTarget >= 0) {
- t = maps[stdTarget].getTaskToRun(taskTracker, tts, avgProgress);
- runningMapTasks += 1;
- } else if (specTarget >= 0) {
- //should always be true, but being paranoid
- boolean isRunning = maps[specTarget].isRunning();
- t = maps[specTarget].getTaskToRun(taskTracker, tts, avgProgress);
- if (!isRunning){
- runningMapTasks += 1;
- }
- } else if (failedTarget >= 0) {
- //should always be false, but being paranoid again
- boolean isRunning = maps[failedTarget].isRunning();
- t = maps[failedTarget].getTaskToRun(taskTracker, tts, avgProgress);
- if (!isRunning) {
- runningMapTasks += 1;
- }
- }
- return t;
- }
-
- /**
- * Return a ReduceTask, if appropriate, to run on the given tasktracker.
- * We don't have cache-sensitivity for reduce tasks, as they
- * work on temporary MapRed files.
- */
- public Task obtainNewReduceTask(String taskTracker, TaskTrackerStatus tts) {
- if (! tasksInited) {
- LOG.info("Cannot create task split for " + profile.getJobId());
- return null;
- }
-
- Task t = null;
- int stdTarget = -1;
- int specTarget = -1;
int failedTarget = -1;
- double avgProgress = status.reduceProgress() / reduces.length;
-
- for (int i = 0; i < reduces.length; i++) {
- int realIdx = (i + firstReduceToTry) % reduces.length;
- if (reduces[realIdx].hasTask()) {
- if (reduces[realIdx].hasFailedOnMachine(taskTracker)) {
- if (failedTarget < 0) {
- failedTarget = realIdx;
- }
- } else if (stdTarget < 0) {
- stdTarget = realIdx;
- }
- } else if (reduces[realIdx].hasSpeculativeTask(avgProgress)) {
- if (specTarget < 0 &&
- !reduces[realIdx].hasFailedOnMachine(taskTracker)) {
- specTarget = realIdx;
- }
+ int specTarget = -1;
+ for (int i = 0; i < tasks.length; i++) {
+ int realIdx = (i + firstTaskToTry) % tasks.length;
+ TaskInProgress task = tasks[realIdx];
+ if (task.isRunnable()) {
+ // if it failed here and we haven't tried every machine, we
+ // don't schedule it here.
+ boolean hasFailed = task.hasFailedOnMachine(taskTracker);
+ if (hasFailed && (task.getNumberOfFailedMachines() < clusterSize)) {
+ continue;
+ }
+ boolean isRunning = task.isRunning();
+ if (hasFailed) {
+ // failed tasks that aren't running can be scheduled as a last
+ // resort
+ if (!isRunning && failedTarget == -1) {
+ failedTarget = realIdx;
+ }
+ } else {
+ if (!isRunning) {
+ LOG.info("Choosing normal task " + tasks[realIdx].getTIPId());
+ return realIdx;
+ } else if (specTarget == -1 &&
+ task.hasSpeculativeTask(avgProgress)) {
+ specTarget = realIdx;
+ }
}
+ }
}
-
- if (stdTarget >= 0) {
- t = reduces[stdTarget].getTaskToRun(taskTracker, tts, avgProgress);
- runningReduceTasks += 1;
- } else if (specTarget >= 0) {
- //should be false
- boolean isRunning = reduces[specTarget].isRunning();
- t = reduces[specTarget].getTaskToRun(taskTracker, tts, avgProgress);
- if (!isRunning){
- runningReduceTasks += 1;
- }
- } else if (failedTarget >= 0) {
- boolean isRunning = reduces[failedTarget].isRunning();
- t = reduces[failedTarget].getTaskToRun(taskTracker, tts,
- avgProgress);
- if (!isRunning){
- runningReduceTasks += 1;
- }
+ if (specTarget != -1) {
+ LOG.info("Choosing speculative task " +
+ tasks[specTarget].getTIPId());
+ } else if (failedTarget != -1) {
+ LOG.info("Choosing failed task " +
+ tasks[failedTarget].getTIPId());
}
- return t;
+ return specTarget != -1 ? specTarget : failedTarget;
}
/**
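
Taken together, the added lines above replace the separate map and reduce
scanning loops with a single findNewTask that applies a four-level preference
order: (1) a runnable, not-yet-running task whose input split is local to the
polling tracker, (2) the first runnable, not-running task that has not failed
on this tracker, (3) a speculative second attempt of a running task lagging
behind avgProgress, and (4) as a last resort, a task that previously failed
here. A condensed rendering of that logic, as a reading aid only (logging
omitted; see the committed hunks above for the authoritative version):

    // Condensed sketch of the selection order in findNewTask.
    private int sketchFindNewTask(TaskInProgress[] tasks, int firstToTry,
                                  String tracker, int clusterSize,
                                  double avgProgress, List cachedTasks) {
      if (cachedTasks != null) {                    // 1. node-local split
        for (Iterator i = cachedTasks.iterator(); i.hasNext(); ) {
          TaskInProgress tip = (TaskInProgress) i.next();
          i.remove();                               // each entry is tried once
          if (tip.isRunnable() && !tip.isRunning() &&
              !tip.hasFailedOnMachine(tracker)) {
            return tip.getIdWithinJob();
          }
        }
      }
      int failedTarget = -1, specTarget = -1;
      for (int i = 0; i < tasks.length; i++) {
        int idx = (i + firstToTry) % tasks.length;
        TaskInProgress task = tasks[idx];
        if (!task.isRunnable()) continue;
        boolean hasFailed = task.hasFailedOnMachine(tracker);
        if (hasFailed && task.getNumberOfFailedMachines() < clusterSize) {
          continue;                                 // another node can try it
        }
        if (hasFailed) {
          if (!task.isRunning() && failedTarget == -1) {
            failedTarget = idx;                     // 4. last resort
          }
        } else if (!task.isRunning()) {
          return idx;                               // 2. normal task
        } else if (specTarget == -1 &&
                   task.hasSpeculativeTask(avgProgress)) {
          specTarget = idx;                         // 3. speculative attempt
        }
      }
      return specTarget != -1 ? specTarget : failedTarget;
    }
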
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=430052&r1=430051&r2=430052&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Wed Aug 9 06:48:10 2006
@@ -779,13 +779,17 @@
int remainingMapLoad = 0;
int numTaskTrackers;
TaskTrackerStatus tts;
- int avgMapLoad = 0;
- int avgReduceLoad = 0;
synchronized (taskTrackers) {
numTaskTrackers = taskTrackers.size();
tts = (TaskTrackerStatus) taskTrackers.get(taskTracker);
}
+ if (tts == null) {
+ LOG.warn("Unknown task tracker polling; ignoring: " + taskTracker);
+ return null;
+ }
+ int totalCapacity = numTaskTrackers * maxCurrentTasks;
+
synchronized(jobsByArrival){
for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) {
JobInProgress job = (JobInProgress) it.next();
@@ -797,19 +801,23 @@
}
}
}
-
+
+ // find out the maximum number of maps or reduces that we are willing
+ // to run on any node.
+ int maxMapLoad = 0;
+ int maxReduceLoad = 0;
if (numTaskTrackers > 0) {
- avgMapLoad = remainingMapLoad / numTaskTrackers;
- avgReduceLoad = remainingReduceLoad / numTaskTrackers;
+ maxMapLoad = Math.min(maxCurrentTasks,
+ (int) Math.ceil((double) remainingMapLoad /
+ numTaskTrackers));
+ maxReduceLoad = Math.min(maxCurrentTasks,
+ (int) Math.ceil((double) remainingReduceLoad
+ / numTaskTrackers));
}
- int totalCapacity = numTaskTrackers * maxCurrentTasks;
+
//
// Get map + reduce counts for the current tracker.
//
- if (tts == null) {
- LOG.warn("Unknown task tracker polling; ignoring: " + taskTracker);
- return null;
- }
int numMaps = tts.countMapTasks();
int numReduces = tts.countReduceTasks();
@@ -823,18 +831,12 @@
//
// We hand a task to the current taskTracker if the given machine
- // has a workload that's equal to or less than the pendingMaps average.
- // This way the maps are launched if the TaskTracker has running tasks
- // less than the pending average
- // +/- TASK_ALLOC_EPSILON. (That epsilon is in place in case
- // there is an odd machine that is failing for some reason but
- // has not yet been removed from the pool, making capacity seem
- // larger than it really is.)
+ // has a workload that's less than the maximum load of that kind of
+ // task.
//
synchronized (jobsByArrival) {
- if ((numMaps < maxCurrentTasks) &&
- (numMaps <= avgMapLoad + 1 + TASK_ALLOC_EPSILON)) {
+ if (numMaps < maxMapLoad) {
int totalNeededMaps = 0;
for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) {
@@ -843,7 +845,7 @@
continue;
}
- Task t = job.obtainNewMapTask(taskTracker, tts);
+ Task t = job.obtainNewMapTask(tts, numTaskTrackers);
if (t != null) {
expireLaunchingTasks.addNewTask(t.getTaskId());
myMetrics.launchMap();
@@ -870,17 +872,17 @@
//
// Same thing, but for reduce tasks
//
- if ((numReduces < maxCurrentTasks) &&
- (numReduces <= avgReduceLoad + 1 + TASK_ALLOC_EPSILON)) {
+ if (numReduces < maxReduceLoad) {
int totalNeededReduces = 0;
for (Iterator it = jobsByArrival.iterator(); it.hasNext(); ) {
JobInProgress job = (JobInProgress) it.next();
- if (job.getStatus().getRunState() != JobStatus.RUNNING) {
+ if (job.getStatus().getRunState() != JobStatus.RUNNING ||
+ job.numReduceTasks == 0) {
continue;
}
- Task t = job.obtainNewReduceTask(taskTracker, tts);
+ Task t = job.obtainNewReduceTask(tts, numTaskTrackers);
if (t != null) {
expireLaunchingTasks.addNewTask(t.getTaskId());
myMetrics.launchReduce();
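
The scheduling change in this file replaces the old average-based admission
test (numMaps <= avgMapLoad + 1 + TASK_ALLOC_EPSILON) with a hard per-node
cap of min(maxCurrentTasks, ceil(remainingLoad / numTaskTrackers)). A worked
example with hypothetical numbers:

    // Hypothetical values, for illustration only.
    int maxCurrentTasks  = 2;    // per-tracker slot limit
    int numTaskTrackers  = 10;
    int remainingMapLoad = 7;    // pending maps across all running jobs

    int maxMapLoad = Math.min(maxCurrentTasks,
        (int) Math.ceil((double) remainingMapLoad / numTaskTrackers));
    // maxMapLoad == 1: each tracker accepts at most one of the 7 pending
    // maps. Under the old test, avgMapLoad = 7 / 10 rounded down to 0, so
    // numMaps <= 0 + 1 + TASK_ALLOC_EPSILON still let an early-polling
    // tracker take two maps, concentrating the job on whichever nodes
    // happened to heartbeat first.
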
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=430052&r1=430051&r2=430052&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Wed Aug 9 06:48:10 2006
@@ -16,8 +16,10 @@
package org.apache.hadoop.mapred;
import org.apache.commons.logging.*;
+import org.apache.hadoop.util.*;
import java.text.NumberFormat;
+import java.io.*;
import java.util.*;
@@ -391,21 +393,12 @@
/////////////////////////////////////////////////
/**
- * Return whether this TIP has a non-speculative task to run
+ * Return whether this TIP still needs to run
*/
- boolean hasTask() {
- if (failed || isComplete() || recentTasks.size() > 0) {
- return false;
- } else {
- for (Iterator it = taskStatuses.values().iterator(); it.hasNext(); ) {
- TaskStatus ts = (TaskStatus) it.next();
- if (ts.getRunState() == TaskStatus.RUNNING) {
- return false;
- }
- }
- return true;
- }
+ boolean isRunnable() {
+ return !failed && (completes == 0);
}
+
/**
* Return whether the TIP has a speculative task to run. We
* only launch a speculative task if the current TIP is really
@@ -430,27 +423,24 @@
/**
* Return a Task that can be sent to a TaskTracker for execution.
*/
- public Task getTaskToRun(String taskTracker, TaskTrackerStatus tts, double avgProgress) {
+ public Task getTaskToRun(String taskTracker) {
Task t = null;
- if (hasTask() ||
- hasSpeculativeTask(avgProgress)) {
-
- String taskid = (String) usableTaskIds.first();
- usableTaskIds.remove(taskid);
- String jobId = job.getProfile().getJobId();
-
- if (isMapTask()) {
- t = new MapTask(jobId, jobFile, taskid, partition, split);
- } else {
- t = new ReduceTask(jobId, jobFile, taskid, partition, numMaps);
- }
- t.setConf(conf);
- recentTasks.add(taskid);
+ String taskid = (String) usableTaskIds.first();
+ usableTaskIds.remove(taskid);
+ String jobId = job.getProfile().getJobId();
- // Ask JobTracker to note that the task exists
- jobtracker.createTaskEntry(taskid, taskTracker, this);
+ if (isMapTask()) {
+ t = new MapTask(jobId, jobFile, taskid, partition, split);
+ } else {
+ t = new ReduceTask(jobId, jobFile, taskid, partition, numMaps);
}
+ t.setConf(conf);
+
+ recentTasks.add(taskid);
+
+ // Ask JobTracker to note that the task exists
+ jobtracker.createTaskEntry(taskid, taskTracker, this);
return t;
}
@@ -461,6 +451,14 @@
*/
public boolean hasFailedOnMachine(String tracker) {
return machinesWhereFailed.contains(tracker);
+ }
+
+ /**
+ * Get the number of machines where this task has failed.
+ * @return the size of the failed machine set
+ */
+ public int getNumberOfFailedMachines() {
+ return machinesWhereFailed.size();
}
/**
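
Finally, note the division of labor this commit introduces in
TaskInProgress: isRunnable() answers only "does this TIP still need to run"
(not failed, never completed), where the old hasTask() had also folded in
"is an attempt currently running". Callers now combine the pieces
explicitly, e.g. (sketch, mirroring the checks in findNewTask above):

    // A TIP is schedulable on a given tracker when all three hold:
    boolean schedulable = tip.isRunnable()               // still needed
                       && !tip.isRunning()               // no live attempt
                       && !tip.hasFailedOnMachine(tracker);

Accordingly, getTaskToRun() no longer re-checks hasTask() or
hasSpeculativeTask(), since findNewTask has already filtered the candidates
before it is called.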