You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2007/10/17 06:36:22 UTC
svn commit: r585366 - in /lucene/hadoop/trunk: CHANGES.txt
src/java/org/apache/hadoop/mapred/JobTracker.java
Author: ddas
Date: Tue Oct 16 21:36:22 2007
New Revision: 585366
URL: http://svn.apache.org/viewvc?rev=585366&view=rev
Log:
HADOOP-2051. The TaskCommit thread should not die on exceptions other than InterruptedException; the other long-running threads in the JobTracker already behave this way. Contributed by Arun C Murthy
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?rev=585366&r1=585365&r2=585366&view=diff
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Tue Oct 16 21:36:22 2007
@@ -310,6 +310,10 @@
HADOOP-2044. The namenode protects all lease manipulations using a
sortedLease lock. (Dhruba Borthakur)
+ HADOOP-2051. The TaskCommit thread should not die for exceptions other
+ than the InterruptedException. This behavior is there for the other long
+ running threads in the JobTracker. (Arun C Murthy via ddas)
+
IMPROVEMENTS
HADOOP-1908. Restructure data node code so that block sending and
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=585366&r1=585365&r2=585366&view=diff
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Tue Oct 16 21:36:22 2007
@@ -790,7 +790,6 @@
LOG.info("Stopping TaskCommit thread");
this.taskCommitThread.interrupt();
try {
- this.taskCommitThread.interrupt();
this.taskCommitThread.join();
} catch (InterruptedException ex) {
ex.printStackTrace();
@@ -1871,11 +1870,21 @@
}
}
- public void addToCommitQueue(JobInProgress.JobWithTaskContext j) {
+ /**
+ * Add a job's completed task (either successful or failed/killed) to the
+ * {@link TaskCommitQueue}.
+ * @param j completed task (either successful or failed/killed)
+ */
+ void addToCommitQueue(JobInProgress.JobWithTaskContext j) {
((TaskCommitQueue)taskCommitThread).addToQueue(j);
}
- //This thread takes care of things like moving outputs to their final
- //locations & deleting temporary outputs
+
+ /**
+ * A thread which does all of the {@link FileSystem}-related operations for
+ * tasks. It picks the next task in the queue, promotes outputs of
+ * {@link TaskStatus.State#SUCCEEDED} tasks & discards outputs for
+ * {@link TaskStatus.State#FAILED} or {@link TaskStatus.State#KILLED} tasks.
+ */
private class TaskCommitQueue extends Thread {
private LinkedBlockingQueue<JobInProgress.JobWithTaskContext> queue =
@@ -1898,109 +1907,116 @@
public void run() {
while (!isInterrupted()) {
- JobInProgress.JobWithTaskContext j;
try {
- j = queue.take();
- } catch (InterruptedException ie) {
- return;
- }
- JobInProgress job = j.getJob();
- TaskInProgress tip = j.getTIP();
- String taskid = j.getTaskId();
- JobTrackerMetrics metrics = j.getJobTrackerMetrics();
- Task t;
- TaskStatus status;
- boolean isTipComplete = false;
- TaskStatus.State state;
- synchronized (JobTracker.this) {
- synchronized (job) {
- synchronized (tip) {
- status = tip.getTaskStatus(taskid);
- t = tip.getTaskObject(taskid);
- state = status.getRunState();
- isTipComplete = tip.isComplete();
+ JobInProgress.JobWithTaskContext j = queue.take();
+ JobInProgress job = j.getJob();
+ TaskInProgress tip = j.getTIP();
+ String taskid = j.getTaskId();
+ JobTrackerMetrics metrics = j.getJobTrackerMetrics();
+ Task t;
+ TaskStatus status;
+ boolean isTipComplete = false;
+ TaskStatus.State state;
+ synchronized (JobTracker.this) {
+ synchronized (job) {
+ synchronized (tip) {
+ status = tip.getTaskStatus(taskid);
+ t = tip.getTaskObject(taskid);
+ state = status.getRunState();
+ isTipComplete = tip.isComplete();
+ }
}
}
- }
- try {
- //For COMMIT_PENDING tasks, we save the task output in the dfs
- //as well as manipulate the JT datastructures to reflect a
- //successful task. This guarantees that we don't declare a task
- //as having succeeded until we have successfully completed the
- //dfs operations.
- //For failed tasks, we just do the dfs operations here. The
- //datastructures updates is done earlier as soon as the failure
- //is detected so that the JT can immediately schedule another
- //attempt for that task.
- if (state == TaskStatus.State.COMMIT_PENDING) {
- if (!isTipComplete) {
- t.saveTaskOutput();
- }
- synchronized (JobTracker.this) {
- //do a check for the case where after the task went to
- //COMMIT_PENDING, it was lost. So although we would have
- //saved the task output, we cannot declare it a SUCCESS.
- TaskStatus newStatus = null;
- synchronized (job) {
- synchronized (tip) {
- status = tip.getTaskStatus(taskid);
- if (!isTipComplete) {
- if (status.getRunState() !=
- TaskStatus.State.COMMIT_PENDING) {
- state = TaskStatus.State.KILLED;
+ try {
+ //For COMMIT_PENDING tasks, we save the task output in the dfs
+ //as well as manipulate the JT datastructures to reflect a
+ //successful task. This guarantees that we don't declare a task
+ //as having succeeded until we have successfully completed the
+ //dfs operations.
+ //For failed tasks, we just do the dfs operations here. The
+ //datastructures updates is done earlier as soon as the failure
+ //is detected so that the JT can immediately schedule another
+ //attempt for that task.
+ if (state == TaskStatus.State.COMMIT_PENDING) {
+ if (!isTipComplete) {
+ t.saveTaskOutput();
+ }
+ synchronized (JobTracker.this) {
+ //do a check for the case where after the task went to
+ //COMMIT_PENDING, it was lost. So although we would have
+ //saved the task output, we cannot declare it a SUCCESS.
+ TaskStatus newStatus = null;
+ synchronized (job) {
+ synchronized (tip) {
+ status = tip.getTaskStatus(taskid);
+ if (!isTipComplete) {
+ if (status.getRunState() !=
+ TaskStatus.State.COMMIT_PENDING) {
+ state = TaskStatus.State.KILLED;
+ } else {
+ state = TaskStatus.State.SUCCEEDED;
+ }
} else {
- state = TaskStatus.State.SUCCEEDED;
- }
- } else {
- tip.addDiagnosticInfo(t.getTaskId(),"Already completed " +
- "TIP");
- state = TaskStatus.State.KILLED;
+ tip.addDiagnosticInfo(t.getTaskId(),"Already completed " +
+ "TIP");
+ state = TaskStatus.State.KILLED;
+ }
+ //create new status if required. If the state changed from
+ //COMMIT_PENDING to KILLED in the JobTracker, while we were
+ //saving the output,the JT would have called updateTaskStatus
+ //and we don't need to call it again
+ if (status.getRunState() == TaskStatus.State.COMMIT_PENDING){
+ newStatus = (TaskStatus)status.clone();
+ newStatus.setRunState(state);
+ newStatus.setProgress((state == TaskStatus.State.SUCCEEDED) ? 1.0f : 0.0f);
+ }
}
- //create new status if required. If the state changed from
- //COMMIT_PENDING to KILLED in the JobTracker, while we were
- //saving the output,the JT would have called updateTaskStatus
- //and we don't need to call it again
- if (status.getRunState() == TaskStatus.State.COMMIT_PENDING){
- newStatus = (TaskStatus)status.clone();
- newStatus.setRunState(state);
- newStatus.setProgress((state == TaskStatus.State.SUCCEEDED) ? 1.0f : 0.0f);
+ if (newStatus != null) {
+ job.updateTaskStatus(tip, newStatus, metrics);
}
}
- if (newStatus != null) {
- job.updateTaskStatus(tip, newStatus, metrics);
- }
}
}
+ } catch (IOException ioe) {
+ // Oops! Failed to copy the task's output to its final place;
+ // fail the task!
+ state = TaskStatus.State.FAILED;
+ synchronized (JobTracker.this) {
+ job.failedTask(tip, status.getTaskId(),
+ "Failed to rename output with the exception: " +
+ StringUtils.stringifyException(ioe),
+ (tip.isMapTask() ?
+ TaskStatus.Phase.MAP :
+ TaskStatus.Phase.REDUCE),
+ TaskStatus.State.FAILED,
+ status.getTaskTracker(), null);
+ }
+ LOG.info("Failed to rename the output of " + status.getTaskId() +
+ " with: " + StringUtils.stringifyException(ioe));
}
- } catch (IOException ioe) {
- // Oops! Failed to copy the task's output to its final place;
- // fail the task!
- state = TaskStatus.State.FAILED;
- synchronized (JobTracker.this) {
- job.failedTask(tip, status.getTaskId(),
- "Failed to rename output with the exception: " +
- StringUtils.stringifyException(ioe),
- (tip.isMapTask() ?
- TaskStatus.Phase.MAP :
- TaskStatus.Phase.REDUCE),
- TaskStatus.State.FAILED,
- status.getTaskTracker(), null);
+ if (state == TaskStatus.State.FAILED ||
+ state == TaskStatus.State.KILLED) {
+ try {
+ t.discardTaskOutput();
+ } catch (IOException ioe) {
+ LOG.info("Failed to discard the output of task " +
+ status.getTaskId() + " with: " +
+ StringUtils.stringifyException(ioe));
+ }
}
- LOG.info("Failed to rename the output of " + status.getTaskId() +
- " with: " + StringUtils.stringifyException(ioe));
+ } catch (InterruptedException ie) {
+ LOG.warn(getName() + " exiting, got interrupted: " +
+ StringUtils.stringifyException(ie));
+ return;
}
- if (state == TaskStatus.State.FAILED ||
- state == TaskStatus.State.KILLED) {
- try {
- t.discardTaskOutput();
- } catch (IOException ioe) {
- LOG.info("Failed to discard the output of task " +
- status.getTaskId() + " with: " +
- StringUtils.stringifyException(ioe));
- }
+ catch (Throwable t) {
+ LOG.error(getName() + " got an exception: " +
+ StringUtils.stringifyException(t));
}
}
+
+ LOG.warn(getName() + " exiting...");
}
}