You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2006/12/15 23:35:52 UTC
svn commit: r487697 [2/2] - in /lucene/hadoop/branches/branch-0.9: ./ conf/
site/ src/java/org/apache/hadoop/mapred/
src/site/src/documentation/content/xdocs/
Modified: lucene/hadoop/branches/branch-0.9/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.9/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=487697&r1=487696&r2=487697
==============================================================================
--- lucene/hadoop/branches/branch-0.9/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/branches/branch-0.9/src/java/org/apache/hadoop/mapred/TaskTracker.java Fri Dec 15 14:35:51 2006
@@ -68,6 +68,9 @@
Server taskReportServer = null;
InterTrackerProtocol jobClient;
+
+ // last heartbeat response received
+ short heartbeatResponseId = -1;
StatusHttpServer server = null;
@@ -187,7 +190,7 @@
}
}
}
-
+
static String getCacheSubdir() {
return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.CACHEDIR;
}
@@ -451,15 +454,23 @@
}
}
- if (!transmitHeartBeat()) {
+ // Send the heartbeat and process the jobtracker's directives
+ HeartbeatResponse heartbeatResponse = transmitHeartBeat();
+ TaskTrackerAction[] actions = heartbeatResponse.getActions();
+ LOG.debug("Got heartbeatResponse from JobTracker with responseId: " +
+ heartbeatResponse.getResponseId() + " and " +
+ ((actions != null) ? actions.length : 0) + " actions");
+
+ if (reinitTaskTracker(actions)) {
return State.STALE;
}
+
lastHeartbeat = now;
justStarted = false;
- checkForNewTasks();
+ checkAndStartNewTasks(actions);
markUnresponsiveTasks();
- closeCompletedTasks();
+ closeCompletedTasks(actions);
killOverflowingTasks();
//we've cleaned up, resume normal operation
@@ -491,56 +502,94 @@
* @return false if the tracker was unknown
* @throws IOException
*/
- private boolean transmitHeartBeat() throws IOException {
+ private HeartbeatResponse transmitHeartBeat() throws IOException {
//
// Build the heartbeat information for the JobTracker
//
- List<TaskStatus> taskReports = new ArrayList(runningTasks.size());
+ List<TaskStatus> taskReports =
+ new ArrayList<TaskStatus>(runningTasks.size());
synchronized (this) {
- for (TaskInProgress tip: runningTasks.values()) {
- taskReports.add(tip.createStatus());
- }
+ for (TaskInProgress tip: runningTasks.values()) {
+ taskReports.add(tip.createStatus());
+ }
}
TaskTrackerStatus status =
new TaskTrackerStatus(taskTrackerName, localHostname,
- httpPort, taskReports,
- failures);
-
+ httpPort, taskReports,
+ failures);
+
+ //
+ // Check if we should ask for a new Task
+ //
+ boolean askForNewTask = false;
+ if ((mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) &&
+ acceptNewTasks) {
+ checkLocalDirs(fConf.getLocalDirs());
+
+ if (enoughFreeSpace(minSpaceStart)) {
+ askForNewTask = true;
+ }
+ }
+
//
// Xmit the heartbeat
//
+ HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
+ justStarted, askForNewTask,
+ heartbeatResponseId);
+ heartbeatResponseId = heartbeatResponse.getResponseId();
- int resultCode = jobClient.emitHeartbeat(status, justStarted);
synchronized (this) {
- for (TaskStatus taskStatus: taskReports) {
- if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
- if (taskStatus.getIsMap()) {
- mapTotal--;
- } else {
- reduceTotal--;
- }
- myMetrics.completeTask();
- runningTasks.remove(taskStatus.getTaskId());
+ for (TaskStatus taskStatus : taskReports) {
+ if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
+ if (taskStatus.getIsMap()) {
+ mapTotal--;
+ } else {
+ reduceTotal--;
}
+ myMetrics.completeTask();
+ runningTasks.remove(taskStatus.getTaskId());
+ }
}
}
- return resultCode != InterTrackerProtocol.UNKNOWN_TASKTRACKER;
+ return heartbeatResponse;
}
/**
+ * Check if the jobtracker directed a 'reset' of the tasktracker.
+ *
+ * @param actions the directives of the jobtracker for the tasktracker.
+ * @return <code>true</code> if tasktracker is to be reset,
+ * <code>false</code> otherwise.
+ */
+ private boolean reinitTaskTracker(TaskTrackerAction[] actions) {
+ if (actions != null) {
+ for (TaskTrackerAction action : actions) {
+ if (action.getActionId() ==
+ TaskTrackerAction.ActionType.REINIT_TRACKER) {
+ LOG.info("Recieved RenitTrackerAction from JobTracker");
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ /**
* Check to see if there are any new tasks that we should run.
* @throws IOException
*/
- private void checkForNewTasks() throws IOException {
- //
- // Check if we should ask for a new Task
- //
- if ((mapTotal < maxCurrentTasks || reduceTotal < maxCurrentTasks) &&
- acceptNewTasks) {
- checkLocalDirs(fConf.getLocalDirs());
-
- if (enoughFreeSpace(minSpaceStart)) {
- Task t = jobClient.pollForNewTask(taskTrackerName);
+ private void checkAndStartNewTasks(TaskTrackerAction[] actions)
+ throws IOException {
+ if (actions == null) {
+ return;
+ }
+
+ for (TaskTrackerAction action : actions) {
+ if (action.getActionId() ==
+ TaskTrackerAction.ActionType.LAUNCH_TASK) {
+ Task t = ((LaunchTaskAction)(action)).getTask();
+ LOG.info("LaunchTaskAction: " + t.getTaskId());
if (t != null) {
startNewTask(t);
}
@@ -573,24 +622,73 @@
* Ask the JobTracker if there are any tasks that we should clean up,
* either because we don't need them any more or because the job is done.
*/
- private void closeCompletedTasks() throws IOException {
- String[] toCloseIds = jobClient.pollForTaskWithClosedJob(taskTrackerName);
- if (toCloseIds != null) {
- synchronized (this) {
- for (int i = 0; i < toCloseIds.length; i++) {
- TaskInProgress tip = tasks.get(toCloseIds[i]);
- if (tip != null) {
- // remove the task from running jobs, removing the job if
- // it is the last task
- removeTaskFromJob(tip.getTask().getJobId(), tip);
- tasksToCleanup.put(tip);
+ private void closeCompletedTasks(TaskTrackerAction[] actions)
+ throws IOException {
+ if (actions == null) {
+ return;
+ }
+
+ for (TaskTrackerAction action : actions) {
+ TaskTrackerAction.ActionType actionType = action.getActionId();
+
+ if (actionType == TaskTrackerAction.ActionType.KILL_JOB) {
+ String jobId = ((KillJobAction)action).getJobId();
+ LOG.info("Received 'KillJobAction' for job: " + jobId);
+ synchronized (runningJobs) {
+ RunningJob rjob = runningJobs.get(jobId);
+ if (rjob == null) {
+ LOG.warn("Unknown job " + jobId + " being deleted.");
} else {
- LOG.info("Attempt to cleanup unknown tip " + toCloseIds[i]);
+ synchronized (rjob) {
+ int noJobTasks = rjob.tasks.size();
+ int taskCtr = 0;
+
+ // Add the tips of this job to the queue of tasks to be purged
+ for (TaskInProgress tip : rjob.tasks) {
+ // Purge the job files for the last element in rjob.tasks
+ if (++taskCtr == noJobTasks) {
+ tip.setPurgeJobFiles(true);
+ }
+
+ tasksToCleanup.put(tip);
+ }
+
+ // Remove this job
+ rjob.tasks.clear();
+ runningJobs.remove(jobId);
+ }
}
}
+ } else if(actionType == TaskTrackerAction.ActionType.KILL_TASK) {
+ String taskId = ((KillTaskAction)action).getTaskId();
+ LOG.info("Received KillTaskAction for task: " + taskId);
+ purgeTask(tasks.get(taskId), false);
}
}
}
+
+ /**
+ * Remove the tip and update all relevant state.
+ *
+ * @param tip {@link TaskInProgress} to be removed.
+ * @param purgeJobFiles <code>true</code> if the job files are to be
+ * purged, <code>false</code> otherwise.
+ */
+ private void purgeTask(TaskInProgress tip, boolean purgeJobFiles) {
+ if (tip != null) {
+ LOG.info("About to purge task: " + tip.getTask().getTaskId());
+
+ // Cleanup the job files?
+ tip.setPurgeJobFiles(purgeJobFiles);
+
+ // Remove the task from running jobs,
+ // removing the job if it's the last task
+ removeTaskFromJob(tip.getTask().getJobId(), tip);
+
+ // Add this tip to queue of tasks to be purged
+ tasksToCleanup.put(tip);
+ }
+ }
/** Check if we're dangerously low on disk space
* If so, kill jobs to free up space and make sure
@@ -822,6 +920,9 @@
private boolean alwaysKeepTaskFiles;
private TaskStatus taskStatus ;
private boolean keepJobFiles;
+
+ /** Whether to clean up the job files when the job is complete (done/failed) */
+ private boolean purgeJobFiles = false;
/**
*/
@@ -886,6 +987,10 @@
keepFailedTaskFiles = localJobConf.getKeepFailedTaskFiles();
}
+ public void setPurgeJobFiles(boolean purgeJobFiles) {
+ this.purgeJobFiles = purgeJobFiles;
+ }
+
/**
*/
public synchronized TaskStatus createStatus() {
@@ -1017,32 +1122,39 @@
* We no longer need anything from this task, as the job has
* finished. If the task is still running, kill it (and clean up
*/
- public synchronized void jobHasFinished() throws IOException {
-
- if (getRunState() == TaskStatus.State.RUNNING) {
+ public void jobHasFinished() throws IOException {
+ boolean killTask = false;
+ synchronized(this){
+ killTask = (getRunState() == TaskStatus.State.RUNNING);
+ if (killTask) {
killAndCleanup(false);
- } else {
- cleanup();
- }
- if (keepJobFiles)
- return;
-
- // Delete temp directory in case any task used PhasedFileSystem.
- try{
- String systemDir = task.getConf().get("mapred.system.dir");
- Path taskTempDir = new Path(systemDir + "/" +
- task.getJobId() + "/" + task.getTipId());
- if( fs.exists(taskTempDir)){
- fs.delete(taskTempDir) ;
}
- }catch(IOException e){
- LOG.warn("Error in deleting reduce temporary output",e);
+ }
+ if (!killTask) {
+ cleanup();
+ }
+ if (keepJobFiles)
+ return;
+
+ synchronized(this){
+ // Delete temp directory in case any task used PhasedFileSystem.
+ try{
+ String systemDir = task.getConf().get("mapred.system.dir");
+ Path taskTempDir = new Path(systemDir + "/" +
+ task.getJobId() + "/" + task.getTipId() + "/" + task.getTaskId());
+ if( fs.exists(taskTempDir)){
+ fs.delete(taskTempDir) ;
+ }
+ }catch(IOException e){
+ LOG.warn("Error in deleting reduce temporary output",e);
+ }
+ }
+ // Delete the job directory for this
+ // task if the job is done/failed
+ if (purgeJobFiles) {
+ this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR +
+ JOBCACHE + Path.SEPARATOR + task.getJobId());
}
-
- // delete the job diretory for this task
- // since the job is done/failed
- this.defaultJobConf.deleteLocalFiles(SUBDIR + Path.SEPARATOR +
- JOBCACHE + Path.SEPARATOR + task.getJobId());
}
/**
@@ -1090,6 +1202,9 @@
* We no longer need anything from this task. Either the
* controlling job is all done and the files have been copied
* away, or the task failed and we don't need the remains.
+ * Callers of cleanup must not lock the tip first:
+ * cleanup updates the tasks in the TaskTracker by locking the
+ * TaskTracker first and then locking the tip.
*/
void cleanup() throws IOException {
String taskId = task.getTaskId();
Modified: lucene/hadoop/branches/branch-0.9/src/site/src/documentation/content/xdocs/index.xml
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.9/src/site/src/documentation/content/xdocs/index.xml?view=diff&rev=487697&r1=487696&r2=487697
==============================================================================
--- lucene/hadoop/branches/branch-0.9/src/site/src/documentation/content/xdocs/index.xml (original)
+++ lucene/hadoop/branches/branch-0.9/src/site/src/documentation/content/xdocs/index.xml Fri Dec 15 14:35:51 2006
@@ -15,6 +15,15 @@
<title>News</title>
<section>
+ <title>15 December, 2006: release 0.9.2 available</title>
+ <p>This fixes critical bugs in 0.9.1. For details see the <a
+ href="http://tinyurl.com/ya8lfd">release notes</a>. The release can
+ be obtained from <a
+ href="http://www.apache.org/dyn/closer.cgi/lucene/hadoop/"> a
+ nearby mirror</a>.
+ </p> </section>
+
+ <section>
<title>6 December, 2006: release 0.9.1 available</title>
<p>This fixes critical bugs in 0.9.0. For details see the <a
href="http://tinyurl.com/y55d7p">release notes</a>. The release can