You are viewing a plain-text version of this content. The link to its canonical version was not preserved in this plain-text rendering.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/02/05 18:24:12 UTC
svn commit: r741192 [2/2] - in /hadoop/core/trunk: ./ src/mapred/
src/mapred/org/apache/hadoop/mapred/
src/mapred/org/apache/hadoop/mapreduce/lib/output/
src/test/org/apache/hadoop/mapred/ src/webapps/job/
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java?rev=741192&r1=741191&r2=741192&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskStatus.java Thu Feb 5 17:24:11 2009
@@ -41,7 +41,7 @@
// what state is the task in?
public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED,
- COMMIT_PENDING}
+ COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN}
private final TaskAttemptID taskid;
private float progress;
@@ -204,6 +204,12 @@
}
this.phase = phase;
}
+
+ boolean inTaskCleanupPhase() { // true iff phase is CLEANUP and runState is FAILED_UNCLEAN or KILLED_UNCLEAN
+ return (this.phase == TaskStatus.Phase.CLEANUP &&
+ (this.runState == TaskStatus.State.FAILED_UNCLEAN || // state left by a failed attempt awaiting cleanup
+ this.runState == TaskStatus.State.KILLED_UNCLEAN)); // state left by a killed attempt awaiting cleanup
+ }
public boolean getIncludeCounters() {
return includeCounters;
@@ -261,9 +267,9 @@
/**
* Update the status of the task.
*
+ * @param runstate
* @param progress
* @param state
- * @param phase
* @param counters
*/
synchronized void statusUpdate(State runState,
@@ -300,7 +306,33 @@
this.counters = status.getCounters();
this.outputSize = status.outputSize;
}
-
+
+ /**
+ * Update only selected fields of this task status.
+ *
+ * This update is done in JobTracker when a cleanup attempt of task
+ * reports its status. Fields other than the five below are left unchanged.
+ *
+ * @param runState new run state, applied via setRunState
+ * @param progress new progress, applied via setProgress
+ * @param state new state string, applied via setStateString
+ * @param phase new phase, applied via setPhase
+ * @param finishTime finish time to record; 0 keeps the current finish time
+ */
+ synchronized void statusUpdate(State runState,
+ float progress,
+ String state,
+ Phase phase,
+ long finishTime) {
+ setRunState(runState);
+ setProgress(progress);
+ setStateString(state);
+ setPhase(phase);
+ if (finishTime != 0) { // 0 is treated as "not set": keep the existing finishTime
+ this.finishTime = finishTime;
+ }
+ }
+
/**
* Clear out transient information after sending out a status-update
* from either the {@link Task} to the {@link TaskTracker} or from the
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=741192&r1=741191&r2=741192&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Thu Feb 5 17:24:11 2009
@@ -183,7 +183,8 @@
private static final String SUBDIR = "taskTracker";
private static final String CACHEDIR = "archive";
private static final String JOBCACHE = "jobcache";
- private static final String PIDDIR = "pids";
+ private static final String PID = "pid";
+ private static final String OUTPUT = "output";
private JobConf originalConf;
private JobConf fConf;
private int maxCurrentMapTasks;
@@ -412,38 +413,63 @@
static String getJobCacheSubdir() {
return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.JOBCACHE;
}
+
+ static String getLocalJobDir(String jobid) { // per-job dir under the job cache subdir; relative, resolved via lDirAlloc by callers
+ return getJobCacheSubdir() + Path.SEPARATOR + jobid;
+ }
- static String getPidFilesSubdir() {
- return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.PIDDIR;
+ static String getLocalTaskDir(String jobid, String taskid) { // dir of a regular (non-cleanup) attempt
+ return getLocalTaskDir(jobid, taskid, false) ;
}
-
+
+ static String getIntermediateOutputDir(String jobid, String taskid) { // <non-cleanup task dir>/output
+ return getLocalTaskDir(jobid, taskid)
+ + Path.SEPARATOR + TaskTracker.OUTPUT ; // read for map output files such as file.out and file.out.index
+ }
+
+ static String getLocalTaskDir(String jobid, // per-attempt dir under getLocalJobDir(jobid); relative path
+ String taskid,
+ boolean isCleanupAttempt) {
+ String taskDir = getLocalJobDir(jobid) + Path.SEPARATOR + taskid;
+ if (isCleanupAttempt) { // a cleanup attempt may share the taskid, so give it a distinct ".cleanup" dir
+ taskDir = taskDir + ".cleanup";
+ }
+ return taskDir;
+ }
+
+ static String getPidFile(String jobid, // relative path of the attempt's pid file: <task dir>/pid
+ String taskid,
+ boolean isCleanup) {
+ return getLocalTaskDir(jobid, taskid, isCleanup)
+ + Path.SEPARATOR + PID;
+ }
+
/**
* Get the pidFile path of a Task
+ * Returns null, after logging a warning, if the pid file cannot be located.
* @param tid the TaskAttemptID of the task for which pidFile's path is needed
+ * @param conf configuration passed to the local dir allocator
+ * @param isCleanup true if the task is a cleanup attempt
+ *
* @return pidFile's Path
*/
- public static Path getPidFilePath(TaskAttemptID tid, JobConf conf) {
+ static Path getPidFilePath(TaskAttemptID tid,
+ JobConf conf,
+ boolean isCleanup) {
Path pidFileName = null;
try {
//this actually need not use a localdirAllocator since the PID
//files are really small..
pidFileName = lDirAlloc.getLocalPathToRead(
- (TaskTracker.getPidFilesSubdir() + Path.SEPARATOR + tid),
- conf);
+ getPidFile(tid.getJobID().toString(), tid.toString(), isCleanup), // <task dir>[.cleanup]/pid
+ conf);
} catch (IOException i) {
// PID file is not there
- LOG.warn("Failed to get pidFile name for " + tid + " " + i);
+ LOG.warn("Failed to get pidFile name for " + tid + " " +
+ StringUtils.stringifyException(i));
}
return pidFileName;
}
- public void removePidFile(TaskAttemptID tid) {
- Path pidFilePath = getPidFilePath(tid, getJobConf());
- if (pidFilePath != null) {
- try {
- FileSystem.getLocal(getJobConf()).delete(pidFilePath, false);
- } catch(IOException ie) {}
- }
- }
public long getProtocolVersion(String protocol,
long clientVersion) throws IOException {
@@ -785,9 +811,9 @@
} catch(FileNotFoundException fe) {
jobFileSize = -1;
}
- Path localJobFile = lDirAlloc.getLocalPathForWrite((getJobCacheSubdir()
- + Path.SEPARATOR + jobId
- + Path.SEPARATOR + "job.xml"),
+ Path localJobFile = lDirAlloc.getLocalPathForWrite(
+ getLocalJobDir(jobId.toString())
+ + Path.SEPARATOR + "job.xml",
jobFileSize, fConf);
RunningJob rjob = addTaskToJob(jobId, tip);
synchronized (rjob) {
@@ -811,9 +837,9 @@
// create the 'work' directory
// job-specific shared directory for use as scratch space
- Path workDir = lDirAlloc.getLocalPathForWrite((getJobCacheSubdir()
- + Path.SEPARATOR + jobId
- + Path.SEPARATOR + "work"), fConf);
+ Path workDir = lDirAlloc.getLocalPathForWrite(
+ (getLocalJobDir(jobId.toString())
+ + Path.SEPARATOR + "work"), fConf);
if (!localFs.mkdirs(workDir)) {
throw new IOException("Mkdirs failed to create "
+ workDir.toString());
@@ -835,8 +861,7 @@
// Here we check for and we check five times the size of jarFileSize
// to accommodate for unjarring the jar file in work directory
localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
- getJobCacheSubdir()
- + Path.SEPARATOR + jobId
+ getLocalJobDir(jobId.toString())
+ Path.SEPARATOR + "jars",
5 * jarFileSize, fConf), "job.jar");
if (!localFs.mkdirs(localJarFile.getParent())) {
@@ -1253,7 +1278,8 @@
for (TaskStatus taskStatus : status.getTaskReports()) {
if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
taskStatus.getRunState() != TaskStatus.State.UNASSIGNED &&
- taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) {
+ taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
+ !taskStatus.inTaskCleanupPhase()) {
if (taskStatus.getIsMap()) {
mapTotal--;
} else {
@@ -1370,7 +1396,8 @@
long now = System.currentTimeMillis();
for (TaskInProgress tip: runningTasks.values()) {
if (tip.getRunState() == TaskStatus.State.RUNNING ||
- tip.getRunState() == TaskStatus.State.COMMIT_PENDING) {
+ tip.getRunState() == TaskStatus.State.COMMIT_PENDING ||
+ tip.isCleaningup()) {
// Check the per-job timeout interval for tasks;
// an interval of '0' implies it is never timed-out
long jobTaskTimeout = tip.getTaskTimeout();
@@ -1424,8 +1451,7 @@
// task if the job is done/failed
if (!rjob.keepJobFiles){
directoryCleanupThread.addToQueue(getLocalFiles(fConf,
- SUBDIR + Path.SEPARATOR + JOBCACHE +
- Path.SEPARATOR + rjob.getJobID()));
+ getLocalJobDir(rjob.getJobID().toString())));
}
// Remove this job
rjob.tasks.clear();
@@ -1684,7 +1710,9 @@
}
synchronized (tip) {
//to make sure that there is no kill task action for this
- if (tip.getRunState() != TaskStatus.State.UNASSIGNED) {
+ if (tip.getRunState() != TaskStatus.State.UNASSIGNED &&
+ tip.getRunState() != TaskStatus.State.FAILED_UNCLEAN &&
+ tip.getRunState() != TaskStatus.State.KILLED_UNCLEAN) {
//got killed externally while still in the launcher queue
addFreeSlot();
continue;
@@ -1705,7 +1733,8 @@
private TaskInProgress registerTask(LaunchTaskAction action,
TaskLauncher launcher) {
Task t = action.getTask();
- LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID());
+ LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID() +
+ " task's state:" + t.getState());
TaskInProgress tip = new TaskInProgress(t, this.fConf, launcher);
synchronized (this) {
tasks.put(t.getTaskID(), tip);
@@ -1727,10 +1756,6 @@
private void startNewTask(TaskInProgress tip) {
try {
localizeJob(tip);
- if (isTaskMemoryManagerEnabled()) {
- taskMemoryManager.addTask(tip.getTask().getTaskID(),
- getVirtualMemoryForTask(tip.getJobConf()));
- }
} catch (Throwable e) {
String msg = ("Error initializing " + tip.getTask().getTaskID() +
":\n" + StringUtils.stringifyException(e));
@@ -1751,7 +1776,23 @@
}
}
}
-
+
+ void addToMemoryManager(TaskAttemptID attemptId, // registers the attempt with taskMemoryManager; no-op when disabled
+ JobConf conf,
+ String pidFile) {
+ if (isTaskMemoryManagerEnabled()) {
+ taskMemoryManager.addTask(attemptId,
+ getVirtualMemoryForTask(conf), pidFile);
+ }
+ }
+
+ void removeFromMemoryManager(TaskAttemptID attemptId) { // no-op when the task memory manager is disabled
+ // Remove the entry from taskMemoryManager's data structures.
+ if (isTaskMemoryManagerEnabled()) {
+ taskMemoryManager.removeTask(attemptId);
+ }
+ }
+
/**
* The server retry loop.
* This while-loop attempts to connect to the JobTracker. It only
@@ -1838,10 +1879,12 @@
localJobConf = null;
taskStatus = TaskStatus.createTaskStatus(task.isMapTask(), task.getTaskID(),
0.0f,
- TaskStatus.State.UNASSIGNED,
+ task.getState(),
diagnosticInfo.toString(),
"initializing",
getName(),
+ task.isTaskCleanupTask() ?
+ TaskStatus.Phase.CLEANUP :
task.isMapTask()? TaskStatus.Phase.MAP:
TaskStatus.Phase.SHUFFLE,
task.getCounters());
@@ -1851,9 +1894,10 @@
private void localizeTask(Task task) throws IOException{
Path localTaskDir =
- lDirAlloc.getLocalPathForWrite((TaskTracker.getJobCacheSubdir() +
- Path.SEPARATOR + task.getJobID() + Path.SEPARATOR +
- task.getTaskID()), defaultJobConf );
+ lDirAlloc.getLocalPathForWrite(
+ TaskTracker.getLocalTaskDir(task.getJobID().toString(),
+ task.getTaskID().toString(), task.isTaskCleanupTask()),
+ defaultJobConf );
FileSystem localFs = FileSystem.getLocal(fConf);
if (!localFs.mkdirs(localTaskDir)) {
@@ -1863,8 +1907,7 @@
// create symlink for ../work if it already doesnt exist
String workDir = lDirAlloc.getLocalPathToRead(
- TaskTracker.getJobCacheSubdir()
- + Path.SEPARATOR + task.getJobID()
+ TaskTracker.getLocalJobDir(task.getJobID().toString())
+ Path.SEPARATOR
+ "work", defaultJobConf).toString();
String link = localTaskDir.getParent().toString()
@@ -1875,11 +1918,10 @@
// create the working-directory of the task
Path cwd = lDirAlloc.getLocalPathForWrite(
- TaskTracker.getJobCacheSubdir()
- + Path.SEPARATOR + task.getJobID()
- + Path.SEPARATOR + task.getTaskID()
- + Path.SEPARATOR + MRConstants.WORKDIR,
- defaultJobConf);
+ getLocalTaskDir(task.getJobID().toString(),
+ task.getTaskID().toString(), task.isTaskCleanupTask())
+ + Path.SEPARATOR + MRConstants.WORKDIR,
+ defaultJobConf);
if (!localFs.mkdirs(cwd)) {
throw new IOException("Mkdirs failed to create "
+ cwd.toString());
@@ -1974,9 +2016,13 @@
* Kick off the task execution
*/
public synchronized void launchTask() throws IOException {
- if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
+ if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||
+ this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
+ this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
localizeTask(task);
- this.taskStatus.setRunState(TaskStatus.State.RUNNING);
+ if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
+ this.taskStatus.setRunState(TaskStatus.State.RUNNING);
+ }
this.runner = task.createRunner(TaskTracker.this, this);
this.runner.start();
this.taskStatus.setStartTime(System.currentTimeMillis());
@@ -1986,6 +2032,10 @@
}
}
+ boolean isCleaningup() { // true while taskStatus is in the CLEANUP phase with a FAILED_UNCLEAN/KILLED_UNCLEAN run state
+ return this.taskStatus.inTaskCleanupPhase();
+ }
+
/**
* The task is reporting its progress
*/
@@ -1993,10 +2043,14 @@
{
LOG.info(task.getTaskID() + " " + taskStatus.getProgress() +
"% " + taskStatus.getStateString());
-
+ // task will report its state as
+ // COMMIT_PENDING when it is waiting for commit response and
+ // when it is committing.
+ // cleanup attempt will report its state as FAILED_UNCLEAN/KILLED_UNCLEAN
if (this.done ||
(this.taskStatus.getRunState() != TaskStatus.State.RUNNING &&
- this.taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING)) {
+ this.taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
+ !isCleaningup())) {
//make sure we ignore progress messages after a task has
//invoked TaskUmbilicalProtocol.done() or if the task has been
//KILLED/FAILED
@@ -2047,7 +2101,16 @@
* The task is reporting that it's done running
*/
public synchronized void reportDone() {
- this.taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+ if (isCleaningup()) {
+ if (this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
+ this.taskStatus.setRunState(TaskStatus.State.FAILED);
+ } else if (this.taskStatus.getRunState() ==
+ TaskStatus.State.KILLED_UNCLEAN) {
+ this.taskStatus.setRunState(TaskStatus.State.KILLED);
+ }
+ } else {
+ this.taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+ }
this.taskStatus.setProgress(1.0f);
this.taskStatus.setFinishTime(System.currentTimeMillis());
this.done = true;
@@ -2062,6 +2125,11 @@
return wasKilled;
}
+ void reportTaskFinished() { // runs taskFinished() and then releases this task's slot
+ taskFinished();
+ releaseSlot();
+ }
+
/**
* The task has actually finished running.
*/
@@ -2088,7 +2156,23 @@
if (!done) {
if (!wasKilled) {
failures += 1;
- taskStatus.setRunState(TaskStatus.State.FAILED);
+ /* State changes:
+ * RUNNING/COMMIT_PENDING -> FAILED_UNCLEAN/FAILED
+ * FAILED_UNCLEAN -> FAILED
+ * KILLED_UNCLEAN -> KILLED
+ */
+ if (taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
+ taskStatus.setRunState(TaskStatus.State.FAILED);
+ } else if (taskStatus.getRunState() ==
+ TaskStatus.State.KILLED_UNCLEAN) {
+ taskStatus.setRunState(TaskStatus.State.KILLED);
+ } else if (task.isMapOrReduce() &&
+ taskStatus.getPhase() != TaskStatus.Phase.CLEANUP) {
+ taskStatus.setRunState(TaskStatus.State.FAILED_UNCLEAN);
+ } else {
+ taskStatus.setRunState(TaskStatus.State.FAILED);
+ }
+ removeFromMemoryManager(task.getTaskID());
// call the script here for the failed tasks.
if (debugCommand != null) {
String taskStdout ="";
@@ -2114,9 +2198,10 @@
File workDir = null;
try {
workDir = new File(lDirAlloc.getLocalPathToRead(
- TaskTracker.getJobCacheSubdir()
- + Path.SEPARATOR + task.getJobID()
- + Path.SEPARATOR + task.getTaskID()
+ TaskTracker.getLocalTaskDir(
+ task.getJobID().toString(),
+ task.getTaskID().toString(),
+ task.isTaskCleanupTask())
+ Path.SEPARATOR + MRConstants.WORKDIR,
localJobConf). toString());
} catch (IOException e) {
@@ -2169,14 +2254,14 @@
LOG.warn("Exception in add diagnostics!");
}
}
- } else {
- taskStatus.setRunState(TaskStatus.State.KILLED);
}
taskStatus.setProgress(0.0f);
}
this.taskStatus.setFinishTime(System.currentTimeMillis());
needCleanup = (taskStatus.getRunState() == TaskStatus.State.FAILED ||
- taskStatus.getRunState() == TaskStatus.State.KILLED);
+ taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
+ taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN ||
+ taskStatus.getRunState() == TaskStatus.State.KILLED);
}
//
@@ -2286,7 +2371,8 @@
synchronized(this){
if (getRunState() == TaskStatus.State.RUNNING ||
getRunState() == TaskStatus.State.UNASSIGNED ||
- getRunState() == TaskStatus.State.COMMIT_PENDING) {
+ getRunState() == TaskStatus.State.COMMIT_PENDING ||
+ isCleaningup()) {
kill(wasFailure);
}
}
@@ -2297,19 +2383,46 @@
/**
* Something went wrong and the task must be killed.
+ *
+ * RUNNING/COMMIT_PENDING -> FAILED_UNCLEAN/KILLED_UNCLEAN
+ * FAILED_UNCLEAN -> FAILED
+ * KILLED_UNCLEAN -> KILLED
+ * UNASSIGNED -> FAILED/KILLED
* @param wasFailure was it a failure (versus a kill request)?
*/
public synchronized void kill(boolean wasFailure) throws IOException {
+ /* State changes:
+ * RUNNING -> FAILED_UNCLEAN/KILLED_UNCLEAN/FAILED/KILLED
+ * COMMIT_PENDING -> FAILED_UNCLEAN/KILLED_UNCLEAN
+ * FAILED_UNCLEAN -> FAILED
+ * KILLED_UNCLEAN -> KILLED
+ * UNASSIGNED -> FAILED/KILLED
+ */
if (taskStatus.getRunState() == TaskStatus.State.RUNNING ||
- taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) {
+ taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING ||
+ isCleaningup()) {
wasKilled = true;
if (wasFailure) {
failures += 1;
}
runner.kill();
- taskStatus.setRunState((wasFailure) ?
- TaskStatus.State.FAILED :
- TaskStatus.State.KILLED);
+ if (task.isMapOrReduce()) {
+ taskStatus.setRunState((wasFailure) ?
+ TaskStatus.State.FAILED_UNCLEAN :
+ TaskStatus.State.KILLED_UNCLEAN);
+ } else {
+ // go FAILED_UNCLEAN -> FAILED and KILLED_UNCLEAN -> KILLED always
+ if (taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
+ taskStatus.setRunState(TaskStatus.State.FAILED);
+ } else if (taskStatus.getRunState() ==
+ TaskStatus.State.KILLED_UNCLEAN) {
+ taskStatus.setRunState(TaskStatus.State.KILLED);
+ } else {
+ taskStatus.setRunState((wasFailure) ?
+ TaskStatus.State.FAILED :
+ TaskStatus.State.KILLED);
+ }
+ }
} else if (taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
if (wasFailure) {
failures += 1;
@@ -2318,6 +2431,7 @@
taskStatus.setRunState(TaskStatus.State.KILLED);
}
}
+ removeFromMemoryManager(task.getTaskID());
releaseSlot();
}
@@ -2369,7 +2483,12 @@
synchronized (TaskTracker.this) {
if (needCleanup) {
- tasks.remove(taskId);
+ // see if tasks data structure is holding this tip.
+ // tasks could hold the tip for cleanup attempt, if cleanup attempt
+ // got launched before this method.
+ if (tasks.get(taskId) == this) {
+ tasks.remove(taskId);
+ }
}
synchronized (this){
if (alwaysKeepTaskFiles ||
@@ -2381,8 +2500,8 @@
}
synchronized (this) {
try {
- String taskDir = SUBDIR + Path.SEPARATOR + JOBCACHE + Path.SEPARATOR
- + task.getJobID() + Path.SEPARATOR + taskId;
+ String taskDir = getLocalTaskDir(task.getJobID().toString(),
+ taskId.toString(), task.isTaskCleanupTask());
if (needCleanup) {
if (runner != null) {
//cleans up the output directory of the task (where map outputs
@@ -2603,15 +2722,10 @@
}
if (tip != null) {
if (!commitPending) {
- tip.taskFinished();
- // Remove the entry from taskMemoryManagerThread's data structures.
- if (isTaskMemoryManagerEnabled()) {
- taskMemoryManager.removeTask(taskid);
- }
- tip.releaseSlot();
+ tip.reportTaskFinished();
}
} else {
- LOG.warn("Unknown child task finshed: "+taskid+". Ignored.");
+ LOG.warn("Unknown child task finished: "+taskid+". Ignored.");
}
}
@@ -2838,15 +2952,13 @@
// Index file
Path indexFileName = lDirAlloc.getLocalPathToRead(
- TaskTracker.getJobCacheSubdir() + Path.SEPARATOR +
- jobId + Path.SEPARATOR +
- mapId + "/output" + "/file.out.index", conf);
+ TaskTracker.getIntermediateOutputDir(jobId, mapId)
+ + "/file.out.index", conf);
// Map-output file
Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
- TaskTracker.getJobCacheSubdir() + Path.SEPARATOR +
- jobId + Path.SEPARATOR +
- mapId + "/output" + "/file.out", conf);
+ TaskTracker.getIntermediateOutputDir(jobId, mapId)
+ + "/file.out", conf);
/**
* Read the index file to get the information about where
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=741192&r1=741191&r2=741192&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Thu Feb 5 17:24:11 2009
@@ -250,7 +250,8 @@
TaskStatus.State state = ts.getRunState();
if (ts.getIsMap() &&
((state == TaskStatus.State.RUNNING) ||
- (state == TaskStatus.State.UNASSIGNED))) {
+ (state == TaskStatus.State.UNASSIGNED) ||
+ ts.inTaskCleanupPhase())) {
mapCount++;
}
}
@@ -267,7 +268,8 @@
TaskStatus.State state = ts.getRunState();
if ((!ts.getIsMap()) &&
((state == TaskStatus.State.RUNNING) ||
- (state == TaskStatus.State.UNASSIGNED))) {
+ (state == TaskStatus.State.UNASSIGNED) ||
+ ts.inTaskCleanupPhase())) {
reduceCount++;
}
}
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=741192&r1=741191&r2=741192&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Thu Feb 5 17:24:11 2009
@@ -52,9 +52,10 @@
* encapsulates the events and whether to reset events index.
* Version 13 changed the getTask method signature for HADOOP-249
* Version 14 changed the getTask method signature for HADOOP-4232
+ * Version 15 Adds FAILED_UNCLEAN and KILLED_UNCLEAN states for HADOOP-4759
* */
- public static final long versionID = 14L;
+ public static final long versionID = 15L;
/**
* Called when a child task process starts, to get its task.
Modified: hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java?rev=741192&r1=741191&r2=741192&view=diff
==============================================================================
--- hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java (original)
+++ hadoop/core/trunk/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java Thu Feb 5 17:24:11 2009
@@ -174,8 +174,10 @@
@Override
public void abortTask(TaskAttemptContext context) {
try {
- context.progress();
- outputFileSystem.delete(workPath, true);
+ if (workPath != null) {
+ context.progress();
+ outputFileSystem.delete(workPath, true);
+ }
} catch (IOException ie) {
LOG.warn("Error discarding output" + StringUtils.stringifyException(ie));
}
Modified: hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java?rev=741192&r1=741191&r2=741192&view=diff
==============================================================================
--- hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java (original)
+++ hadoop/core/trunk/src/test/org/apache/hadoop/mapred/TestKillSubProcesses.java Thu Feb 5 17:24:11 2009
@@ -110,7 +110,7 @@
JobConf confForThisTask = new JobConf(conf);
confForThisTask.set("mapred.local.dir", localDir);//set the localDir
- Path pidFilePath = TaskTracker.getPidFilePath(id, confForThisTask);
+ Path pidFilePath = TaskTracker.getPidFilePath(id, confForThisTask, false);
while (pidFilePath == null) {
//wait till the pid file is created
try {
@@ -119,7 +119,7 @@
LOG.warn("sleep is interrupted:" + ie);
break;
}
- pidFilePath = TaskTracker.getPidFilePath(id, confForThisTask);
+ pidFilePath = TaskTracker.getPidFilePath(id, confForThisTask, false);
}
pid = ProcessTree.getPidFromPidFile(pidFilePath.toString());
Modified: hadoop/core/trunk/src/webapps/job/taskdetails.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/webapps/job/taskdetails.jsp?rev=741192&r1=741191&r2=741192&view=diff
==============================================================================
--- hadoop/core/trunk/src/webapps/job/taskdetails.jsp (original)
+++ hadoop/core/trunk/src/webapps/job/taskdetails.jsp Thu Feb 5 17:24:11 2009
@@ -67,13 +67,19 @@
}
}
}
- TaskStatus[] ts = (job != null) ? tracker.getTaskStatuses(tipidObj)
- : null;
+ TaskInProgress tip = null;
+ if (job != null && tipidObj != null) {
+ tip = job.getTaskInProgress(tipidObj);
+ }
+ TaskStatus[] ts = null;
+ if (tip != null) {
+ ts = tip.getTaskStatuses();
+ }
boolean isCleanupOrSetup = false;
- if (tipidObj != null) {
- isCleanupOrSetup = job.getTaskInProgress(tipidObj).isCleanupTask();
+ if ( tip != null) {
+ isCleanupOrSetup = tip.isJobCleanupTask();
if (!isCleanupOrSetup) {
- isCleanupOrSetup = job.getTaskInProgress(tipidObj).isSetupTask();
+ isCleanupOrSetup = tip.isJobSetupTask();
}
}
%>
@@ -115,14 +121,41 @@
TaskTrackerStatus taskTracker = tracker.getTaskTracker(taskTrackerName);
out.print("<tr><td>" + status.getTaskID() + "</td>");
String taskAttemptTracker = null;
+ String cleanupTrackerName = null;
+ TaskTrackerStatus cleanupTracker = null;
+ String cleanupAttemptTracker = null;
+ boolean hasCleanupAttempt = false;
+ if (tip != null && tip.isCleanupAttempt(status.getTaskID())) {
+ cleanupTrackerName = tip.machineWhereCleanupRan(status.getTaskID());
+ cleanupTracker = tracker.getTaskTracker(cleanupTrackerName);
+ if (cleanupTracker != null) {
+ cleanupAttemptTracker = "http://" + cleanupTracker.getHost() + ":"
+ + cleanupTracker.getHttpPort();
+ }
+ hasCleanupAttempt = true;
+ }
+ out.print("<td>");
+ if (hasCleanupAttempt) {
+ out.print("Task attempt: ");
+ }
if (taskTracker == null) {
- out.print("<td>" + taskTrackerName + "</td>");
+ out.print(taskTrackerName);
} else {
taskAttemptTracker = "http://" + taskTracker.getHost() + ":"
+ taskTracker.getHttpPort();
- out.print("<td><a href=\"" + taskAttemptTracker + "\">"
- + tracker.getNode(taskTracker.getHost()) + "</a></td>");
+ out.print("<a href=\"" + taskAttemptTracker + "\">"
+ + tracker.getNode(taskTracker.getHost()) + "</a>");
+ }
+ if (hasCleanupAttempt) {
+ out.print("<br/>Cleanup Attempt: ");
+ if (cleanupAttemptTracker == null ) {
+ out.print(cleanupTrackerName);
+ } else {
+ out.print("<a href=\"" + cleanupAttemptTracker + "\">"
+ + tracker.getNode(cleanupTracker.getHost()) + "</a>");
}
+ }
+ out.print("</td>");
out.print("<td>" + status.getRunState() + "</td>");
out.print("<td>" + StringUtils.formatPercent(status.getProgress(), 2)
+ ServletUtil.percentageGraph(status.getProgress() * 100f, 80) + "</td>");
@@ -162,6 +195,9 @@
String.valueOf(taskTracker.getHttpPort()),
status.getTaskID().toString());
}
+ if (hasCleanupAttempt) {
+ out.print("Task attempt: <br/>");
+ }
if (taskLogUrl == null) {
out.print("n/a");
} else {
@@ -172,6 +208,25 @@
out.print("<a href=\"" + tailEightKBUrl + "\">Last 8KB</a><br/>");
out.print("<a href=\"" + entireLogUrl + "\">All</a><br/>");
}
+ if (hasCleanupAttempt) {
+ out.print("Cleanup attempt: <br/>");
+ taskLogUrl = null;
+ if (cleanupTracker != null ) {
+ taskLogUrl = TaskLogServlet.getTaskLogUrl(cleanupTracker.getHost(),
+ String.valueOf(cleanupTracker.getHttpPort()),
+ status.getTaskID().toString());
+ }
+ if (taskLogUrl == null) {
+ out.print("n/a");
+ } else {
+ String tailFourKBUrl = taskLogUrl + "&start=-4097&cleanup=true";
+ String tailEightKBUrl = taskLogUrl + "&start=-8193&cleanup=true";
+ String entireLogUrl = taskLogUrl + "&all=true&cleanup=true";
+ out.print("<a href=\"" + tailFourKBUrl + "\">Last 4KB</a><br/>");
+ out.print("<a href=\"" + tailEightKBUrl + "\">Last 8KB</a><br/>");
+ out.print("<a href=\"" + entireLogUrl + "\">All</a><br/>");
+ }
+ }
out.print("</td><td>" + "<a href=\"/taskstats.jsp?jobid=" + jobid
+ "&tipid=" + tipid + "&taskid=" + status.getTaskID() + "\">"
+ ((status.getCounters() != null) ? status.getCounters().size() : 0) + "</a></td>");