You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/02/05 18:34:11 UTC
svn commit: r741197 [2/2] - in /hadoop/core/branches/branch-0.19: ./
src/mapred/org/apache/hadoop/mapred/ src/test/org/apache/hadoop/mapred/
src/webapps/job/
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskStatus.java?rev=741197&r1=741196&r2=741197&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskStatus.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskStatus.java Thu Feb 5 17:34:10 2009
@@ -41,7 +41,7 @@
// what state is the task in?
public static enum State {RUNNING, SUCCEEDED, FAILED, UNASSIGNED, KILLED,
- COMMIT_PENDING}
+ COMMIT_PENDING, FAILED_UNCLEAN, KILLED_UNCLEAN}
private TaskAttemptID taskid;
private float progress;
@@ -202,6 +202,12 @@
}
this.phase = phase;
}
+
+ /**
+  * Tells whether this status describes a task-cleanup run: the phase must
+  * be CLEANUP and the run state one of FAILED_UNCLEAN / KILLED_UNCLEAN.
+  */
+ boolean inTaskCleanupPhase() {
+   if (this.phase != TaskStatus.Phase.CLEANUP) {
+     return false;
+   }
+   return this.runState == TaskStatus.State.FAILED_UNCLEAN
+       || this.runState == TaskStatus.State.KILLED_UNCLEAN;
+ }
public boolean getIncludeCounters() {
return includeCounters;
@@ -259,9 +265,9 @@
/**
* Update the status of the task.
*
+ * @param runstate
* @param progress
* @param state
- * @param phase
* @param counters
*/
synchronized void statusUpdate(State runState,
@@ -298,7 +304,33 @@
this.counters = status.getCounters();
this.outputSize = status.outputSize;
}
-
+
+ /**
+ * Update only selected fields of this task status.
+ *
+ * This overload is used in the JobTracker when a cleanup attempt of a task
+ * reports its status: only the run state, progress, state string, phase
+ * and (when non-zero) finish time are copied; every other field, e.g. the
+ * counters, is left untouched.
+ *
+ * @param runState new run state
+ * @param progress new progress value
+ * @param state new human-readable state string
+ * @param phase new phase
+ * @param finishTime new finish time; a value of 0 is ignored and the
+ *        current finish time is kept
+ */
+ synchronized void statusUpdate(State runState,
+ float progress,
+ String state,
+ Phase phase,
+ long finishTime) {
+ setRunState(runState);
+ setProgress(progress);
+ setStateString(state);
+ setPhase(phase);
+ // 0 presumably means "no finish time reported yet"; do not let it
+ // overwrite a finish time that was already recorded.
+ if (finishTime != 0) {
+ this.finishTime = finishTime;
+ }
+ }
+
/**
* Clear out transient information after sending out a status-update
* from either the {@link Task} to the {@link TaskTracker} or from the
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=741197&r1=741196&r2=741197&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Thu Feb 5 17:34:10 2009
@@ -181,7 +181,8 @@
private static final String SUBDIR = "taskTracker";
private static final String CACHEDIR = "archive";
private static final String JOBCACHE = "jobcache";
- private static final String PIDDIR = "pids";
+ private static final String PID = "pid";
+ private static final String OUTPUT = "output";
private JobConf originalConf;
private JobConf fConf;
private int maxCurrentMapTasks;
@@ -358,10 +359,36 @@
return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.JOBCACHE;
}
- static String getPidFilesSubdir() {
- return TaskTracker.SUBDIR + Path.SEPARATOR + TaskTracker.PIDDIR;
+ /**
+  * Relative path of a job's local directory (resolved against the
+  * TaskTracker's local dirs by the caller): the job-cache subdir followed
+  * by the job id.
+  */
+ static String getLocalJobDir(String jobid) {
+   String jobCacheDir = getJobCacheSubdir();
+   return jobCacheDir + Path.SEPARATOR + jobid;
}
-
+
+ /** Relative local directory of a regular (non-cleanup) task attempt. */
+ static String getLocalTaskDir(String jobid, String taskid) {
+   final boolean cleanupAttempt = false;
+   return getLocalTaskDir(jobid, taskid, cleanupAttempt);
+ }
+
+ /**
+  * Relative directory holding a task attempt's intermediate output
+  * (e.g. the map output files served to reducers).
+  */
+ static String getIntermediateOutputDir(String jobid, String taskid) {
+   String taskDir = getLocalTaskDir(jobid, taskid);
+   return taskDir + Path.SEPARATOR + TaskTracker.OUTPUT;
+ }
+
+ /**
+  * Relative local directory of a task attempt inside its job directory.
+  * A cleanup attempt reuses the task id of the attempt it cleans up, so
+  * its directory gets a ".cleanup" suffix to keep the two apart.
+  */
+ static String getLocalTaskDir(String jobid,
+                               String taskid,
+                               boolean isCleanupAttempt) {
+   String attemptDir = getLocalJobDir(jobid) + Path.SEPARATOR + taskid;
+   return isCleanupAttempt ? attemptDir + ".cleanup" : attemptDir;
+ }
+
+ /**
+  * Relative path of the "pid" file inside a task attempt's local
+  * directory (the ".cleanup" directory when {@code isCleanup} is true).
+  */
+ static String getPidFile(String jobid,
+                          String taskid,
+                          boolean isCleanup) {
+   String attemptDir = getLocalTaskDir(jobid, taskid, isCleanup);
+   return attemptDir + Path.SEPARATOR + PID;
+ }
+
public long getProtocolVersion(String protocol,
long clientVersion) throws IOException {
if (protocol.equals(TaskUmbilicalProtocol.class.getName())) {
@@ -699,9 +726,9 @@
} catch(FileNotFoundException fe) {
jobFileSize = -1;
}
- Path localJobFile = lDirAlloc.getLocalPathForWrite((getJobCacheSubdir()
- + Path.SEPARATOR + jobId
- + Path.SEPARATOR + "job.xml"),
+ Path localJobFile = lDirAlloc.getLocalPathForWrite(
+ getLocalJobDir(jobId.toString())
+ + Path.SEPARATOR + "job.xml",
jobFileSize, fConf);
RunningJob rjob = addTaskToJob(jobId, tip);
synchronized (rjob) {
@@ -725,9 +752,9 @@
// create the 'work' directory
// job-specific shared directory for use as scratch space
- Path workDir = lDirAlloc.getLocalPathForWrite((getJobCacheSubdir()
- + Path.SEPARATOR + jobId
- + Path.SEPARATOR + "work"), fConf);
+ Path workDir = lDirAlloc.getLocalPathForWrite(
+ (getLocalJobDir(jobId.toString())
+ + Path.SEPARATOR + "work"), fConf);
if (!localFs.mkdirs(workDir)) {
throw new IOException("Mkdirs failed to create "
+ workDir.toString());
@@ -749,8 +776,7 @@
// Here we check for and we check five times the size of jarFileSize
// to accommodate for unjarring the jar file in work directory
localJarFile = new Path(lDirAlloc.getLocalPathForWrite(
- getJobCacheSubdir()
- + Path.SEPARATOR + jobId
+ getLocalJobDir(jobId.toString())
+ Path.SEPARATOR + "jars",
5 * jarFileSize, fConf), "job.jar");
if (!localFs.mkdirs(localJarFile.getParent())) {
@@ -1164,7 +1190,8 @@
for (TaskStatus taskStatus : status.getTaskReports()) {
if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
taskStatus.getRunState() != TaskStatus.State.UNASSIGNED &&
- taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING) {
+ taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
+ !taskStatus.inTaskCleanupPhase()) {
if (taskStatus.getIsMap()) {
mapTotal--;
} else {
@@ -1281,7 +1308,8 @@
long now = System.currentTimeMillis();
for (TaskInProgress tip: runningTasks.values()) {
if (tip.getRunState() == TaskStatus.State.RUNNING ||
- tip.getRunState() == TaskStatus.State.COMMIT_PENDING) {
+ tip.getRunState() == TaskStatus.State.COMMIT_PENDING ||
+ tip.isCleaningup()) {
// Check the per-job timeout interval for tasks;
// an interval of '0' implies it is never timed-out
long jobTaskTimeout = tip.getTaskTimeout();
@@ -1335,8 +1363,7 @@
// task if the job is done/failed
if (!rjob.keepJobFiles){
directoryCleanupThread.addToQueue(getLocalFiles(fConf,
- SUBDIR + Path.SEPARATOR + JOBCACHE +
- Path.SEPARATOR + rjob.getJobID()));
+ getLocalJobDir(rjob.getJobID().toString())));
}
// Remove this job
rjob.tasks.clear();
@@ -1581,7 +1608,9 @@
}
synchronized (tip) {
//to make sure that there is no kill task action for this
- if (tip.getRunState() != TaskStatus.State.UNASSIGNED) {
+ if (tip.getRunState() != TaskStatus.State.UNASSIGNED &&
+ tip.getRunState() != TaskStatus.State.FAILED_UNCLEAN &&
+ tip.getRunState() != TaskStatus.State.KILLED_UNCLEAN) {
//got killed externally while still in the launcher queue
addFreeSlot();
continue;
@@ -1602,7 +1631,8 @@
private TaskInProgress registerTask(LaunchTaskAction action,
TaskLauncher launcher) {
Task t = action.getTask();
- LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID());
+ LOG.info("LaunchTaskAction (registerTask): " + t.getTaskID() +
+ " task's state:" + t.getState());
TaskInProgress tip = new TaskInProgress(t, this.fConf, launcher);
synchronized (this) {
tasks.put(t.getTaskID(), tip);
@@ -1624,10 +1654,6 @@
private void startNewTask(TaskInProgress tip) {
try {
localizeJob(tip);
- if (isTaskMemoryManagerEnabled()) {
- taskMemoryManager.addTask(tip.getTask().getTaskID(),
- getMemoryForTask(tip.getJobConf()));
- }
} catch (Throwable e) {
String msg = ("Error initializing " + tip.getTask().getTaskID() +
":\n" + StringUtils.stringifyException(e));
@@ -1648,7 +1674,23 @@
}
}
}
-
+
+ /**
+  * Registers a task attempt with the task memory manager; a no-op when
+  * task memory management is disabled.
+  *
+  * @param attemptId attempt to track
+  * @param conf job configuration the attempt's memory limit is read from
+  * @param pidFile path of the attempt's pid file
+  */
+ void addToMemoryManager(TaskAttemptID attemptId,
+                         JobConf conf,
+                         String pidFile) {
+   if (!isTaskMemoryManagerEnabled()) {
+     return;
+   }
+   taskMemoryManager.addTask(attemptId, getMemoryForTask(conf), pidFile);
+ }
+
+ /**
+  * Drops a task attempt from the task memory manager's bookkeeping; a
+  * no-op when task memory management is disabled.
+  *
+  * @param attemptId attempt to stop tracking
+  */
+ void removeFromMemoryManager(TaskAttemptID attemptId) {
+   if (!isTaskMemoryManagerEnabled()) {
+     return;
+   }
+   taskMemoryManager.removeTask(attemptId);
+ }
+
/**
* The server retry loop.
* This while-loop attempts to connect to the JobTracker. It only
@@ -1735,10 +1777,12 @@
localJobConf = null;
taskStatus = TaskStatus.createTaskStatus(task.isMapTask(), task.getTaskID(),
0.0f,
- TaskStatus.State.UNASSIGNED,
+ task.getState(),
diagnosticInfo.toString(),
"initializing",
getName(),
+ task.isTaskCleanupTask() ?
+ TaskStatus.Phase.CLEANUP :
task.isMapTask()? TaskStatus.Phase.MAP:
TaskStatus.Phase.SHUFFLE,
task.getCounters());
@@ -1748,9 +1792,10 @@
private void localizeTask(Task task) throws IOException{
Path localTaskDir =
- lDirAlloc.getLocalPathForWrite((TaskTracker.getJobCacheSubdir() +
- Path.SEPARATOR + task.getJobID() + Path.SEPARATOR +
- task.getTaskID()), defaultJobConf );
+ lDirAlloc.getLocalPathForWrite(
+ TaskTracker.getLocalTaskDir(task.getJobID().toString(),
+ task.getTaskID().toString(), task.isTaskCleanupTask()),
+ defaultJobConf );
FileSystem localFs = FileSystem.getLocal(fConf);
if (!localFs.mkdirs(localTaskDir)) {
@@ -1760,8 +1805,7 @@
// create symlink for ../work if it already doesnt exist
String workDir = lDirAlloc.getLocalPathToRead(
- TaskTracker.getJobCacheSubdir()
- + Path.SEPARATOR + task.getJobID()
+ TaskTracker.getLocalJobDir(task.getJobID().toString())
+ Path.SEPARATOR
+ "work", defaultJobConf).toString();
String link = localTaskDir.getParent().toString()
@@ -1772,11 +1816,10 @@
// create the working-directory of the task
Path cwd = lDirAlloc.getLocalPathForWrite(
- TaskTracker.getJobCacheSubdir()
- + Path.SEPARATOR + task.getJobID()
- + Path.SEPARATOR + task.getTaskID()
- + Path.SEPARATOR + MRConstants.WORKDIR,
- defaultJobConf);
+ getLocalTaskDir(task.getJobID().toString(),
+ task.getTaskID().toString(), task.isTaskCleanupTask())
+ + Path.SEPARATOR + MRConstants.WORKDIR,
+ defaultJobConf);
if (!localFs.mkdirs(cwd)) {
throw new IOException("Mkdirs failed to create "
+ cwd.toString());
@@ -1870,9 +1913,13 @@
* Kick off the task execution
*/
public synchronized void launchTask() throws IOException {
- if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
+ if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED ||
+ this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
+ this.taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
localizeTask(task);
- this.taskStatus.setRunState(TaskStatus.State.RUNNING);
+ if (this.taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
+ this.taskStatus.setRunState(TaskStatus.State.RUNNING);
+ }
this.runner = task.createRunner(TaskTracker.this, this);
this.runner.start();
this.taskStatus.setStartTime(System.currentTimeMillis());
@@ -1882,6 +1929,10 @@
}
}
+ /** True while this task's status says it is running its cleanup phase. */
+ boolean isCleaningup() {
+   TaskStatus status = this.taskStatus;
+   return status.inTaskCleanupPhase();
+ }
+
/**
* The task is reporting its progress
*/
@@ -1889,10 +1940,14 @@
{
LOG.info(task.getTaskID() + " " + taskStatus.getProgress() +
"% " + taskStatus.getStateString());
-
+ // task will report its state as
+ // COMMIT_PENDING when it is waiting for commit response and
+ // when it is committing.
+ // cleanup attempt will report its state as FAILED_UNCLEAN/KILLED_UNCLEAN
if (this.done ||
(this.taskStatus.getRunState() != TaskStatus.State.RUNNING &&
- this.taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING)) {
+ this.taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
+ !isCleaningup())) {
//make sure we ignore progress messages after a task has
//invoked TaskUmbilicalProtocol.done() or if the task has been
//KILLED/FAILED
@@ -1943,7 +1998,16 @@
* The task is reporting that it's done running
*/
public synchronized void reportDone() {
- this.taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+ if (isCleaningup()) {
+ if (this.taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
+ this.taskStatus.setRunState(TaskStatus.State.FAILED);
+ } else if (this.taskStatus.getRunState() ==
+ TaskStatus.State.KILLED_UNCLEAN) {
+ this.taskStatus.setRunState(TaskStatus.State.KILLED);
+ }
+ } else {
+ this.taskStatus.setRunState(TaskStatus.State.SUCCEEDED);
+ }
this.taskStatus.setProgress(1.0f);
this.taskStatus.setFinishTime(System.currentTimeMillis());
this.done = true;
@@ -1958,6 +2022,11 @@
return wasKilled;
}
+ /**
+ * Finishes this task's bookkeeping: calls taskFinished() and then
+ * releaseSlot(), in that order.
+ */
+ void reportTaskFinished() {
+ taskFinished();
+ releaseSlot();
+ }
+
/**
* The task has actually finished running.
*/
@@ -1984,7 +2053,23 @@
if (!done) {
if (!wasKilled) {
failures += 1;
- taskStatus.setRunState(TaskStatus.State.FAILED);
+ /* State changes:
+ * RUNNING/COMMIT_PENDING -> FAILED_UNCLEAN/FAILED
+ * FAILED_UNCLEAN -> FAILED
+ * KILLED_UNCLEAN -> KILLED
+ */
+ if (taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
+ taskStatus.setRunState(TaskStatus.State.FAILED);
+ } else if (taskStatus.getRunState() ==
+ TaskStatus.State.KILLED_UNCLEAN) {
+ taskStatus.setRunState(TaskStatus.State.KILLED);
+ } else if (task.isMapOrReduce() &&
+ taskStatus.getPhase() != TaskStatus.Phase.CLEANUP) {
+ taskStatus.setRunState(TaskStatus.State.FAILED_UNCLEAN);
+ } else {
+ taskStatus.setRunState(TaskStatus.State.FAILED);
+ }
+ removeFromMemoryManager(task.getTaskID());
// call the script here for the failed tasks.
if (debugCommand != null) {
String taskStdout ="";
@@ -2010,9 +2095,10 @@
File workDir = null;
try {
workDir = new File(lDirAlloc.getLocalPathToRead(
- TaskTracker.getJobCacheSubdir()
- + Path.SEPARATOR + task.getJobID()
- + Path.SEPARATOR + task.getTaskID()
+ TaskTracker.getLocalTaskDir(
+ task.getJobID().toString(),
+ task.getTaskID().toString(),
+ task.isTaskCleanupTask())
+ Path.SEPARATOR + MRConstants.WORKDIR,
localJobConf). toString());
} catch (IOException e) {
@@ -2065,14 +2151,14 @@
LOG.warn("Exception in add diagnostics!");
}
}
- } else {
- taskStatus.setRunState(TaskStatus.State.KILLED);
}
taskStatus.setProgress(0.0f);
}
this.taskStatus.setFinishTime(System.currentTimeMillis());
needCleanup = (taskStatus.getRunState() == TaskStatus.State.FAILED ||
- taskStatus.getRunState() == TaskStatus.State.KILLED);
+ taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN ||
+ taskStatus.getRunState() == TaskStatus.State.KILLED_UNCLEAN ||
+ taskStatus.getRunState() == TaskStatus.State.KILLED);
}
//
@@ -2182,7 +2268,8 @@
synchronized(this){
if (getRunState() == TaskStatus.State.RUNNING ||
getRunState() == TaskStatus.State.UNASSIGNED ||
- getRunState() == TaskStatus.State.COMMIT_PENDING) {
+ getRunState() == TaskStatus.State.COMMIT_PENDING ||
+ isCleaningup()) {
kill(wasFailure);
}
}
@@ -2196,16 +2283,38 @@
* @param wasFailure was it a failure (versus a kill request)?
*/
public synchronized void kill(boolean wasFailure) throws IOException {
+ /* State changes:
+ * RUNNING -> FAILED_UNCLEAN/KILLED_UNCLEAN/FAILED/KILLED
+ * COMMIT_PENDING -> FAILED_UNCLEAN/KILLED_UNCLEAN
+ * FAILED_UNCLEAN -> FAILED
+ * KILLED_UNCLEAN -> KILLED
+ * UNASSIGNED -> FAILED/KILLED
+ */
if (taskStatus.getRunState() == TaskStatus.State.RUNNING ||
- taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING) {
+ taskStatus.getRunState() == TaskStatus.State.COMMIT_PENDING ||
+ isCleaningup()) {
wasKilled = true;
if (wasFailure) {
failures += 1;
}
runner.kill();
- taskStatus.setRunState((wasFailure) ?
- TaskStatus.State.FAILED :
- TaskStatus.State.KILLED);
+ if (task.isMapOrReduce()) {
+ taskStatus.setRunState((wasFailure) ?
+ TaskStatus.State.FAILED_UNCLEAN :
+ TaskStatus.State.KILLED_UNCLEAN);
+ } else {
+ // go FAILED_UNCLEAN -> FAILED and KILLED_UNCLEAN -> KILLED always
+ if (taskStatus.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
+ taskStatus.setRunState(TaskStatus.State.FAILED);
+ } else if (taskStatus.getRunState() ==
+ TaskStatus.State.KILLED_UNCLEAN) {
+ taskStatus.setRunState(TaskStatus.State.KILLED);
+ } else {
+ taskStatus.setRunState((wasFailure) ?
+ TaskStatus.State.FAILED :
+ TaskStatus.State.KILLED);
+ }
+ }
} else if (taskStatus.getRunState() == TaskStatus.State.UNASSIGNED) {
if (wasFailure) {
failures += 1;
@@ -2214,6 +2323,7 @@
taskStatus.setRunState(TaskStatus.State.KILLED);
}
}
+ removeFromMemoryManager(task.getTaskID());
releaseSlot();
}
@@ -2265,7 +2375,12 @@
synchronized (TaskTracker.this) {
if (needCleanup) {
- tasks.remove(taskId);
+ // see if tasks data structure is holding this tip.
+ // tasks could hold the tip for cleanup attempt, if cleanup attempt
+ // got launched before this method.
+ if (tasks.get(taskId) == this) {
+ tasks.remove(taskId);
+ }
}
synchronized (this){
if (alwaysKeepTaskFiles ||
@@ -2277,8 +2392,8 @@
}
synchronized (this) {
try {
- String taskDir = SUBDIR + Path.SEPARATOR + JOBCACHE + Path.SEPARATOR
- + task.getJobID() + Path.SEPARATOR + taskId;
+ String taskDir = getLocalTaskDir(task.getJobID().toString(),
+ taskId.toString(), task.isTaskCleanupTask());
if (needCleanup) {
if (runner != null) {
//cleans up the output directory of the task (where map outputs
@@ -2415,7 +2530,7 @@
throws IOException {
LOG.info("Task " + taskid + " is in COMMIT_PENDING");
statusUpdate(taskid, taskStatus);
- reportTaskFinished(taskid, true);
+ reportTaskFinished();
}
/**
@@ -2490,31 +2605,14 @@
// Called by TaskTracker thread after task process ends
/////////////////////////////////////////////////////
/**
- * The task is no longer running. It may not have completed successfully
+ * when you see report task finished, wake up the heartbeat
*/
- void reportTaskFinished(TaskAttemptID taskid, boolean commitPending) {
- TaskInProgress tip;
- synchronized (this) {
- tip = tasks.get(taskid);
- }
- if (tip != null) {
- if (!commitPending) {
- tip.taskFinished();
- // Remove the entry from taskMemoryManagerThread's data structures.
- if (isTaskMemoryManagerEnabled()) {
- taskMemoryManager.removeTask(taskid);
- }
- tip.releaseSlot();
- }
- synchronized(finishedCount) {
- finishedCount[0]++;
- finishedCount.notify();
- }
- } else {
- LOG.warn("Unknown child task finshed: "+taskid+". Ignored.");
+ void reportTaskFinished() {
+ // Count one more finished task and wake a single thread blocked on
+ // finishedCount's monitor (presumably the heartbeat loop -- confirm),
+ // so the change is reported without waiting for the next interval.
+ synchronized(finishedCount) {
+ finishedCount[0]++;
+ finishedCount.notify();
}
}
-
/**
* A completed map task's output has been lost.
@@ -2740,15 +2838,13 @@
// Index file
Path indexFileName = lDirAlloc.getLocalPathToRead(
- TaskTracker.getJobCacheSubdir() + Path.SEPARATOR +
- jobId + Path.SEPARATOR +
- mapId + "/output" + "/file.out.index", conf);
+ TaskTracker.getIntermediateOutputDir(jobId, mapId)
+ + "/file.out.index", conf);
// Map-output file
Path mapOutputFileName = lDirAlloc.getLocalPathToRead(
- TaskTracker.getJobCacheSubdir() + Path.SEPARATOR +
- jobId + Path.SEPARATOR +
- mapId + "/output" + "/file.out", conf);
+ TaskTracker.getIntermediateOutputDir(jobId, mapId)
+ + "/file.out", conf);
/**
* Read the index file to get the information about where
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java?rev=741197&r1=741196&r2=741197&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java Thu Feb 5 17:34:10 2009
@@ -206,7 +206,8 @@
TaskStatus.State state = ts.getRunState();
if (ts.getIsMap() &&
((state == TaskStatus.State.RUNNING) ||
- (state == TaskStatus.State.UNASSIGNED))) {
+ (state == TaskStatus.State.UNASSIGNED) ||
+ ts.inTaskCleanupPhase())) {
mapCount++;
}
}
@@ -223,7 +224,8 @@
TaskStatus.State state = ts.getRunState();
if ((!ts.getIsMap()) &&
((state == TaskStatus.State.RUNNING) ||
- (state == TaskStatus.State.UNASSIGNED))) {
+ (state == TaskStatus.State.UNASSIGNED) ||
+ ts.inTaskCleanupPhase())) {
reduceCount++;
}
}
Modified: hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java?rev=741197&r1=741196&r2=741197&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java (original)
+++ hadoop/core/branches/branch-0.19/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java Thu Feb 5 17:34:10 2009
@@ -52,9 +52,10 @@
* encapsulates the events and whether to reset events index.
* Version 13 changed the getTask method signature for HADOOP-249
* Version 14 changed the getTask method signature for HADOOP-4232
+ * Version 15 Adds FAILED_UNCLEAN and KILLED_UNCLEAN states for HADOOP-4759
* */
- public static final long versionID = 14L;
+ public static final long versionID = 15L;
/**
* Called when a child task process starts, to get its task.
Added: hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestTaskFail.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestTaskFail.java?rev=741197&view=auto
==============================================================================
--- hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestTaskFail.java (added)
+++ hadoop/core/branches/branch-0.19/src/test/org/apache/hadoop/mapred/TestTaskFail.java Thu Feb 5 17:34:10 2009
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import junit.framework.TestCase;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.lib.IdentityReducer;
+
+public class TestTaskFail extends TestCase {
+  /**
+   * Mapper whose behaviour is chosen by the attempt number at the end of
+   * "mapred.task.id": attempt 0 throws an IOException, attempt 1 kills its
+   * JVM with System.exit(-1), and any later attempt returns without
+   * emitting output.
+   */
+  public static class MapperClass extends MapReduceBase
+      implements Mapper<LongWritable, Text, Text, IntWritable> {
+    String taskid;
+
+    public void configure(JobConf job) {
+      taskid = job.get("mapred.task.id");
+    }
+
+    public void map (LongWritable key, Text value,
+                     OutputCollector<Text, IntWritable> output,
+                     Reporter reporter) throws IOException {
+      if (taskid.endsWith("_0")) {
+        throw new IOException();
+      } else if (taskid.endsWith("_1")) {
+        System.exit(-1);
+      }
+    }
+  }
+
+  /**
+   * Writes {@code input} to {@code inDir}/part-0, configures {@code conf}
+   * to run {@link MapperClass} and an identity reducer from {@code inDir}
+   * to {@code outDir} (deleting any previous output), and submits the job.
+   *
+   * @return handle to the submitted, possibly still running, job
+   * @throws IOException if the input cannot be written or submission fails
+   */
+  public RunningJob launchJob(JobConf conf,
+                              Path inDir,
+                              Path outDir,
+                              String input)
+  throws IOException {
+    // set up the input file system and write input text.
+    FileSystem inFs = inDir.getFileSystem(conf);
+    FileSystem outFs = outDir.getFileSystem(conf);
+    outFs.delete(outDir, true);
+    if (!inFs.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir.toString());
+    }
+    // write input into input file; close the stream even if the write fails
+    DataOutputStream file = inFs.create(new Path(inDir, "part-0"));
+    try {
+      file.writeBytes(input);
+    } finally {
+      file.close();
+    }
+
+    // configure the mapred Job
+    conf.setMapperClass(MapperClass.class);
+    conf.setReducerClass(IdentityReducer.class);
+    FileInputFormat.setInputPaths(conf, inDir);
+    FileOutputFormat.setOutputPath(conf, outDir);
+    String testRootDir = new Path(System.getProperty("test.build.data",
+        "/tmp")).toString().replace(' ', '+');
+    conf.set("test.build.data", testRootDir);
+    // return the RunningJob handle.
+    return new JobClient(conf).submitJob(conf);
+  }
+
+  /**
+   * Polls {@code job} until it completes, sleeping between polls rather
+   * than busy-spinning a CPU core while the clusters do the work.
+   *
+   * @throws IOException if polling fails or the waiting thread is
+   *         interrupted (the interrupt status is restored first)
+   */
+  private static void waitForCompletion(RunningJob job) throws IOException {
+    while (!job.isComplete()) {
+      try {
+        Thread.sleep(100);
+      } catch (InterruptedException ie) {
+        Thread.currentThread().interrupt();
+        throw new IOException("Interrupted while waiting for job "
+            + job.getID(), ie);
+      }
+    }
+  }
+
+  /**
+   * Runs a job on mini DFS/MR clusters where the first two attempts of
+   * each map fail in different ways, then checks through the JobTracker
+   * that attempt 0 (IOException) did not need a cleanup attempt while
+   * attempt 1 (System.exit) did, and that both ended FAILED.
+   */
+  public void testWithDFS() throws IOException {
+    MiniDFSCluster dfs = null;
+    MiniMRCluster mr = null;
+    FileSystem fileSys = null;
+    try {
+      final int taskTrackers = 4;
+
+      Configuration conf = new Configuration();
+      dfs = new MiniDFSCluster(conf, 4, true, null);
+      fileSys = dfs.getFileSystem();
+      mr = new MiniMRCluster(taskTrackers, fileSys.getUri().toString(), 1);
+      JobConf jobConf = mr.createJobConf();
+      final Path inDir = new Path("./input");
+      final Path outDir = new Path("./output");
+      String input = "The quick brown fox\nhas many silly\nred fox sox\n";
+
+      RunningJob job = launchJob(jobConf, inDir, outDir, input);
+      // wait for the job to finish.
+      waitForCompletion(job);
+      assertEquals(JobStatus.SUCCEEDED, job.getJobState());
+
+      JobID jobId = job.getID();
+      JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
+      // construct the task id of first map task
+      TaskAttemptID attemptId =
+        new TaskAttemptID(new TaskID(jobId, true, 0), 0);
+      TaskInProgress tip = jt.getTip(attemptId.getTaskID());
+      assertNotNull(tip);
+      // this should not be cleanup attempt since the first attempt
+      // fails with an exception
+      assertFalse(tip.isCleanupAttempt(attemptId));
+      TaskStatus ts = jt.getTaskStatus(attemptId);
+      assertNotNull(ts);
+      assertEquals(TaskStatus.State.FAILED, ts.getRunState());
+
+      attemptId = new TaskAttemptID(new TaskID(jobId, true, 0), 1);
+      // this should be cleanup attempt since the second attempt fails
+      // with System.exit
+      assertTrue(tip.isCleanupAttempt(attemptId));
+      ts = jt.getTaskStatus(attemptId);
+      assertNotNull(ts);
+      assertEquals(TaskStatus.State.FAILED, ts.getRunState());
+
+    } finally {
+      // stop MapReduce before the DFS it runs on
+      if (mr != null) { mr.shutdown(); }
+      if (dfs != null) { dfs.shutdown(); }
+    }
+  }
+
+  public static void main(String[] argv) throws Exception {
+    TestTaskFail td = new TestTaskFail();
+    td.testWithDFS();
+  }
+}
Modified: hadoop/core/branches/branch-0.19/src/webapps/job/taskdetails.jsp
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/webapps/job/taskdetails.jsp?rev=741197&r1=741196&r2=741197&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/webapps/job/taskdetails.jsp (original)
+++ hadoop/core/branches/branch-0.19/src/webapps/job/taskdetails.jsp Thu Feb 5 17:34:10 2009
@@ -67,13 +67,19 @@
}
}
}
- TaskStatus[] ts = (job != null) ? tracker.getTaskStatuses(tipidObj)
- : null;
+ TaskInProgress tip = null;
+ if (job != null && tipidObj != null) {
+ tip = job.getTaskInProgress(tipidObj);
+ }
+ TaskStatus[] ts = null;
+ if (tip != null) {
+ ts = tip.getTaskStatuses();
+ }
boolean isCleanupOrSetup = false;
- if (tipidObj != null) {
- isCleanupOrSetup = job.getTaskInProgress(tipidObj).isCleanupTask();
+ if ( tip != null) {
+ isCleanupOrSetup = tip.isJobCleanupTask();
if (!isCleanupOrSetup) {
- isCleanupOrSetup = job.getTaskInProgress(tipidObj).isSetupTask();
+ isCleanupOrSetup = tip.isJobSetupTask();
}
}
%>
@@ -115,14 +121,41 @@
TaskTrackerStatus taskTracker = tracker.getTaskTracker(taskTrackerName);
out.print("<tr><td>" + status.getTaskID() + "</td>");
String taskAttemptTracker = null;
+ String cleanupTrackerName = null;
+ TaskTrackerStatus cleanupTracker = null;
+ String cleanupAttemptTracker = null;
+ boolean hasCleanupAttempt = false;
+ if (tip != null && tip.isCleanupAttempt(status.getTaskID())) {
+ cleanupTrackerName = tip.machineWhereCleanupRan(status.getTaskID());
+ cleanupTracker = tracker.getTaskTracker(cleanupTrackerName);
+ if (cleanupTracker != null) {
+ cleanupAttemptTracker = "http://" + cleanupTracker.getHost() + ":"
+ + cleanupTracker.getHttpPort();
+ }
+ hasCleanupAttempt = true;
+ }
+ out.print("<td>");
+ if (hasCleanupAttempt) {
+ out.print("Task attempt: ");
+ }
if (taskTracker == null) {
- out.print("<td>" + taskTrackerName + "</td>");
+ out.print(taskTrackerName);
} else {
taskAttemptTracker = "http://" + taskTracker.getHost() + ":"
+ taskTracker.getHttpPort();
- out.print("<td><a href=\"" + taskAttemptTracker + "\">"
- + tracker.getNode(taskTracker.getHost()) + "</a></td>");
+ out.print("<a href=\"" + taskAttemptTracker + "\">"
+ + tracker.getNode(taskTracker.getHost()) + "</a>");
+ }
+ if (hasCleanupAttempt) {
+ out.print("<br/>Cleanup Attempt: ");
+ if (cleanupAttemptTracker == null ) {
+ out.print(cleanupTrackerName);
+ } else {
+ out.print("<a href=\"" + cleanupAttemptTracker + "\">"
+ + tracker.getNode(cleanupTracker.getHost()) + "</a>");
}
+ }
+ out.print("</td>");
out.print("<td>" + status.getRunState() + "</td>");
out.print("<td>" + StringUtils.formatPercent(status.getProgress(), 2)
+ ServletUtil.percentageGraph(status.getProgress() * 100f, 80) + "</td>");
@@ -162,6 +195,9 @@
String.valueOf(taskTracker.getHttpPort()),
status.getTaskID().toString());
}
+ if (hasCleanupAttempt) {
+ out.print("Task attempt: <br/>");
+ }
if (taskLogUrl == null) {
out.print("n/a");
} else {
@@ -172,6 +208,25 @@
out.print("<a href=\"" + tailEightKBUrl + "\">Last 8KB</a><br/>");
out.print("<a href=\"" + entireLogUrl + "\">All</a><br/>");
}
+ if (hasCleanupAttempt) {
+ out.print("Cleanup attempt: <br/>");
+ taskLogUrl = null;
+ if (cleanupTracker != null ) {
+ taskLogUrl = TaskLogServlet.getTaskLogUrl(cleanupTracker.getHost(),
+ String.valueOf(cleanupTracker.getHttpPort()),
+ status.getTaskID().toString());
+ }
+ if (taskLogUrl == null) {
+ out.print("n/a");
+ } else {
+ String tailFourKBUrl = taskLogUrl + "&start=-4097&cleanup=true";
+ String tailEightKBUrl = taskLogUrl + "&start=-8193&cleanup=true";
+ String entireLogUrl = taskLogUrl + "&all=true&cleanup=true";
+ out.print("<a href=\"" + tailFourKBUrl + "\">Last 4KB</a><br/>");
+ out.print("<a href=\"" + tailEightKBUrl + "\">Last 8KB</a><br/>");
+ out.print("<a href=\"" + entireLogUrl + "\">All</a><br/>");
+ }
+ }
out.print("</td><td>" + "<a href=\"/taskstats.jsp?jobid=" + jobid
+ "&tipid=" + tipid + "&taskid=" + status.getTaskID() + "\">"
+ ((status.getCounters() != null) ? status.getCounters().size() : 0) + "</a></td>");