You are viewing a plain-text version of this content. (The link to the canonical version was lost in the plain-text conversion.)
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2007/10/10 11:32:50 UTC
svn commit: r583408 - in /lucene/hadoop/trunk: ./
src/java/org/apache/hadoop/mapred/ src/webapps/job/
Author: acmurthy
Date: Wed Oct 10 02:32:49 2007
New Revision: 583408
URL: http://svn.apache.org/viewvc?rev=583408&view=rev
Log:
HADOOP-1874. Move the promotion/discard of task outputs to a separate thread, distinct from the main heartbeat-processing thread. The main benefit is that the JobTracker is no longer locked up during HDFS operations, which could otherwise lead to lost tasktrackers if the NameNode is unresponsive. Contributed by Devaraj Das.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=583408&r1=583407&r2=583408&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Wed Oct 10 02:32:49 2007
@@ -253,6 +253,12 @@
HADOOP-1992. Fix the performance degradation in the sort validator.
(acmurthy via omalley)
+ HADOOP-1874. Move task-outputs' promotion/discard to a separate thread
+ distinct from the main heartbeat-processing thread. The main upside being
+ that we do not lock-up the JobTracker during HDFS operations, which
+ otherwise may lead to lost tasktrackers if the NameNode is unresponsive.
+ (Devaraj Das via acmurthy)
+
IMPROVEMENTS
HADOOP-1908. Restructure data node code so that block sending and
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?rev=583408&r1=583407&r2=583408&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Wed Oct 10 02:32:49 2007
@@ -37,7 +37,6 @@
import org.apache.hadoop.metrics.MetricsContext;
import org.apache.hadoop.metrics.MetricsRecord;
import org.apache.hadoop.metrics.MetricsUtil;
-import org.apache.hadoop.util.StringUtils;
/*************************************************************
* JobInProgress maintains all the info for keeping
@@ -380,6 +379,16 @@
TaskTrackerStatus ttStatus =
this.jobtracker.getTaskTracker(status.getTaskTracker());
String httpTaskLogLocation = null;
+
+ if (state == TaskStatus.State.COMMIT_PENDING ||
+ state == TaskStatus.State.FAILED ||
+ state == TaskStatus.State.KILLED) {
+ JobWithTaskContext j = new JobWithTaskContext(this, tip,
+ status.getTaskId(),
+ metrics);
+ jobtracker.addToCommitQueue(j);
+ }
+
if (null != ttStatus){
httpTaskLogLocation = "http://" + ttStatus.getHost() + ":" +
ttStatus.getHttpPort() + "/tasklog?plaintext=true&taskid=" +
@@ -388,7 +397,7 @@
TaskCompletionEvent taskEvent = null;
if (state == TaskStatus.State.SUCCEEDED) {
- boolean complete = false;
+ completedTask(tip, status, metrics);
taskEvent = new TaskCompletionEvent(
taskCompletionEventTracker,
status.getTaskId(),
@@ -397,28 +406,14 @@
TaskCompletionEvent.Status.SUCCEEDED,
httpTaskLogLocation
);
- try {
- complete = completedTask(tip, status, metrics);
- } catch (IOException ioe) {
- // Oops! Failed to copy the task's output to its final place;
- // fail the task!
- failedTask(tip, status.getTaskId(),
- "Failed to copy reduce's output",
- (tip.isMapTask() ?
- TaskStatus.Phase.MAP :
- TaskStatus.Phase.REDUCE),
- TaskStatus.State.FAILED,
- status.getTaskTracker(), null);
- LOG.info("Failed to copy the output of " + status.getTaskId() +
- " with: " + StringUtils.stringifyException(ioe));
- return;
- }
-
- if (complete) {
- tip.setSuccessEventNumber(taskCompletionEventTracker);
- } else {
- taskEvent.setTaskStatus(TaskCompletionEvent.Status.KILLED);
- }
+ tip.setSuccessEventNumber(taskCompletionEventTracker);
+ }
+ //For a failed task update the JT datastructures.For the task state where
+ //only the COMMIT is pending, delegate everything to the JT thread. For
+ //failed tasks we want the JT to schedule a reexecution ASAP (and not go
+ //via the queue for the datastructures' updates).
+ else if (state == TaskStatus.State.COMMIT_PENDING) {
+ return;
} else if (state == TaskStatus.State.FAILED ||
state == TaskStatus.State.KILLED) {
// Get the event number for the (possibly) previously successful
@@ -771,7 +766,7 @@
public synchronized boolean completedTask(TaskInProgress tip,
TaskStatus status,
JobTrackerMetrics metrics)
- throws IOException {
+ {
String taskid = status.getTaskId();
// Sanity check: is the TIP already complete?
@@ -928,14 +923,10 @@
TaskStatus status, String trackerName,
boolean wasRunning, boolean wasComplete,
JobTrackerMetrics metrics) {
- if(status.getRunState() == TaskStatus.State.KILLED ) {
- tip.taskKilled(taskid, trackerName, this.status);
- }
- else {
- // Mark the taskid as a 'failure'
- tip.incompleteSubTask(taskid, trackerName, this.status);
- }
-
+
+ // Mark the taskid as FAILED or KILLED
+ tip.incompleteSubTask(taskid, trackerName, this.status);
+
boolean isRunning = tip.isRunning();
boolean isComplete = tip.isComplete();
@@ -1065,7 +1056,7 @@
reason,
reason,
trackerName, phase,
- tip.getCounters());
+ null);
updateTaskStatus(tip, status, metrics);
JobHistory.Task.logFailed(profile.getJobId(), tip.getTIPId(),
tip.isMapTask() ? Values.MAP.name() : Values.REDUCE.name(),
@@ -1176,6 +1167,32 @@
TaskStatus.State.FAILED, trackerName, metrics);
mapTaskIdToFetchFailuresMap.remove(mapTaskId);
+ }
+ }
+
+ static class JobWithTaskContext {
+ private JobInProgress job;
+ private TaskInProgress tip;
+ private String taskId;
+ private JobTrackerMetrics metrics;
+ JobWithTaskContext(JobInProgress job, TaskInProgress tip,
+ String taskId, JobTrackerMetrics metrics) {
+ this.job = job;
+ this.tip = tip;
+ this.taskId = taskId;
+ this.metrics = metrics;
+ }
+ JobInProgress getJob() {
+ return job;
+ }
+ TaskInProgress getTIP() {
+ return tip;
+ }
+ String getTaskId() {
+ return taskId;
+ }
+ JobTrackerMetrics getJobTrackerMetrics() {
+ return metrics;
}
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=583408&r1=583407&r2=583408&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Wed Oct 10 02:32:49 2007
@@ -39,6 +39,7 @@
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.Vector;
+import java.util.concurrent.LinkedBlockingQueue;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -609,6 +610,8 @@
Path systemDir = null;
private JobConf conf;
+ private Thread taskCommitThread;
+
/**
* Start the JobTracker process, listen on the indicated port
*/
@@ -663,15 +666,6 @@
myMetrics = new JobTrackerMetrics(this, jobConf);
- this.expireTrackersThread = new Thread(this.expireTrackers,
- "expireTrackers");
- this.expireTrackersThread.start();
- this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");
- this.retireJobsThread.start();
- this.initJobsThread = new Thread(this.initJobs, "initJobs");
- this.initJobsThread.start();
- expireLaunchingTaskThread.start();
-
// The rpc/web-server ports can be ephemeral ports...
// ... ensure we have the correct info
this.port = interTrackerServer.getListenerAddress().getPort();
@@ -726,6 +720,17 @@
* Run forever
*/
public void offerService() throws InterruptedException {
+ this.expireTrackersThread = new Thread(this.expireTrackers,
+ "expireTrackers");
+ this.expireTrackersThread.start();
+ this.retireJobsThread = new Thread(this.retireJobs, "retireJobs");
+ this.retireJobsThread.start();
+ this.initJobsThread = new Thread(this.initJobs, "initJobs");
+ this.initJobsThread.start();
+ expireLaunchingTaskThread.start();
+ this.taskCommitThread = new TaskCommitQueue();
+ this.taskCommitThread.start();
+
this.interTrackerServer.join();
LOG.info("Stopped interTrackerServer");
}
@@ -781,6 +786,16 @@
ex.printStackTrace();
}
}
+ if (this.taskCommitThread != null) {
+ LOG.info("Stopping TaskCommit thread");
+ this.taskCommitThread.interrupt();
+ try {
+ this.taskCommitThread.interrupt();
+ this.taskCommitThread.join();
+ } catch (InterruptedException ex) {
+ ex.printStackTrace();
+ }
+ }
LOG.info("stopped all jobtracker services");
return;
}
@@ -853,7 +868,8 @@
void markCompletedJob(JobInProgress job) {
for (TaskInProgress tip : job.getMapTasks()) {
for (TaskStatus taskStatus : tip.getTaskStatuses()) {
- if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
+ if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
+ taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) {
markCompletedTaskAttempt(taskStatus.getTaskTracker(),
taskStatus.getTaskId());
}
@@ -861,7 +877,8 @@
}
for (TaskInProgress tip : job.getReduceTasks()) {
for (TaskStatus taskStatus : tip.getTaskStatuses()) {
- if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
+ if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
+ taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) {
markCompletedTaskAttempt(taskStatus.getTaskTracker(),
taskStatus.getTaskId());
}
@@ -1836,6 +1853,146 @@
removeMarkedTasks(trackerName);
}
}
+
+ public void addToCommitQueue(JobInProgress.JobWithTaskContext j) {
+ ((TaskCommitQueue)taskCommitThread).addToQueue(j);
+ }
+ //This thread takes care of things like moving outputs to their final
+ //locations & deleting temporary outputs
+ private class TaskCommitQueue extends Thread {
+
+ private LinkedBlockingQueue<JobInProgress.JobWithTaskContext> queue =
+ new LinkedBlockingQueue <JobInProgress.JobWithTaskContext>();
+
+ public TaskCommitQueue() {
+ setName("Task Commit Thread");
+ setDaemon(true);
+ }
+
+ public void addToQueue(JobInProgress.JobWithTaskContext j) {
+ while (!queue.add(j)) {
+ LOG.warn("Couldn't add to the Task Commit queue now. Will " +
+ "try again");
+ try {
+ Thread.sleep(2000);
+ } catch (InterruptedException ie) {}
+ }
+ }
+
+ public void run() {
+ while (!isInterrupted()) {
+ JobInProgress.JobWithTaskContext j;
+ try {
+ j = queue.take();
+ } catch (InterruptedException ie) {
+ return;
+ }
+ JobInProgress job = j.getJob();
+ TaskInProgress tip = j.getTIP();
+ String taskid = j.getTaskId();
+ JobTrackerMetrics metrics = j.getJobTrackerMetrics();
+ Task t;
+ TaskStatus status;
+ boolean isTipComplete = false;
+ TaskStatus.State state;
+ synchronized (JobTracker.this) {
+ synchronized (job) {
+ synchronized (tip) {
+ status = tip.getTaskStatus(taskid);
+ t = tip.getTaskObject(taskid);
+ state = status.getRunState();
+ isTipComplete = tip.isComplete();
+ }
+ }
+ }
+ try {
+ //For COMMIT_PENDING tasks, we save the task output in the dfs
+ //as well as manipulate the JT datastructures to reflect a
+ //successful task. This guarantees that we don't declare a task
+ //as having succeeded until we have successfully completed the
+ //dfs operations.
+ //For failed tasks, we just do the dfs operations here. The
+ //datastructures updates is done earlier as soon as the failure
+ //is detected so that the JT can immediately schedule another
+ //attempt for that task.
+ if (state == TaskStatus.State.COMMIT_PENDING) {
+ if (!isTipComplete) {
+ t.saveTaskOutput();
+ }
+ synchronized (JobTracker.this) {
+ //do a check for the case where after the task went to
+ //COMMIT_PENDING, it was lost. So although we would have
+ //saved the task output, we cannot declare it a SUCCESS.
+ TaskStatus newStatus = null;
+ synchronized (job) {
+ synchronized (tip) {
+ status = tip.getTaskStatus(taskid);
+ if (!isTipComplete) {
+ if (status.getRunState() !=
+ TaskStatus.State.COMMIT_PENDING) {
+ state = TaskStatus.State.KILLED;
+ } else {
+ state = TaskStatus.State.SUCCEEDED;
+ }
+ } else {
+ tip.addDiagnosticInfo(t.getTaskId(),"Already completed " +
+ "TIP");
+ state = TaskStatus.State.KILLED;
+
+ }
+ //create new status if required. If the state changed from
+ //COMMIT_PENDING to KILLED in the JobTracker, while we were
+ //saving the output,the JT would have called updateTaskStatus
+ //and we don't need to call it again
+ if (status.getRunState() == TaskStatus.State.COMMIT_PENDING){
+ newStatus = TaskStatus.createTaskStatus(
+ tip.isMapTask(),
+ taskid,
+ state == TaskStatus.State.SUCCEEDED ? 1.0f : 0.0f,
+ state,
+ status.getDiagnosticInfo(),
+ status.getStateString(),
+ status.getTaskTracker(), status.getPhase(),
+ status.getCounters());
+ }
+ }
+ if (newStatus != null) {
+ job.updateTaskStatus(tip, newStatus, metrics);
+ }
+ }
+ }
+ }
+ } catch (IOException ioe) {
+ // Oops! Failed to copy the task's output to its final place;
+ // fail the task!
+ state = TaskStatus.State.FAILED;
+ synchronized (JobTracker.this) {
+ job.failedTask(tip, status.getTaskId(),
+ "Failed to rename output with the exception: " +
+ StringUtils.stringifyException(ioe),
+ (tip.isMapTask() ?
+ TaskStatus.Phase.MAP :
+ TaskStatus.Phase.REDUCE),
+ TaskStatus.State.FAILED,
+ status.getTaskTracker(), null);
+ }
+ LOG.info("Failed to rename the output of " + status.getTaskId() +
+ " with: " + StringUtils.stringifyException(ioe));
+ }
+ if (state == TaskStatus.State.FAILED ||
+ state == TaskStatus.State.KILLED) {
+ try {
+ t.discardTaskOutput();
+ } catch (IOException ioe) {
+ LOG.info("Failed to discard the output of task " +
+ status.getTaskId() + " with: " +
+ StringUtils.stringifyException(ioe));
+ }
+ }
+ }
+ }
+ }
+
/**
* Get the localized job file path on the job trackers local file system
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java?rev=583408&r1=583407&r2=583408&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/Task.java Wed Oct 10 02:32:49 2007
@@ -438,21 +438,32 @@
private Path getFinalPath(Path jobOutputDir, Path taskOutput) {
URI relativePath = taskOutputPath.toUri().relativize(taskOutput.toUri());
- return new Path(jobOutputDir, relativePath.getPath());
+ if (relativePath.getPath().length() > 0) {
+ return new Path(jobOutputDir, relativePath.getPath());
+ } else {
+ return jobOutputDir;
+ }
}
private void moveTaskOutputs(FileSystem fs, Path jobOutputDir, Path taskOutput)
throws IOException {
if (fs.isFile(taskOutput)) {
Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput);
- fs.mkdirs(finalOutputPath.getParent());
if (!fs.rename(taskOutput, finalOutputPath)) {
- throw new IOException("Failed to save output of task: " +
- getTaskId());
+ if (!fs.delete(finalOutputPath)) {
+ throw new IOException("Failed to delete earlier output of task: " +
+ getTaskId());
+ }
+ if (!fs.rename(taskOutput, finalOutputPath)) {
+ throw new IOException("Failed to save output of task: " +
+ getTaskId());
+ }
}
LOG.debug("Moved " + taskOutput + " to " + finalOutputPath);
} else if(fs.isDirectory(taskOutput)) {
Path[] paths = fs.listPaths(taskOutput);
+ Path finalOutputPath = getFinalPath(jobOutputDir, taskOutput);
+ fs.mkdirs(finalOutputPath);
if (paths != null) {
for (Path path : paths) {
moveTaskOutputs(fs, jobOutputDir, path);
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?rev=583408&r1=583407&r2=583408&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Wed Oct 10 02:32:49 2007
@@ -104,7 +104,6 @@
// Map from taskId -> Task
private Map<String, Task> tasks = new TreeMap<String, Task>();
- boolean savedTaskOutput = false;
private TreeSet<String> machinesWhereFailed = new TreeSet<String>();
private TreeSet<String> tasksReportedClosed = new TreeSet<String>();
@@ -185,6 +184,15 @@
return partition;
}
+ public boolean isOnlyCommitPending() {
+ for (TaskStatus t : taskStatuses.values()) {
+ if (t.getRunState() == TaskStatus.State.COMMIT_PENDING) {
+ return true;
+ }
+ }
+ return false;
+ }
+
/**
* Initialization common to Map and Reduce
*/
@@ -219,6 +227,15 @@
}
/**
+ * Return the Task object associated with a taskId
+ * @param taskId
+ * @return
+ */
+ public Task getTaskObject(String taskId) {
+ return tasks.get(taskId);
+ }
+
+ /**
* Is this tip currently running any tasks?
* @return true if any tasks are running
*/
@@ -231,7 +248,7 @@
*
* @return <code>true</code> if the tip is complete, else <code>false</code>
*/
- public boolean isComplete() {
+ public synchronized boolean isComplete() {
return (completes > 0);
}
@@ -350,7 +367,7 @@
* @param taskId id of the task
* @param diagInfo diagnostic information for the task
*/
- private void addDiagnosticInfo(String taskId, String diagInfo) {
+ public void addDiagnosticInfo(String taskId, String diagInfo) {
List<String> diagHistory = taskDiagnosticData.get(taskId);
if (diagHistory == null) {
diagHistory = new ArrayList<String>();
@@ -396,7 +413,8 @@
if (newState == TaskStatus.State.RUNNING &&
(oldState == TaskStatus.State.FAILED ||
oldState == TaskStatus.State.KILLED ||
- oldState == TaskStatus.State.SUCCEEDED)) {
+ oldState == TaskStatus.State.SUCCEEDED ||
+ oldState == TaskStatus.State.COMMIT_PENDING)) {
return false;
}
@@ -419,10 +437,18 @@
//
// Note the failure and its location
//
- LOG.info("Task '" + taskid + "' has been lost.");
TaskStatus status = taskStatuses.get(taskid);
TaskStatus.State taskState = TaskStatus.State.FAILED;
if (status != null) {
+ // Check if the user manually KILLED/FAILED this task-attempt...
+ Boolean shouldFail = tasksToKill.remove(taskid);
+ if (shouldFail != null) {
+ taskState = (shouldFail) ? TaskStatus.State.FAILED :
+ TaskStatus.State.KILLED;
+ status.setRunState(taskState);
+ addDiagnosticInfo(taskid, "Task has been " + taskState + " by the user" );
+ }
+
taskState = status.getRunState();
if (taskState != TaskStatus.State.FAILED &&
taskState != TaskStatus.State.KILLED) {
@@ -441,24 +467,18 @@
this.activeTasks.remove(taskid);
// Since we do not fail completed reduces (whose outputs go to hdfs), we
- // should note this failure only for completed maps; however if the job
- // is done, there is no need to manipulate completed maps
- if (this.completes > 0 && this.isMapTask() &&
+ // should note this failure only for completed maps, only if this taskid;
+ // completed this map. however if the job is done, there is no need to
+ // manipulate completed maps
+ if (this.isMapTask() && isComplete(taskid) &&
jobStatus.getRunState() != JobStatus.SUCCEEDED) {
this.completes--;
}
- // Discard task output
- Task t = tasks.get(taskid);
- try {
- t.discardTaskOutput();
- } catch (IOException ioe) {
- LOG.info("Failed to discard output of task '" + taskid + "' with " +
- StringUtils.stringifyException(ioe));
- }
if (taskState == TaskStatus.State.FAILED) {
numTaskFailures++;
+ machinesWhereFailed.add(trackerName);
} else {
numKilledTasks++;
}
@@ -467,7 +487,6 @@
LOG.info("TaskInProgress " + getTIPId() + " has failed " + numTaskFailures + " times.");
kill();
}
- machinesWhereFailed.add(trackerName);
}
/**
@@ -490,14 +509,6 @@
* taskid as {@link TaskStatus.State.KILLED}.
*/
void alreadyCompletedTask(String taskid) {
- Task t = tasks.get(taskid);
- try {
- t.discardTaskOutput();
- } catch (IOException ioe) {
- LOG.info("Failed to discard output of task '" + taskid + "' with " +
- StringUtils.stringifyException(ioe));
- }
-
// 'KILL' the task
completedTask(taskid, TaskStatus.State.KILLED);
@@ -512,29 +523,11 @@
* Indicate that one of the taskids in this TaskInProgress
* has successfully completed!
*/
- public void completed(String taskid) throws IOException {
- //
- // Finalize the task's output
- //
- Task t = tasks.get(taskid);
- if (!savedTaskOutput) {
- t.saveTaskOutput();
- savedTaskOutput = true;
- } else {
- try {
- t.discardTaskOutput();
- } catch (IOException ioe) {
- LOG.info("Failed to discard 'already-saved' output of task: " +
- t.getTaskId() + " with: " +
- StringUtils.stringifyException(ioe));
- }
- }
-
+ public void completed(String taskid) {
//
// Record that this taskid is complete
//
completedTask(taskid, TaskStatus.State.SUCCEEDED);
-
//
// Now that the TIP is complete, the other speculative
@@ -545,7 +538,6 @@
this.completes++;
recomputeProgress();
- LOG.info("Task '" + taskid + "' has completed succesfully");
}
/**
@@ -588,7 +580,8 @@
*/
boolean killTask(String taskId, boolean shouldFail) {
TaskStatus st = taskStatuses.get(taskId);
- if(st != null && st.getRunState() == TaskStatus.State.RUNNING
+ if(st != null && (st.getRunState() == TaskStatus.State.RUNNING
+ || st.getRunState() == TaskStatus.State.COMMIT_PENDING)
&& tasksToKill.put(taskId, shouldFail) == null ) {
String logStr = "Request received to " + (shouldFail ? "fail" : "kill")
+ " task '" + taskId + "' by user";
@@ -599,32 +592,6 @@
return false;
}
- /** Notification that a task with the given id has been killed */
- void taskKilled(String taskId, String trackerName, JobStatus jobStatus) {
- Boolean shouldFail = tasksToKill.remove(taskId);
- if(shouldFail != null && !shouldFail) {
- LOG.info("Task '" + taskId + "' has been killed");
- this.activeTasks.remove(taskId);
- taskStatuses.get(taskId).setRunState(TaskStatus.State.KILLED );
- addDiagnosticInfo(taskId, "Task has been killed" );
- // Discard task output
- Task t = tasks.get(taskId);
- try {
- t.discardTaskOutput();
- } catch (IOException ioe) {
- LOG.info("Failed to discard output of task '" + taskId + "' with " +
- StringUtils.stringifyException(ioe));
- }
- numKilledTasks++;
-
- }
- else {
- //set the task status as failed.
- taskStatuses.get(taskId).setRunState(TaskStatus.State.FAILED);
- incompleteSubTask(taskId, trackerName, jobStatus);
- }
- }
-
/**
* This method is called whenever there's a status change
* for one of the TIP's sub-tasks. It recomputes the overall
@@ -650,6 +617,12 @@
bestState = status.getStateString();
bestCounters = status.getCounters();
break;
+ } else if (status.getRunState() == TaskStatus.State.COMMIT_PENDING) {
+ //for COMMIT_PENDING, we take the last state that we recorded
+ //when the task was RUNNING
+ bestProgress = this.progress;
+ bestState = this.state;
+ bestCounters = this.counters;
} else if (status.getRunState() == TaskStatus.State.RUNNING) {
if (status.getProgress() >= bestProgress) {
bestProgress = status.getProgress();
@@ -692,7 +665,7 @@
runSpeculative &&
(averageProgress - progress >= SPECULATIVE_GAP) &&
(System.currentTimeMillis() - startTime >= SPECULATIVE_LAG)
- && completes == 0) {
+ && completes == 0 && !isOnlyCommitPending()) {
return true;
}
return false;
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java?rev=583408&r1=583407&r2=583408&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskStatus.java Wed Oct 10 02:32:49 2007
@@ -40,7 +40,8 @@
public static enum Phase{STARTING, MAP, SHUFFLE, SORT, REDUCE}
// what state is the task in?
- public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED}
+ public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED,
+ COMMIT_PENDING}
private String taskid;
private float progress;
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?rev=583408&r1=583407&r2=583408&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Oct 10 02:32:49 2007
@@ -1368,7 +1368,7 @@
* The task is reporting that it's done running
*/
public synchronized void reportDone() {
- this.taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+ this.taskStatus.setRunState(TaskStatus.State.COMMIT_PENDING);
this.taskStatus.setProgress(1.0f);
this.taskStatus.setFinishTime(System.currentTimeMillis());
this.done = true;
@@ -1400,7 +1400,7 @@
boolean needCleanup = false;
synchronized (this) {
if (done) {
- taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+ taskStatus.setRunState(TaskStatus.State.COMMIT_PENDING);
} else {
if (!wasKilled) {
failures += 1;
@@ -1477,7 +1477,10 @@
*/
private synchronized void mapOutputLost(String failure
) throws IOException {
- if (taskStatus.getRunState() == TaskStatus.State.SUCCEEDED) {
+ //The check for COMMIT_PENDING should actually be a check for SUCCESS
+ //however for that, we have to introduce another Action type from the
+ //JT to the TT (SuccessTaskAction in the lines of KillTaskAction).
+ if (taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) {
// change status to failure
LOG.info("Reporting output lost:"+task.getTaskId());
taskStatus.setRunState(TaskStatus.State.FAILED);
Modified: lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp?rev=583408&r1=583407&r2=583408&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp (original)
+++ lucene/hadoop/trunk/src/webapps/job/taskdetails.jsp Wed Oct 10 02:32:49 2007
@@ -155,7 +155,7 @@
}
out.print("</td><td>" + "<a href=\"/taskstats.jsp?jobid=" + jobid
+ "&tipid=" + tipid + "&taskid=" + status.getTaskId() + "\">"
- + status.getCounters().size() + "</a></td>");
+ + ((status.getCounters() != null) ? status.getCounters().size() : 0) + "</a></td>");
out.print("<td>");
if (privateActions
&& status.getRunState() == TaskStatus.State.RUNNING) {