Posted to common-commits@hadoop.apache.org by dd...@apache.org on 2009/02/05 18:39:28 UTC
svn commit: r741203 [1/2] - in /hadoop/core/branches/branch-0.20: ./ src/mapred/org/apache/hadoop/mapred/ src/mapred/org/apache/hadoop/mapreduce/lib/output/ src/test/org/apache/hadoop/mapred/ src/webapps/job/
Author: ddas
Date: Thu Feb 5 17:39:27 2009
New Revision: 741203
URL: http://svn.apache.org/viewvc?rev=741203&view=rev
Log:
HADOOP-4759. Removes temporary output directory for failed and killed tasks by launching special CLEANUP tasks for the same. Contributed by Amareshwari Sriramadasu.
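In outline, the change works like this: when a task attempt fails or is killed while running, it first reports a new FAILED_UNCLEAN/KILLED_UNCLEAN state; the JobTracker then re-launches the same attempt id as a special task-cleanup (CLEANUP) attempt whose only job is to discard the attempt's temporary output, after which the attempt is finally marked FAILED/KILLED. A minimal sketch of the discard step, using the 0.20 mapred OutputCommitter API touched by this patch (the class and method names below are illustrative, not the patch's own helpers):

    import java.io.IOException;
    import org.apache.hadoop.mapred.OutputCommitter;
    import org.apache.hadoop.mapred.TaskAttemptContext;

    class CleanupSketch {
      // committer/context are the ones a Task builds in initialize();
      // abortTask() removes the attempt's temporary output directory
      // (see the FileOutputCommitter change below).
      static void discardAttemptOutput(OutputCommitter committer,
                                       TaskAttemptContext context)
          throws IOException {
        committer.abortTask(context);
      }
    }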
Added:
hadoop/core/branches/branch-0.20/src/test/org/apache/hadoop/mapred/TestTaskFail.java
Modified:
hadoop/core/branches/branch-0.20/CHANGES.txt
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Child.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobHistory.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JvmManager.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapOutputFile.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskLog.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskStatus.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskTrackerStatus.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskUmbilicalProtocol.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
hadoop/core/branches/branch-0.20/src/webapps/job/taskdetails.jsp
Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Thu Feb 5 17:39:27 2009
@@ -594,6 +594,10 @@
HADOOP-5085. Copying a file to local with Crc throws an exception.
(hairong)
+ HADOOP-4759. Removes temporary output directory for failed and
+ killed tasks by launching special CLEANUP tasks for the same.
+ (Amareshwari Sriramadasu via ddas)
+
Release 0.19.1 - Unreleased
IMPROVEMENTS
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Child.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Child.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Child.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Child.java Thu Feb 5 17:39:27 2009
@@ -47,6 +47,7 @@
LogFactory.getLog(TaskTracker.class);
static volatile TaskAttemptID taskid;
+ static volatile boolean isCleanup;
public static void main(String[] args) throws Throwable {
LOG.debug("Child starting");
@@ -75,7 +76,7 @@
try {
Thread.sleep(5000);
if (taskid != null) {
- TaskLog.syncLogs(firstTaskid, taskid);
+ TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
}
} catch (InterruptedException ie) {
} catch (IOException iee) {
@@ -95,6 +96,7 @@
Path srcPidPath = null;
Path dstPidPath = null;
int idleLoopCount = 0;
+ Task task = null;
try {
while (true) {
JvmTask myTask = umbilical.getTask(jvmId);
@@ -114,22 +116,21 @@
}
}
idleLoopCount = 0;
- Task task = myTask.getTask();
+ task = myTask.getTask();
taskid = task.getTaskID();
-
+ isCleanup = task.isTaskCleanupTask();
//create the index file so that the log files
//are viewable immediately
- TaskLog.syncLogs(firstTaskid, taskid);
+ TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
JobConf job = new JobConf(task.getJobFile());
if (job.getBoolean("task.memory.mgmt.enabled", false)) {
if (srcPidPath == null) {
- srcPidPath = TaskMemoryManagerThread.getPidFilePath(firstTaskid,
- job);
+ srcPidPath = new Path(task.getPidFile());
}
//since the JVM is running multiple tasks potentially, we need
//to do symlink stuff only for the subsequent tasks
if (!taskid.equals(firstTaskid)) {
- dstPidPath = new Path(srcPidPath.getParent(), taskid.toString());
+ dstPidPath = new Path(task.getPidFile());
FileUtil.symLink(srcPidPath.toUri().getPath(),
dstPidPath.toUri().getPath());
}
@@ -154,9 +155,10 @@
try {
task.run(job, umbilical); // run the task
} finally {
- TaskLog.syncLogs(firstTaskid, taskid);
+ TaskLog.syncLogs(firstTaskid, taskid, isCleanup);
if (!taskid.equals(firstTaskid) &&
job.getBoolean("task.memory.mgmt.enabled", false)) {
+ // delete the pid-file's symlink
new File(dstPidPath.toUri().getPath()).delete();
}
}
@@ -169,6 +171,10 @@
umbilical.fsError(taskid, e.getMessage());
} catch (Throwable throwable) {
LOG.warn("Error running child", throwable);
+ if (task != null) {
+ // do cleanup for the task
+ task.taskCleanup(umbilical);
+ }
// Report back any failures, for diagnostic purposes
ByteArrayOutputStream baos = new ByteArrayOutputStream();
throwable.printStackTrace(new PrintStream(baos));
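The net effect in the Child task loop, roughly (a sketch, not the literal hunk above; task, job and umbilical are the patch's own variables): even a task that dies with an uncaught Throwable now discards its temporary output in-process before the failure is reported.

    // Sketch of the pattern Child.java now follows:
    try {
      task.run(job, umbilical);          // run the task
    } catch (Throwable throwable) {
      if (task != null) {
        task.taskCleanup(umbilical);     // best-effort cleanup in the same JVM
      }
      // then stringify the throwable and report it back over the umbilical,
      // exactly as before
    }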
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/FileOutputCommitter.java Thu Feb 5 17:39:27 2009
@@ -132,9 +132,11 @@
public void abortTask(TaskAttemptContext context) {
Path taskOutputPath = getTempTaskOutputPath(context);
try {
- FileSystem fs = taskOutputPath.getFileSystem(context.getJobConf());
- context.getProgressible().progress();
- fs.delete(taskOutputPath, true);
+ if (taskOutputPath != null) {
+ FileSystem fs = taskOutputPath.getFileSystem(context.getJobConf());
+ context.getProgressible().progress();
+ fs.delete(taskOutputPath, true);
+ }
} catch (IOException ie) {
LOG.warn("Error discarding output" + StringUtils.stringifyException(ie));
}
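The new null check presumably guards cleanup attempts of jobs that configure no output path, where getTempTaskOutputPath() can return null. Since abortTask() is now the hook driven by CLEANUP attempts for failed and killed tasks, a job-specific committer can piggy-back on it; a minimal sketch (old mapred API; the class name is hypothetical):

    import org.apache.hadoop.mapred.FileOutputCommitter;
    import org.apache.hadoop.mapred.TaskAttemptContext;

    // Hypothetical committer: remove side-effect files, then let the parent
    // delete the temporary task output directory.
    public class AuditingOutputCommitter extends FileOutputCommitter {
      @Override
      public void abortTask(TaskAttemptContext context) {
        // e.g. delete per-attempt scratch files created outside the output dir
        super.abortTask(context);
      }
    }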
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/InterTrackerProtocol.java Thu Feb 5 17:39:27 2009
@@ -59,8 +59,9 @@
* in heartbeat method (HADOOP-4305)
* Version 23: Added parameter 'initialContact' again in heartbeat method
* (HADOOP-4869)
+ * Version 24: Changed format of Task and TaskStatus for HADOOP-4759
*/
- public static final long versionID = 23L;
+ public static final long versionID = 24L;
public final static int TRACKERS_OK = 0;
public final static int UNKNOWN_TASKTRACKER = 1;
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/IsolationRunner.java Thu Feb 5 17:39:27 2009
@@ -180,9 +180,9 @@
FileSystem local = FileSystem.getLocal(conf);
LocalDirAllocator lDirAlloc = new LocalDirAllocator("mapred.local.dir");
File workDirName = new File(lDirAlloc.getLocalPathToRead(
- TaskTracker.getJobCacheSubdir()
- + Path.SEPARATOR + taskId.getJobID()
- + Path.SEPARATOR + taskId
+ TaskTracker.getLocalTaskDir(
+ taskId.getJobID().toString(),
+ taskId.toString())
+ Path.SEPARATOR + "work",
conf). toString());
local.setWorkingDirectory(new Path(workDirName.toString()));
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobHistory.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobHistory.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobHistory.java Thu Feb 5 17:39:27 2009
@@ -1272,7 +1272,8 @@
taskAttemptId.getTaskID().toString(),
taskAttemptId.toString(),
String.valueOf(startTime), trackerName,
- String.valueOf(httpPort)});
+ httpPort == -1 ? "" :
+ String.valueOf(httpPort)});
}
}
}
@@ -1468,7 +1469,8 @@
taskAttemptId.getTaskID().toString(),
taskAttemptId.toString(),
String.valueOf(startTime), trackerName,
- String.valueOf(httpPort)});
+ httpPort == -1 ? "" :
+ String.valueOf(httpPort)});
}
}
}
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Thu Feb 5 17:39:27 2009
@@ -44,6 +44,7 @@
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
+import org.apache.hadoop.util.StringUtils;
/*************************************************************
* JobInProgress maintains all the info for keeping
@@ -113,6 +114,12 @@
// A set of running reduce TIPs
Set<TaskInProgress> runningReduces;
+
+ // A list of cleanup tasks for the map task attempts, to be launched
+ List<TaskAttemptID> mapCleanupTasks = new LinkedList<TaskAttemptID>();
+
+ // A list of cleanup tasks for the reduce task attempts, to be launched
+ List<TaskAttemptID> reduceCleanupTasks = new LinkedList<TaskAttemptID>();
private final int maxLevel;
@@ -473,12 +480,12 @@
// Just assign splits[0]
cleanup[0] = new TaskInProgress(jobId, jobFile, splits[0],
jobtracker, conf, this, numMapTasks);
- cleanup[0].setCleanupTask();
+ cleanup[0].setJobCleanupTask();
// cleanup reduce tip.
cleanup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
numReduceTasks, jobtracker, conf, this);
- cleanup[1].setCleanupTask();
+ cleanup[1].setJobCleanupTask();
// create two setup tips, one map and one reduce.
setup = new TaskInProgress[2];
@@ -486,12 +493,12 @@
// Just assign splits[0]
setup[0] = new TaskInProgress(jobId, jobFile, splits[0],
jobtracker, conf, this, numMapTasks + 1 );
- setup[0].setSetupTask();
+ setup[0].setJobSetupTask();
// setup reduce tip.
setup[1] = new TaskInProgress(jobId, jobFile, numMapTasks,
numReduceTasks + 1, jobtracker, conf, this);
- setup[1].setSetupTask();
+ setup[1].setJobSetupTask();
synchronized(jobInitKillStatus){
jobInitKillStatus.initDone = true;
@@ -743,11 +750,27 @@
if (wasComplete && (status.getRunState() == TaskStatus.State.SUCCEEDED)) {
status.setRunState(TaskStatus.State.KILLED);
}
+
+ // If the job is complete and a task has just reported its
+ // state as FAILED_UNCLEAN/KILLED_UNCLEAN,
+ // make the task's state FAILED/KILLED without launching cleanup attempt.
+ // Note that if task is already a cleanup attempt,
+ // we don't change the state to make sure the task gets a killTaskAction
+ if ((this.isComplete() || jobFailed || jobKilled) &&
+ !tip.isCleanupAttempt(taskid)) {
+ if (status.getRunState() == TaskStatus.State.FAILED_UNCLEAN) {
+ status.setRunState(TaskStatus.State.FAILED);
+ } else if (status.getRunState() == TaskStatus.State.KILLED_UNCLEAN) {
+ status.setRunState(TaskStatus.State.KILLED);
+ }
+ }
+
boolean change = tip.updateStatus(status);
if (change) {
TaskStatus.State state = status.getRunState();
+ // get the TaskTrackerStatus where the task ran
TaskTrackerStatus ttStatus =
- this.jobtracker.getTaskTracker(status.getTaskTracker());
+ this.jobtracker.getTaskTracker(tip.machineWhereTaskRan(taskid));
String httpTaskLogLocation = null;
if (null != ttStatus){
@@ -768,8 +791,8 @@
taskid,
tip.idWithinJob(),
status.getIsMap() &&
- !tip.isCleanupTask() &&
- !tip.isSetupTask(),
+ !tip.isJobCleanupTask() &&
+ !tip.isJobSetupTask(),
TaskCompletionEvent.Status.SUCCEEDED,
httpTaskLogLocation
);
@@ -783,6 +806,15 @@
tip.doCommit(taskid);
}
return;
+ } else if (state == TaskStatus.State.FAILED_UNCLEAN ||
+ state == TaskStatus.State.KILLED_UNCLEAN) {
+ tip.incompleteSubTask(taskid, this.status);
+ // add this task, to be rescheduled as cleanup attempt
+ if (tip.isMapTask()) {
+ mapCleanupTasks.add(taskid);
+ } else {
+ reduceCleanupTasks.add(taskid);
+ }
}
//For a failed task update the JT datastructures.
else if (state == TaskStatus.State.FAILED ||
@@ -813,8 +845,8 @@
taskid,
tip.idWithinJob(),
status.getIsMap() &&
- !tip.isCleanupTask() &&
- !tip.isSetupTask(),
+ !tip.isJobCleanupTask() &&
+ !tip.isJobSetupTask(),
taskCompletionStatus,
httpTaskLogLocation
);
@@ -843,7 +875,7 @@
oldProgress + " to " + tip.getProgress());
}
- if (!tip.isCleanupTask() && !tip.isSetupTask()) {
+ if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
double progressDelta = tip.getProgress() - oldProgress;
if (tip.isMapTask()) {
if (maps.length == 0) {
@@ -941,6 +973,40 @@
return result;
}
+ /*
+ * Return task cleanup attempt if any, to run on a given tracker
+ */
+ public synchronized Task obtainTaskCleanupTask(TaskTrackerStatus tts,
+ boolean isMapSlot)
+ throws IOException {
+ if (this.status.getRunState() != JobStatus.RUNNING ||
+ jobFailed || jobKilled) {
+ return null;
+ }
+
+ String taskTracker = tts.getTrackerName();
+ if (!shouldRunOnTaskTracker(taskTracker)) {
+ return null;
+ }
+ TaskAttemptID taskid = null;
+ TaskInProgress tip = null;
+ if (isMapSlot) {
+ if (!mapCleanupTasks.isEmpty()) {
+ taskid = mapCleanupTasks.remove(0);
+ tip = maps[taskid.getTaskID().getId()];
+ }
+ } else {
+ if (!reduceCleanupTasks.isEmpty()) {
+ taskid = reduceCleanupTasks.remove(0);
+ tip = reduces[taskid.getTaskID().getId()];
+ }
+ }
+ if (tip != null) {
+ return tip.addRunningTask(taskid, taskTracker, true);
+ }
+ return null;
+ }
+
public synchronized Task obtainNewLocalMapTask(TaskTrackerStatus tts,
int clusterSize,
int numUniqueHosts)
@@ -991,7 +1057,7 @@
* Return a CleanupTask, if appropriate, to run on the given tasktracker
*
*/
- public Task obtainCleanupTask(TaskTrackerStatus tts,
+ public Task obtainJobCleanupTask(TaskTrackerStatus tts,
int clusterSize,
int numUniqueHosts,
boolean isMapSlot
@@ -1001,7 +1067,7 @@
}
synchronized(this) {
- if (!canLaunchCleanupTask()) {
+ if (!canLaunchJobCleanupTask()) {
return null;
}
@@ -1042,7 +1108,7 @@
* or all maps and reduces are complete
* @return true/false
*/
- private synchronized boolean canLaunchCleanupTask() {
+ private synchronized boolean canLaunchJobCleanupTask() {
if (!tasksInited.get()) {
return false;
}
@@ -1073,7 +1139,7 @@
* Return a SetupTask, if appropriate, to run on the given tasktracker
*
*/
- public Task obtainSetupTask(TaskTrackerStatus tts,
+ public Task obtainJobSetupTask(TaskTrackerStatus tts,
int clusterSize,
int numUniqueHosts,
boolean isMapSlot
@@ -1197,10 +1263,10 @@
String name;
String splits = "";
Enum counter = null;
- if (tip.isSetupTask()) {
+ if (tip.isJobSetupTask()) {
launchedSetup = true;
name = Values.SETUP.name();
- } else if (tip.isCleanupTask()) {
+ } else if (tip.isJobCleanupTask()) {
launchedCleanup = true;
name = Values.CLEANUP.name();
} else if (tip.isMapTask()) {
@@ -1223,7 +1289,7 @@
JobHistory.Task.logStarted(tip.getTIPId(), name,
tip.getExecStartTime(), splits);
}
- if (!tip.isSetupTask() && !tip.isCleanupTask()) {
+ if (!tip.isJobSetupTask() && !tip.isJobCleanupTask()) {
jobCounters.incrCounter(counter, 1);
}
@@ -1244,7 +1310,7 @@
//
// So to simplify, increment the data locality counter whenever there is
// data locality.
- if (tip.isMapTask() && !tip.isSetupTask() && !tip.isCleanupTask()) {
+ if (tip.isMapTask() && !tip.isJobSetupTask() && !tip.isJobCleanupTask()) {
// increment the data locality counter for maps
Node tracker = jobtracker.getNode(tts.getHost());
int level = this.maxLevel;
@@ -1924,8 +1990,8 @@
TaskTrackerStatus ttStatus =
this.jobtracker.getTaskTracker(status.getTaskTracker());
String trackerHostname = jobtracker.getNode(ttStatus.getHost()).toString();
- String taskType = tip.isCleanupTask() ? Values.CLEANUP.name() :
- tip.isSetupTask() ? Values.SETUP.name() :
+ String taskType = tip.isJobCleanupTask() ? Values.CLEANUP.name() :
+ tip.isJobSetupTask() ? Values.SETUP.name() :
tip.isMapTask() ? Values.MAP.name() :
Values.REDUCE.name();
if (status.getIsMap()){
@@ -1955,14 +2021,14 @@
status.getCounters());
int newNumAttempts = tip.getActiveTasks().size();
- if (tip.isSetupTask()) {
+ if (tip.isJobSetupTask()) {
// setup task has finished. kill the extra setup tip
killSetupTip(!tip.isMapTask());
// Job can start running now.
this.status.setSetupProgress(1.0f);
this.status.setRunState(JobStatus.RUNNING);
JobHistory.JobInfo.logStarted(profile.getJobID());
- } else if (tip.isCleanupTask()) {
+ } else if (tip.isJobCleanupTask()) {
// cleanup task has finished. Kill the extra cleanup tip
if (tip.isMapTask()) {
// kill the reduce tip
@@ -2097,6 +2163,8 @@
}
jobKilled = true;
}
+ // clear all unclean tasks
+ clearUncleanTasks();
//
// kill all TIPs.
//
@@ -2111,7 +2179,22 @@
}
}
}
-
+
+ private void clearUncleanTasks() {
+ TaskAttemptID taskid = null;
+ TaskInProgress tip = null;
+ while (!mapCleanupTasks.isEmpty()) {
+ taskid = mapCleanupTasks.remove(0);
+ tip = maps[taskid.getTaskID().getId()];
+ updateTaskStatus(tip, tip.getTaskStatus(taskid), null);
+ }
+ while (!reduceCleanupTasks.isEmpty()) {
+ taskid = reduceCleanupTasks.remove(0);
+ tip = reduces[taskid.getTaskID().getId()];
+ updateTaskStatus(tip, tip.getTaskStatus(taskid), null);
+ }
+ }
+
/**
* Kill the job and all its component tasks. This method is called from
* jobtracker and should return fast as it locks the jobtracker.
@@ -2162,16 +2245,16 @@
boolean wasFailed = tip.isFailed();
// Mark the taskid as FAILED or KILLED
- tip.incompleteSubTask(taskid, taskTrackerStatus, this.status);
+ tip.incompleteSubTask(taskid, this.status);
boolean isRunning = tip.isRunning();
boolean isComplete = tip.isComplete();
//update running count on task failure.
if (wasRunning && !isRunning) {
- if (tip.isCleanupTask()) {
+ if (tip.isJobCleanupTask()) {
launchedCleanup = false;
- } else if (tip.isSetupTask()) {
+ } else if (tip.isJobSetupTask()) {
launchedSetup = false;
} else if (tip.isMapTask()) {
runningMapTasks -= 1;
@@ -2208,43 +2291,48 @@
}
// update job history
- String taskTrackerName = taskTrackerStatus.getHost();
- long finishTime = status.getFinishTime();
- String taskType = tip.isCleanupTask() ? Values.CLEANUP.name() :
- tip.isSetupTask() ? Values.SETUP.name() :
+ // get taskStatus from tip
+ TaskStatus taskStatus = tip.getTaskStatus(taskid);
+ String taskTrackerName = taskStatus.getTaskTracker();
+ String taskTrackerHostName = convertTrackerNameToHostName(taskTrackerName);
+ int taskTrackerPort = -1;
+ if (taskTrackerStatus != null) {
+ taskTrackerPort = taskTrackerStatus.getHttpPort();
+ }
+ long startTime = taskStatus.getStartTime();
+ long finishTime = taskStatus.getFinishTime();
+ List<String> taskDiagnosticInfo = tip.getDiagnosticInfo(taskid);
+ String diagInfo = taskDiagnosticInfo == null ? "" :
+ StringUtils.arrayToString(taskDiagnosticInfo.toArray(new String[0]));
+ String taskType = tip.isJobCleanupTask() ? Values.CLEANUP.name() :
+ tip.isJobSetupTask() ? Values.SETUP.name() :
tip.isMapTask() ? Values.MAP.name() :
Values.REDUCE.name();
- if (status.getIsMap()) {
- JobHistory.MapAttempt.logStarted(status.getTaskID(), status.getStartTime(),
- status.getTaskTracker(), taskTrackerStatus.getHttpPort(),
- taskType);
- if (status.getRunState() == TaskStatus.State.FAILED) {
- JobHistory.MapAttempt.logFailed(status.getTaskID(), finishTime,
- taskTrackerName, status.getDiagnosticInfo(),
- taskType);
+ if (taskStatus.getIsMap()) {
+ JobHistory.MapAttempt.logStarted(taskid, startTime,
+ taskTrackerName, taskTrackerPort, taskType);
+ if (taskStatus.getRunState() == TaskStatus.State.FAILED) {
+ JobHistory.MapAttempt.logFailed(taskid, finishTime,
+ taskTrackerHostName, diagInfo, taskType);
} else {
- JobHistory.MapAttempt.logKilled(status.getTaskID(), finishTime,
- taskTrackerName, status.getDiagnosticInfo(),
- taskType);
+ JobHistory.MapAttempt.logKilled(taskid, finishTime,
+ taskTrackerHostName, diagInfo, taskType);
}
} else {
- JobHistory.ReduceAttempt.logStarted(status.getTaskID(), status.getStartTime(),
- status.getTaskTracker(), taskTrackerStatus.getHttpPort(),
- taskType);
- if (status.getRunState() == TaskStatus.State.FAILED) {
- JobHistory.ReduceAttempt.logFailed(status.getTaskID(), finishTime,
- taskTrackerName, status.getDiagnosticInfo(),
- taskType);
+ JobHistory.ReduceAttempt.logStarted(taskid, startTime,
+ taskTrackerName, taskTrackerPort, taskType);
+ if (taskStatus.getRunState() == TaskStatus.State.FAILED) {
+ JobHistory.ReduceAttempt.logFailed(taskid, finishTime,
+ taskTrackerHostName, diagInfo, taskType);
} else {
- JobHistory.ReduceAttempt.logKilled(status.getTaskID(), finishTime,
- taskTrackerName, status.getDiagnosticInfo(),
- taskType);
+ JobHistory.ReduceAttempt.logKilled(taskid, finishTime,
+ taskTrackerHostName, diagInfo, taskType);
}
}
// After this, try to assign tasks with the one after this, so that
// the failed task goes to the end of the list.
- if (!tip.isCleanupTask() && !tip.isSetupTask()) {
+ if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
if (tip.isMapTask()) {
failedMapTasks++;
} else {
@@ -2256,7 +2344,7 @@
// Note down that a task has failed on this tasktracker
//
if (status.getRunState() == TaskStatus.State.FAILED) {
- addTrackerTaskFailure(taskTrackerStatus.getTrackerName());
+ addTrackerTaskFailure(taskTrackerName);
}
//
@@ -2274,7 +2362,7 @@
// Allow upto 'mapFailuresPercent' of map tasks to fail or
// 'reduceFailuresPercent' of reduce tasks to fail
//
- boolean killJob = tip.isCleanupTask() || tip.isSetupTask() ? true :
+ boolean killJob = tip.isJobCleanupTask() || tip.isJobSetupTask() ? true :
tip.isMapTask() ?
((++failedMapTIPs*100) > (mapFailuresPercent*numMapTasks)) :
((++failedReduceTIPs*100) > (reduceFailuresPercent*numReduceTasks));
@@ -2283,9 +2371,9 @@
LOG.info("Aborting job " + profile.getJobID());
JobHistory.Task.logFailed(tip.getTIPId(),
taskType,
- status.getFinishTime(),
- status.getDiagnosticInfo());
- if (tip.isCleanupTask()) {
+ finishTime,
+ diagInfo);
+ if (tip.isJobCleanupTask()) {
// kill the other tip
if (tip.isMapTask()) {
cleanup[1].kill();
@@ -2294,7 +2382,7 @@
}
terminateJob(JobStatus.FAILED);
} else {
- if (tip.isSetupTask()) {
+ if (tip.isJobSetupTask()) {
// kill the other tip
killSetupTip(!tip.isMapTask());
}
@@ -2305,7 +2393,7 @@
//
// Update the counters
//
- if (!tip.isCleanupTask() && !tip.isSetupTask()) {
+ if (!tip.isJobCleanupTask() && !tip.isJobSetupTask()) {
if (tip.isMapTask()) {
jobCounters.incrCounter(Counter.NUM_FAILED_MAPS, 1);
} else {
@@ -2344,8 +2432,8 @@
status.setFinishTime(System.currentTimeMillis());
updateTaskStatus(tip, status, metrics);
JobHistory.Task.logFailed(tip.getTIPId(),
- tip.isCleanupTask() ? Values.CLEANUP.name() :
- tip.isSetupTask() ? Values.SETUP.name() :
+ tip.isJobCleanupTask() ? Values.CLEANUP.name() :
+ tip.isJobSetupTask() ? Values.SETUP.name() :
tip.isMapTask() ? Values.MAP.name() : Values.REDUCE.name(),
tip.getExecFinishTime(), reason, taskid);
}
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java Thu Feb 5 17:39:27 2009
@@ -1654,7 +1654,10 @@
// and TaskInProgress
///////////////////////////////////////////////////////
void createTaskEntry(TaskAttemptID taskid, String taskTracker, TaskInProgress tip) {
- LOG.info("Adding task '" + taskid + "' to tip " + tip.getTIPId() + ", for tracker '" + taskTracker + "'");
+ LOG.info("Adding task " +
+ (tip.isCleanupAttempt(taskid) ? "(cleanup)" : "") +
+ "'" + taskid + "' to tip " +
+ tip.getTIPId() + ", for tracker '" + taskTracker + "'");
// taskid --> tracker
taskidToTrackerMap.put(taskid, taskTracker);
@@ -1736,6 +1739,8 @@
for (TaskStatus taskStatus : tip.getTaskStatuses()) {
if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
+ taskStatus.getRunState() != TaskStatus.State.FAILED_UNCLEAN &&
+ taskStatus.getRunState() != TaskStatus.State.KILLED_UNCLEAN &&
taskStatus.getRunState() != TaskStatus.State.UNASSIGNED) {
markCompletedTaskAttempt(taskStatus.getTaskTracker(),
taskStatus.getTaskID());
@@ -1746,6 +1751,8 @@
for (TaskStatus taskStatus : tip.getTaskStatuses()) {
if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
+ taskStatus.getRunState() != TaskStatus.State.FAILED_UNCLEAN &&
+ taskStatus.getRunState() != TaskStatus.State.KILLED_UNCLEAN &&
taskStatus.getRunState() != TaskStatus.State.UNASSIGNED) {
markCompletedTaskAttempt(taskStatus.getTaskTracker(),
taskStatus.getTaskID());
@@ -2559,7 +2566,7 @@
for (Iterator<JobInProgress> it = jobs.values().iterator();
it.hasNext();) {
JobInProgress job = it.next();
- t = job.obtainCleanupTask(taskTracker, numTaskTrackers,
+ t = job.obtainJobCleanupTask(taskTracker, numTaskTrackers,
numUniqueHosts, true);
if (t != null) {
return Collections.singletonList(t);
@@ -2568,7 +2575,15 @@
for (Iterator<JobInProgress> it = jobs.values().iterator();
it.hasNext();) {
JobInProgress job = it.next();
- t = job.obtainSetupTask(taskTracker, numTaskTrackers,
+ t = job.obtainTaskCleanupTask(taskTracker, true);
+ if (t != null) {
+ return Collections.singletonList(t);
+ }
+ }
+ for (Iterator<JobInProgress> it = jobs.values().iterator();
+ it.hasNext();) {
+ JobInProgress job = it.next();
+ t = job.obtainJobSetupTask(taskTracker, numTaskTrackers,
numUniqueHosts, true);
if (t != null) {
return Collections.singletonList(t);
@@ -2579,7 +2594,7 @@
for (Iterator<JobInProgress> it = jobs.values().iterator();
it.hasNext();) {
JobInProgress job = it.next();
- t = job.obtainCleanupTask(taskTracker, numTaskTrackers,
+ t = job.obtainJobCleanupTask(taskTracker, numTaskTrackers,
numUniqueHosts, false);
if (t != null) {
return Collections.singletonList(t);
@@ -2588,7 +2603,15 @@
for (Iterator<JobInProgress> it = jobs.values().iterator();
it.hasNext();) {
JobInProgress job = it.next();
- t = job.obtainSetupTask(taskTracker, numTaskTrackers,
+ t = job.obtainTaskCleanupTask(taskTracker, false);
+ if (t != null) {
+ return Collections.singletonList(t);
+ }
+ }
+ for (Iterator<JobInProgress> it = jobs.values().iterator();
+ it.hasNext();) {
+ JobInProgress job = it.next();
+ t = job.obtainJobSetupTask(taskTracker, numTaskTrackers,
numUniqueHosts, false);
if (t != null) {
return Collections.singletonList(t);
@@ -3137,7 +3160,7 @@
// And completed maps with zero reducers of the job
// never need to be failed.
if (!tip.isComplete() ||
- (tip.isMapTask() && !tip.isSetupTask() &&
+ (tip.isMapTask() && !tip.isJobSetupTask() &&
job.desiredReduces() != 0)) {
// if the job is done, we don't want to change anything
if (job.getStatus().getRunState() == JobStatus.RUNNING ||
@@ -3146,7 +3169,10 @@
(tip.isMapTask() ?
TaskStatus.Phase.MAP :
TaskStatus.Phase.REDUCE),
- TaskStatus.State.KILLED, trackerName, myInstrumentation);
+ tip.isRunningTask(taskId) ?
+ TaskStatus.State.KILLED_UNCLEAN :
+ TaskStatus.State.KILLED,
+ trackerName, myInstrumentation);
jobsWithFailures.add(job);
}
} else {
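In the JobTracker's heartbeat assignment path shown above, the per-slot priority therefore becomes: job-cleanup, then task-cleanup, then job-setup, first for map slots and then for reduce slots. Roughly, per job (the real code runs one loop over all jobs for each step; isMapSlot stands for the true/false literal passed for map vs. reduce slots):

    Task t = job.obtainJobCleanupTask(taskTracker, numTaskTrackers,
                                      numUniqueHosts, isMapSlot);
    if (t == null) {
      t = job.obtainTaskCleanupTask(taskTracker, isMapSlot);
    }
    if (t == null) {
      t = job.obtainJobSetupTask(taskTracker, numTaskTrackers,
                                 numUniqueHosts, isMapSlot);
    }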
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JvmManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JvmManager.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JvmManager.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JvmManager.java Thu Feb 5 17:39:27 2009
@@ -43,8 +43,6 @@
JvmManagerForType reduceJvmManager;
- TaskTracker tracker;
-
public JvmEnv constructJvmEnv(List<String> setup, Vector<String>vargs,
File stdout,File stderr,long logSize, File workDir,
Map<String,String> env, String pidFile, JobConf conf) {
@@ -53,10 +51,9 @@
public JvmManager(TaskTracker tracker) {
mapJvmManager = new JvmManagerForType(tracker.getMaxCurrentMapTasks(),
- true, tracker);
+ true);
reduceJvmManager = new JvmManagerForType(tracker.getMaxCurrentReduceTasks(),
- false, tracker);
- this.tracker = tracker;
+ false);
}
public void stop() {
@@ -74,9 +71,9 @@
public void launchJvm(TaskRunner t, JvmEnv env) {
if (t.getTask().isMapTask()) {
- mapJvmManager.reapJvm(t, tracker, env);
+ mapJvmManager.reapJvm(t, env);
} else {
- reduceJvmManager.reapJvm(t, tracker, env);
+ reduceJvmManager.reapJvm(t, env);
}
}
@@ -125,12 +122,10 @@
boolean isMap;
Random rand = new Random(System.currentTimeMillis());
- TaskTracker tracker;
- public JvmManagerForType(int maxJvms, boolean isMap, TaskTracker tracker) {
+ public JvmManagerForType(int maxJvms, boolean isMap) {
this.maxJvms = maxJvms;
this.isMap = isMap;
- this.tracker = tracker;
}
synchronized public void setRunningTaskForJvm(JVMId jvmId,
@@ -194,7 +189,7 @@
jvmIdToRunner.remove(jvmId);
}
private synchronized void reapJvm(
- TaskRunner t, TaskTracker tracker, JvmEnv env) {
+ TaskRunner t, JvmEnv env) {
if (t.getTaskInProgress().wasKilled()) {
//the task was killed in-flight
//no need to do the rest of the operations
@@ -251,7 +246,7 @@
LOG.info("Killing JVM: " + runnerToKill.jvmId);
runnerToKill.kill();
}
- spawnNewJvm(jobId, env, tracker, t);
+ spawnNewJvm(jobId, env, t);
return;
}
//*MUST* never reach this
@@ -281,7 +276,7 @@
return details.toString();
}
- private void spawnNewJvm(JobID jobId, JvmEnv env, TaskTracker tracker,
+ private void spawnNewJvm(JobID jobId, JvmEnv env,
TaskRunner t) {
JvmRunner jvmRunner = new JvmRunner(env,jobId);
jvmIdToRunner.put(jvmRunner.jvmId, jvmRunner);
@@ -293,11 +288,6 @@
//tasks. Doing it this way also keeps code simple.
jvmRunner.setDaemon(true);
jvmRunner.setName("JVM Runner " + jvmRunner.jvmId + " spawned.");
- if (tracker.isTaskMemoryManagerEnabled()) {
- tracker.getTaskMemoryManager().addTask(
- TaskAttemptID.forName(env.conf.get("mapred.task.id")),
- tracker.getVirtualMemoryForTask(env.conf));
- }
setRunningTaskForJvm(jvmRunner.jvmId, t);
LOG.info(jvmRunner.getName());
jvmRunner.start();
@@ -355,6 +345,7 @@
LOG.info("JVM : " + jvmId +" exited. Number of tasks it ran: " +
numTasksRan);
try {
+ // In case of jvm-reuse,
//the task jvm cleans up the common workdir for every
//task at the beginning of each task in the task JVM.
//For the last task, we do it here.
@@ -362,12 +353,6 @@
FileUtil.fullyDelete(env.workDir);
}
} catch (IOException ie){}
- if (tracker.isTaskMemoryManagerEnabled()) {
- // Remove the associated pid-file, if any
- tracker.getTaskMemoryManager().
- removePidFile(TaskAttemptID.forName(
- env.conf.get("mapred.task.id")));
- }
}
}
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapOutputFile.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapOutputFile.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapOutputFile.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapOutputFile.java Thu Feb 5 17:39:27 2009
@@ -30,13 +30,13 @@
class MapOutputFile {
private JobConf conf;
- private String jobDir;
+ private JobID jobId;
MapOutputFile() {
}
MapOutputFile(JobID jobId) {
- this.jobDir = TaskTracker.getJobCacheSubdir() + Path.SEPARATOR + jobId;
+ this.jobId = jobId;
}
private LocalDirAllocator lDirAlloc =
@@ -47,9 +47,9 @@
*/
public Path getOutputFile(TaskAttemptID mapTaskId)
throws IOException {
- return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
- mapTaskId + Path.SEPARATOR +
- "output" + "/file.out", conf);
+ return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
+ jobId.toString(), mapTaskId.toString())
+ + "/file.out", conf);
}
/** Create a local map output file name.
@@ -58,9 +58,9 @@
*/
public Path getOutputFileForWrite(TaskAttemptID mapTaskId, long size)
throws IOException {
- return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
- mapTaskId + Path.SEPARATOR +
- "output" + "/file.out", size, conf);
+ return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
+ jobId.toString(), mapTaskId.toString())
+ + "/file.out", size, conf);
}
/** Return the path to a local map output index file created earlier
@@ -68,9 +68,9 @@
*/
public Path getOutputIndexFile(TaskAttemptID mapTaskId)
throws IOException {
- return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
- mapTaskId + Path.SEPARATOR +
- "output" + "/file.out.index", conf);
+ return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
+ jobId.toString(), mapTaskId.toString())
+ + "/file.out.index", conf);
}
/** Create a local map output index file name.
@@ -79,10 +79,10 @@
*/
public Path getOutputIndexFileForWrite(TaskAttemptID mapTaskId, long size)
throws IOException {
- return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
- mapTaskId + Path.SEPARATOR +
- "output" + "/file.out.index",
- size, conf);
+ return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
+ jobId.toString(), mapTaskId.toString())
+ + "/file.out.index",
+ size, conf);
}
/** Return a local map spill file created earlier.
@@ -91,10 +91,10 @@
*/
public Path getSpillFile(TaskAttemptID mapTaskId, int spillNumber)
throws IOException {
- return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
- mapTaskId + Path.SEPARATOR +
- "output" + "/spill"
- + spillNumber + ".out", conf);
+ return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
+ jobId.toString(), mapTaskId.toString())
+ + "/spill"
+ + spillNumber + ".out", conf);
}
/** Create a local map spill file name.
@@ -104,10 +104,10 @@
*/
public Path getSpillFileForWrite(TaskAttemptID mapTaskId, int spillNumber,
long size) throws IOException {
- return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
- mapTaskId + Path.SEPARATOR +
- "output" + "/spill" +
- spillNumber + ".out", size, conf);
+ return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
+ jobId.toString(), mapTaskId.toString())
+ + "/spill" +
+ spillNumber + ".out", size, conf);
}
/** Return a local map spill index file created earlier
@@ -116,10 +116,10 @@
*/
public Path getSpillIndexFile(TaskAttemptID mapTaskId, int spillNumber)
throws IOException {
- return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
- mapTaskId + Path.SEPARATOR +
- "output" + "/spill" +
- spillNumber + ".out.index", conf);
+ return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
+ jobId.toString(), mapTaskId.toString())
+ + "/spill" +
+ spillNumber + ".out.index", conf);
}
/** Create a local map spill index file name.
@@ -129,10 +129,10 @@
*/
public Path getSpillIndexFileForWrite(TaskAttemptID mapTaskId, int spillNumber,
long size) throws IOException {
- return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
- mapTaskId + Path.SEPARATOR +
- "output" + "/spill" + spillNumber +
- ".out.index", size, conf);
+ return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
+ jobId.toString(), mapTaskId.toString())
+ + "/spill" + spillNumber +
+ ".out.index", size, conf);
}
/** Return a local reduce input file created earlier
@@ -142,10 +142,10 @@
public Path getInputFile(int mapId, TaskAttemptID reduceTaskId)
throws IOException {
// TODO *oom* should use a format here
- return lDirAlloc.getLocalPathToRead(jobDir + Path.SEPARATOR +
- reduceTaskId + Path.SEPARATOR +
- "output" + "/map_" + mapId + ".out",
- conf);
+ return lDirAlloc.getLocalPathToRead(TaskTracker.getIntermediateOutputDir(
+ jobId.toString(), reduceTaskId.toString())
+ + "/map_" + mapId + ".out",
+ conf);
}
/** Create a local reduce input file name.
@@ -157,17 +157,17 @@
long size)
throws IOException {
// TODO *oom* should use a format here
- return lDirAlloc.getLocalPathForWrite(jobDir + Path.SEPARATOR +
- reduceTaskId + Path.SEPARATOR +
- ("output" + "/map_" + mapId.getId() +
- ".out"),
- size, conf);
+ return lDirAlloc.getLocalPathForWrite(TaskTracker.getIntermediateOutputDir(
+ jobId.toString(), reduceTaskId.toString())
+ + "/map_" + mapId.getId() + ".out",
+ size, conf);
}
/** Removes all of the files related to a task. */
public void removeAll(TaskAttemptID taskId) throws IOException {
- conf.deleteLocalFiles(jobDir + Path.SEPARATOR +
- taskId + Path.SEPARATOR + "output");
+ conf.deleteLocalFiles(TaskTracker.getIntermediateOutputDir(
+ jobId.toString(), taskId.toString())
+);
}
public void setConf(Configuration conf) {
@@ -179,7 +179,7 @@
}
public void setJobId(JobID jobId) {
- this.jobDir = TaskTracker.getJobCacheSubdir() + Path.SEPARATOR + jobId;
+ this.jobId = jobId;
}
}
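With this change every MapOutputFile path is derived from a single TaskTracker helper instead of a hand-built jobDir string, so the local intermediate-output layout is defined in one place. Typical usage, as in the hunks above:

    // The helper yields the per-attempt intermediate output directory, which
    // then feeds the LocalDirAllocator lookups:
    String dir = TaskTracker.getIntermediateOutputDir(jobId.toString(),
                                                      mapTaskId.toString());
    Path outFile = lDirAlloc.getLocalPathToRead(dir + "/file.out", conf);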
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/MapTask.java Thu Feb 5 17:39:27 2009
@@ -103,13 +103,15 @@
@Override
public void localizeConfiguration(JobConf conf) throws IOException {
super.localizeConfiguration(conf);
- Path localSplit = new Path(new Path(getJobFile()).getParent(),
- "split.dta");
- LOG.debug("Writing local split to " + localSplit);
- DataOutputStream out = FileSystem.getLocal(conf).create(localSplit);
- Text.writeString(out, splitClass);
- split.write(out);
- out.close();
+ if (isMapOrReduce()) {
+ Path localSplit = new Path(new Path(getJobFile()).getParent(),
+ "split.dta");
+ LOG.debug("Writing local split to " + localSplit);
+ DataOutputStream out = FileSystem.getLocal(conf).create(localSplit);
+ Text.writeString(out, splitClass);
+ split.write(out);
+ out.close();
+ }
}
@Override
@@ -121,16 +123,20 @@
@Override
public void write(DataOutput out) throws IOException {
super.write(out);
- Text.writeString(out, splitClass);
- split.write(out);
- split = null;
+ if (isMapOrReduce()) {
+ Text.writeString(out, splitClass);
+ split.write(out);
+ split = null;
+ }
}
@Override
public void readFields(DataInput in) throws IOException {
super.readFields(in);
- splitClass = Text.readString(in);
- split.readFields(in);
+ if (isMapOrReduce()) {
+ splitClass = Text.readString(in);
+ split.readFields(in);
+ }
}
/**
@@ -280,12 +286,16 @@
initialize(job, getJobID(), reporter, useNewApi);
// check if it is a cleanupJobTask
- if (cleanupJob) {
- runCleanup(umbilical, reporter);
+ if (jobCleanup) {
+ runJobCleanupTask(umbilical, reporter);
+ return;
+ }
+ if (jobSetup) {
+ runJobSetupTask(umbilical, reporter);
return;
}
- if (setupJob) {
- runSetupJob(umbilical, reporter);
+ if (taskCleanup) {
+ runTaskCleanupTask(umbilical, reporter);
return;
}
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/ReduceTask.java Thu Feb 5 17:39:27 2009
@@ -343,7 +343,7 @@
throws IOException, InterruptedException, ClassNotFoundException {
job.setBoolean("mapred.skip.on", isSkipping());
- if (!cleanupJob && !setupJob) {
+ if (isMapOrReduce()) {
copyPhase = getProgress().addPhase("copy");
sortPhase = getProgress().addPhase("sort");
reducePhase = getProgress().addPhase("reduce");
@@ -355,12 +355,16 @@
initialize(job, getJobID(), reporter, useNewApi);
// check if it is a cleanupJobTask
- if (cleanupJob) {
- runCleanup(umbilical, reporter);
+ if (jobCleanup) {
+ runJobCleanupTask(umbilical, reporter);
return;
}
- if (setupJob) {
- runSetupJob(umbilical, reporter);
+ if (jobSetup) {
+ runJobSetupTask(umbilical, reporter);
+ return;
+ }
+ if (taskCleanup) {
+ runTaskCleanupTask(umbilical, reporter);
return;
}
@@ -383,6 +387,7 @@
}
copyPhase.complete(); // copy is already complete
setPhase(TaskStatus.Phase.SORT);
+ statusUpdate(umbilical);
final FileSystem rfs = FileSystem.getLocal(job).getRaw();
RawKeyValueIterator rIter = isLocal
@@ -398,6 +403,7 @@
sortPhase.complete(); // sort is complete
setPhase(TaskStatus.Phase.REDUCE);
+ statusUpdate(umbilical);
Class keyClass = job.getMapOutputKeyClass();
Class valueClass = job.getMapOutputValueClass();
RawComparator comparator = job.getOutputValueGroupingComparator();
@@ -1243,10 +1249,10 @@
// else, we will check the localFS to find a suitable final location
// for this path
TaskAttemptID reduceId = reduceTask.getTaskID();
- Path filename = new Path("/" + TaskTracker.getJobCacheSubdir() +
- Path.SEPARATOR + getTaskID().getJobID() +
- Path.SEPARATOR + reduceId +
- Path.SEPARATOR + "output" + "/map_" +
+ Path filename = new Path("/" + TaskTracker.getIntermediateOutputDir(
+ reduceId.getJobID().toString(),
+ reduceId.toString())
+ + "/map_" +
loc.getTaskId().getId() + ".out");
// Copy the map output to a temp file whose name is unique to this attempt
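Besides the setup/cleanup dispatch, ReduceTask now reports each phase change immediately through the new statusUpdate(umbilical) helper instead of waiting for the next periodic progress ping:

    setPhase(TaskStatus.Phase.SORT);
    statusUpdate(umbilical);   // push the phase change to the TaskTracker now
    // ... and the same again when entering the REDUCE phase.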
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/Task.java Thu Feb 5 17:39:27 2009
@@ -110,8 +110,9 @@
private TaskAttemptID taskId; // unique, includes job id
private int partition; // id within job
TaskStatus taskStatus; // current status of the task
- protected boolean cleanupJob = false;
- protected boolean setupJob = false;
+ protected boolean jobCleanup = false;
+ protected boolean jobSetup = false;
+ protected boolean taskCleanup = false;
//skip ranges based on failed ranges from previous attempts
private SortedRanges skipRanges = new SortedRanges();
@@ -131,8 +132,8 @@
protected TaskAttemptContext taskContext;
protected org.apache.hadoop.mapreduce.OutputFormat<?,?> outputFormat;
protected org.apache.hadoop.mapreduce.OutputCommitter committer;
- private volatile boolean commitPending = false;
protected final Counters.Counter spilledRecordsCounter;
+ private String pidFile = "";
////////////////////////////////////////////
// Constructors
@@ -168,6 +169,12 @@
public String getJobFile() { return jobFile; }
public TaskAttemptID getTaskID() { return taskId; }
Counters getCounters() { return counters; }
+ public void setPidFile(String pidFile) {
+ this.pidFile = pidFile;
+ }
+ public String getPidFile() {
+ return pidFile;
+ }
/**
* Get the job name for this task.
@@ -244,15 +251,50 @@
}
/**
- * Sets whether the task is cleanup task
+ * Return current state of the task.
+ * needs to be synchronized as communication thread
+ * sends the state every second
+ * @return
+ */
+ synchronized TaskStatus.State getState(){
+ return this.taskStatus.getRunState();
+ }
+ /**
+ * Set current state of the task.
+ * @param state
*/
- public void setCleanupTask() {
- cleanupJob = true;
+ synchronized void setState(TaskStatus.State state){
+ this.taskStatus.setRunState(state);
}
- public void setSetupTask() {
- setupJob = true;
+ void setTaskCleanupTask() {
+ taskCleanup = true;
}
+
+ boolean isTaskCleanupTask() {
+ return taskCleanup;
+ }
+
+ boolean isJobCleanupTask() {
+ return jobCleanup;
+ }
+
+ boolean isJobSetupTask() {
+ return jobSetup;
+ }
+
+ void setJobSetupTask() {
+ jobSetup = true;
+ }
+
+ void setJobCleanupTask() {
+ jobCleanup = true;
+ }
+
+ boolean isMapOrReduce() {
+ return !jobSetup && !jobCleanup && !taskCleanup;
+ }
+
////////////////////////////////////////////
// Writable methods
////////////////////////////////////////////
@@ -264,10 +306,13 @@
taskStatus.write(out);
skipRanges.write(out);
out.writeBoolean(skipping);
- out.writeBoolean(cleanupJob);
- out.writeBoolean(setupJob);
+ out.writeBoolean(jobCleanup);
+ out.writeBoolean(jobSetup);
out.writeBoolean(writeSkipRecs);
+ out.writeBoolean(taskCleanup);
+ Text.writeString(out, pidFile);
}
+
public void readFields(DataInput in) throws IOException {
jobFile = Text.readString(in);
taskId = TaskAttemptID.read(in);
@@ -278,9 +323,14 @@
currentRecIndexIterator = skipRanges.skipRangeIterator();
currentRecStartIndex = currentRecIndexIterator.next();
skipping = in.readBoolean();
- cleanupJob = in.readBoolean();
- setupJob = in.readBoolean();
+ jobCleanup = in.readBoolean();
+ jobSetup = in.readBoolean();
writeSkipRecs = in.readBoolean();
+ taskCleanup = in.readBoolean();
+ if (taskCleanup) {
+ setPhase(TaskStatus.Phase.CLEANUP);
+ }
+ pidFile = Text.readString(in);
}
@Override
@@ -332,6 +382,9 @@
InterruptedException {
jobContext = new JobContext(job, id, reporter);
taskContext = new TaskAttemptContext(job, taskId, reporter);
+ if (getState() == TaskStatus.State.UNASSIGNED) {
+ setState(TaskStatus.State.RUNNING);
+ }
if (useNewApi) {
LOG.debug("using new api for output committer");
outputFormat =
@@ -461,17 +514,10 @@
if (sendProgress) {
// we need to send progress update
updateCounters();
- if (commitPending) {
- taskStatus.statusUpdate(TaskStatus.State.COMMIT_PENDING,
- taskProgress.get(),
- taskProgress.toString(),
- counters);
- } else {
- taskStatus.statusUpdate(TaskStatus.State.RUNNING,
- taskProgress.get(),
- taskProgress.toString(),
- counters);
- }
+ taskStatus.statusUpdate(getState(),
+ taskProgress.get(),
+ taskProgress.toString(),
+ counters);
taskFound = umbilical.statusUpdate(taskId, taskStatus);
taskStatus.clearStatus();
}
@@ -604,8 +650,7 @@
boolean commitRequired = committer.needsTaskCommit(taskContext);
if (commitRequired) {
int retries = MAX_RETRIES;
- taskStatus.setRunState(TaskStatus.State.COMMIT_PENDING);
- commitPending = true;
+ setState(TaskStatus.State.COMMIT_PENDING);
// say the task tracker that task is commit pending
while (true) {
try {
@@ -631,37 +676,21 @@
sendDone(umbilical);
}
- private void sendLastUpdate(TaskUmbilicalProtocol umbilical)
+ protected void statusUpdate(TaskUmbilicalProtocol umbilical)
throws IOException {
- //first wait for the COMMIT approval from the tasktracker
int retries = MAX_RETRIES;
while (true) {
try {
- // send a final status report
- if (commitPending) {
- taskStatus.statusUpdate(TaskStatus.State.COMMIT_PENDING,
- taskProgress.get(),
- taskProgress.toString(),
- counters);
- } else {
- taskStatus.statusUpdate(TaskStatus.State.RUNNING,
- taskProgress.get(),
- taskProgress.toString(),
- counters);
- }
-
- try {
- if (!umbilical.statusUpdate(getTaskID(), taskStatus)) {
- LOG.warn("Parent died. Exiting "+taskId);
- System.exit(66);
- }
- taskStatus.clearStatus();
- return;
- } catch (InterruptedException ie) {
- Thread.currentThread().interrupt(); // interrupt ourself
+ if (!umbilical.statusUpdate(getTaskID(), taskStatus)) {
+ LOG.warn("Parent died. Exiting "+taskId);
+ System.exit(66);
}
+ taskStatus.clearStatus();
+ return;
+ } catch (InterruptedException ie) {
+ Thread.currentThread().interrupt(); // interrupt ourself
} catch (IOException ie) {
- LOG.warn("Failure sending last status update: " +
+ LOG.warn("Failure sending status update: " +
StringUtils.stringifyException(ie));
if (--retries == 0) {
throw ie;
@@ -669,6 +698,16 @@
}
}
}
+
+ private void sendLastUpdate(TaskUmbilicalProtocol umbilical)
+ throws IOException {
+ // send a final status report
+ taskStatus.statusUpdate(getState(),
+ taskProgress.get(),
+ taskProgress.toString(),
+ counters);
+ statusUpdate(umbilical);
+ }
private void sendDone(TaskUmbilicalProtocol umbilical) throws IOException {
int retries = MAX_RETRIES;
@@ -735,18 +774,37 @@
}
}
- protected void runCleanup(TaskUmbilicalProtocol umbilical,
- TaskReporter reporter
- ) throws IOException, InterruptedException {
+ protected void runTaskCleanupTask(TaskUmbilicalProtocol umbilical,
+ TaskReporter reporter)
+ throws IOException, InterruptedException {
+ taskCleanup(umbilical);
+ done(umbilical, reporter);
+ }
+
+ void taskCleanup(TaskUmbilicalProtocol umbilical)
+ throws IOException {
+ // set phase for this task
+ setPhase(TaskStatus.Phase.CLEANUP);
+ getProgress().setStatus("cleanup");
+ statusUpdate(umbilical);
+ LOG.info("Runnning cleanup for the task");
+ // do the cleanup
+ discardOutput(taskContext);
+ }
+
+ protected void runJobCleanupTask(TaskUmbilicalProtocol umbilical,
+ TaskReporter reporter
+ ) throws IOException, InterruptedException {
// set phase for this task
setPhase(TaskStatus.Phase.CLEANUP);
getProgress().setStatus("cleanup");
+ statusUpdate(umbilical);
// do the cleanup
committer.cleanupJob(jobContext);
done(umbilical, reporter);
}
- protected void runSetupJob(TaskUmbilicalProtocol umbilical,
+ protected void runJobSetupTask(TaskUmbilicalProtocol umbilical,
TaskReporter reporter
) throws IOException, InterruptedException {
// do the setup
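Note that write()/readFields() now also carry the taskCleanup flag and the pid-file path; this is the Task wire-format change that the InterTrackerProtocol versionID bump to 24 (earlier in this commit) accounts for. The two new fields are appended after the existing ones, in matching order on both sides:

    out.writeBoolean(taskCleanup);     // new in this patch
    Text.writeString(out, pidFile);    // new in this patch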
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Thu Feb 5 17:39:27 2009
@@ -82,8 +82,8 @@
private long maxSkipRecords = 0;
private FailedRanges failedRanges = new FailedRanges();
private volatile boolean skipping = false;
- private boolean cleanup = false;
- private boolean setup = false;
+ private boolean jobCleanup = false;
+ private boolean jobSetup = false;
// The 'next' usable taskid of this tip
int nextTaskId = 0;
@@ -103,8 +103,10 @@
private TreeMap<TaskAttemptID,TaskStatus> taskStatuses =
new TreeMap<TaskAttemptID,TaskStatus>();
- // Map from taskId -> Task
- private Map<TaskAttemptID, Task> tasks = new TreeMap<TaskAttemptID, Task>();
+ // Map from taskId -> TaskTracker Id,
+ // contains cleanup attempts and where they ran, if any
+ private TreeMap<TaskAttemptID, String> cleanupTasks =
+ new TreeMap<TaskAttemptID, String>();
private TreeSet<String> machinesWhereFailed = new TreeSet<String>();
private TreeSet<TaskAttemptID> tasksReportedClosed = new TreeSet<TaskAttemptID>();
@@ -174,20 +176,20 @@
return partition;
}
- public boolean isCleanupTask() {
- return cleanup;
+ public boolean isJobCleanupTask() {
+ return jobCleanup;
}
- public void setCleanupTask() {
- cleanup = true;
+ public void setJobCleanupTask() {
+ jobCleanup = true;
}
- public boolean isSetupTask() {
- return setup;
+ public boolean isJobSetupTask() {
+ return jobSetup;
}
- public void setSetupTask() {
- setup = true;
+ public void setJobSetupTask() {
+ jobSetup = true;
}
public boolean isOnlyCommitPending() {
@@ -275,15 +277,6 @@
}
/**
- * Return the Task object associated with a taskId
- * @param taskId
- * @return
- */
- public Task getTask(TaskAttemptID taskId) {
- return tasks.get(taskId);
- }
-
- /**
* Is the Task associated with taskid is the first attempt of the tip?
* @param taskId
* @return Returns true if the Task is the first attempt of the tip
@@ -392,7 +385,8 @@
tasksReportedClosed.add(taskid);
close = true;
} else if (isComplete() &&
- !(isMapTask() && !setup && !cleanup && isComplete(taskid)) &&
+ !(isMapTask() && !jobSetup &&
+ !jobCleanup && isComplete(taskid)) &&
!tasksReportedClosed.contains(taskid)) {
tasksReportedClosed.add(taskid);
close = true;
@@ -516,6 +510,8 @@
// @see {@link TaskTracker.transmitHeartbeat()}
if ((newState != TaskStatus.State.RUNNING &&
newState != TaskStatus.State.COMMIT_PENDING &&
+ newState != TaskStatus.State.FAILED_UNCLEAN &&
+ newState != TaskStatus.State.KILLED_UNCLEAN &&
newState != TaskStatus.State.UNASSIGNED) &&
(oldState == newState)) {
LOG.warn("Recieved duplicate status update of '" + newState +
@@ -531,6 +527,8 @@
newState == TaskStatus.State.UNASSIGNED) &&
(oldState == TaskStatus.State.FAILED ||
oldState == TaskStatus.State.KILLED ||
+ oldState == TaskStatus.State.FAILED_UNCLEAN ||
+ oldState == TaskStatus.State.KILLED_UNCLEAN ||
oldState == TaskStatus.State.SUCCEEDED ||
oldState == TaskStatus.State.COMMIT_PENDING)) {
return false;
@@ -538,8 +536,17 @@
changed = oldState != newState;
}
-
- taskStatuses.put(taskid, status);
+ // if task is a cleanup attempt, do not replace the complete status,
+ // update only specific fields.
+ // For example, startTime should not be updated,
+ // but finishTime has to be updated.
+ if (!isCleanupAttempt(taskid)) {
+ taskStatuses.put(taskid, status);
+ } else {
+ taskStatuses.get(taskid).statusUpdate(status.getRunState(),
+ status.getProgress(), status.getStateString(), status.getPhase(),
+ status.getFinishTime());
+ }
// Recompute progress
recomputeProgress();
@@ -551,29 +558,38 @@
* has failed.
*/
public void incompleteSubTask(TaskAttemptID taskid,
- TaskTrackerStatus ttStatus,
JobStatus jobStatus) {
//
// Note the failure and its location
//
- String trackerName = ttStatus.getTrackerName();
- String trackerHostName = ttStatus.getHost();
-
TaskStatus status = taskStatuses.get(taskid);
+ String trackerName;
+ String trackerHostName = null;
TaskStatus.State taskState = TaskStatus.State.FAILED;
if (status != null) {
+ trackerName = status.getTaskTracker();
+ trackerHostName =
+ JobInProgress.convertTrackerNameToHostName(trackerName);
// Check if the user manually KILLED/FAILED this task-attempt...
Boolean shouldFail = tasksToKill.remove(taskid);
if (shouldFail != null) {
- taskState = (shouldFail) ? TaskStatus.State.FAILED :
- TaskStatus.State.KILLED;
+ if (isCleanupAttempt(taskid)) {
+ taskState = (shouldFail) ? TaskStatus.State.FAILED :
+ TaskStatus.State.KILLED;
+ } else {
+ taskState = (shouldFail) ? TaskStatus.State.FAILED_UNCLEAN :
+ TaskStatus.State.KILLED_UNCLEAN;
+
+ }
status.setRunState(taskState);
addDiagnosticInfo(taskid, "Task has been " + taskState + " by the user" );
}
taskState = status.getRunState();
if (taskState != TaskStatus.State.FAILED &&
- taskState != TaskStatus.State.KILLED) {
+ taskState != TaskStatus.State.KILLED &&
+ taskState != TaskStatus.State.FAILED_UNCLEAN &&
+ taskState != TaskStatus.State.KILLED_UNCLEAN) {
LOG.info("Task '" + taskid + "' running on '" + trackerName +
"' in state: '" + taskState + "' being failed!");
status.setRunState(TaskStatus.State.FAILED);
@@ -594,7 +610,7 @@
// should note this failure only for completed maps, only if this taskid;
// completed this map. however if the job is done, there is no need to
// manipulate completed maps
- if (this.isMapTask() && !setup && !cleanup && isComplete(taskid) &&
+ if (this.isMapTask() && !jobSetup && !jobCleanup && isComplete(taskid) &&
jobStatus.getRunState() != JobStatus.SUCCEEDED) {
this.completes--;
@@ -614,7 +630,7 @@
skipping = startSkipping();
}
- } else {
+ } else if (taskState == TaskStatus.State.KILLED) {
numKilledTasks++;
}
}
@@ -741,6 +757,7 @@
TaskStatus st = taskStatuses.get(taskId);
if(st != null && (st.getRunState() == TaskStatus.State.RUNNING
|| st.getRunState() == TaskStatus.State.COMMIT_PENDING ||
+ st.inTaskCleanupPhase() ||
st.getRunState() == TaskStatus.State.UNASSIGNED)
&& tasksToKill.put(taskId, shouldFail) == null ) {
String logStr = "Request received to " + (shouldFail ? "fail" : "kill")
@@ -865,11 +882,17 @@
return addRunningTask(taskid, taskTracker);
}
+ public Task addRunningTask(TaskAttemptID taskid, String taskTracker) {
+ return addRunningTask(taskid, taskTracker, false);
+ }
+
/**
* Adds a previously running task to this tip. This is used in case of
* jobtracker restarts.
*/
- public Task addRunningTask(TaskAttemptID taskid, String taskTracker) {
+ public Task addRunningTask(TaskAttemptID taskid,
+ String taskTracker,
+ boolean taskCleanup) {
// create the task
Task t = null;
if (isMapTask()) {
@@ -880,11 +903,17 @@
} else {
t = new ReduceTask(jobFile, taskid, partition, numMaps);
}
- if (cleanup) {
- t.setCleanupTask();
+ if (jobCleanup) {
+ t.setJobCleanupTask();
+ }
+ if (jobSetup) {
+ t.setJobSetupTask();
}
- if (setup) {
- t.setSetupTask();
+ if (taskCleanup) {
+ t.setTaskCleanupTask();
+ t.setState(taskStatuses.get(taskid).getRunState());
+ cleanupTasks.put(taskid, taskTracker);
+ jobtracker.removeTaskEntry(taskid);
}
t.setConf(conf);
LOG.debug("Launching task with skipRanges:"+failedRanges.getSkipRanges());
@@ -893,7 +922,6 @@
if(failedRanges.isTestAttempt()) {
t.setWriteSkipRecs(false);
}
- tasks.put(taskid, t);
activeTasks.put(taskid, taskTracker);
@@ -901,6 +929,23 @@
jobtracker.createTaskEntry(taskid, taskTracker, this);
return t;
}
+
+ boolean isRunningTask(TaskAttemptID taskid) {
+ TaskStatus status = taskStatuses.get(taskid);
+ return status != null && status.getRunState() == TaskStatus.State.RUNNING;
+ }
+
+ boolean isCleanupAttempt(TaskAttemptID taskid) {
+ return cleanupTasks.containsKey(taskid);
+ }
+
+ String machineWhereCleanupRan(TaskAttemptID taskid) {
+ return cleanupTasks.get(taskid);
+ }
+
+ String machineWhereTaskRan(TaskAttemptID taskid) {
+ return taskStatuses.get(taskid).getTaskTracker();
+ }
/**
* Has this task already failed on this machine?
@@ -987,7 +1032,7 @@
}
public long getMapInputSize() {
- if(isMapTask() && !setup && !cleanup) {
+ if(isMapTask() && !jobSetup && !jobCleanup) {
return rawSplit.getDataLength();
} else {
return 0;
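
The TaskInProgress hunks above introduce the FAILED_UNCLEAN/KILLED_UNCLEAN states and the rule that a task-cleanup attempt's status report must not replace the stored status wholesale: startTime is kept, while run state, progress, state string, phase and finishTime are copied. The following is a minimal, self-contained sketch of that merge rule using simplified stand-in classes, not the actual TaskInProgress/TaskStatus code:

// Simplified sketch (not the real Hadoop classes) of the "update, don't
// replace" rule applied to task-cleanup attempts in statusUpdate() above.
import java.util.HashMap;
import java.util.Map;

public class CleanupStatusMergeSketch {
  enum State { RUNNING, FAILED_UNCLEAN, KILLED_UNCLEAN, SUCCEEDED, FAILED }

  static class AttemptStatus {
    State state; float progress; long startTime; long finishTime;
    AttemptStatus(State s, float p, long start, long finish) {
      state = s; progress = p; startTime = start; finishTime = finish;
    }
  }

  private final Map<String, AttemptStatus> statuses = new HashMap<>();
  private final Map<String, String> cleanupAttempts = new HashMap<>();

  void update(String attemptId, AttemptStatus fresh) {
    if (!cleanupAttempts.containsKey(attemptId)) {
      statuses.put(attemptId, fresh);   // normal attempt: replace the whole status
    } else {
      AttemptStatus old = statuses.get(attemptId);
      old.state = fresh.state;          // cleanup attempt: keep startTime,
      old.progress = fresh.progress;    // copy only the volatile fields
      old.finishTime = fresh.finishTime;
    }
  }

  public static void main(String[] args) {
    CleanupStatusMergeSketch tip = new CleanupStatusMergeSketch();
    tip.statuses.put("attempt_0", new AttemptStatus(State.FAILED_UNCLEAN, 0.4f, 100L, 0L));
    tip.cleanupAttempts.put("attempt_0", "tracker_1");
    tip.update("attempt_0", new AttemptStatus(State.FAILED, 1.0f, 999L, 500L));
    AttemptStatus merged = tip.statuses.get("attempt_0");
    System.out.println(merged.startTime + " " + merged.state); // 100 FAILED: startTime preserved
  }
}
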
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskLog.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskLog.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskLog.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskLog.java Thu Feb 5 17:39:27 2009
@@ -80,7 +80,14 @@
private static LogFileDetail getTaskLogFileDetail(TaskAttemptID taskid,
LogName filter) throws IOException {
- File indexFile = new File(getBaseDir(taskid.toString()), "log.index");
+ return getLogFileDetail(taskid, filter, false);
+ }
+
+ private static LogFileDetail getLogFileDetail(TaskAttemptID taskid,
+ LogName filter,
+ boolean isCleanup)
+ throws IOException {
+ File indexFile = getIndexFile(taskid.toString(), isCleanup);
BufferedReader fis = new BufferedReader(new java.io.FileReader(indexFile));
//the format of the index file is
//LOG_DIR: <the dir where the task logs are really stored>
@@ -120,8 +127,17 @@
}
public static File getIndexFile(String taskid) {
- return new File(getBaseDir(taskid), "log.index");
+ return getIndexFile(taskid, false);
+ }
+
+ public static File getIndexFile(String taskid, boolean isCleanup) {
+ if (isCleanup) {
+ return new File(getBaseDir(taskid), "log.index.cleanup");
+ } else {
+ return new File(getBaseDir(taskid), "log.index");
+ }
}
+
private static File getBaseDir(String taskid) {
return new File(LOG_DIR, taskid);
}
@@ -129,9 +145,10 @@
private static long prevErrLength;
private static long prevLogLength;
- private static void writeToIndexFile(TaskAttemptID firstTaskid)
+ private static void writeToIndexFile(TaskAttemptID firstTaskid,
+ boolean isCleanup)
throws IOException {
- File indexFile = getIndexFile(currentTaskid.toString());
+ File indexFile = getIndexFile(currentTaskid.toString(), isCleanup);
BufferedOutputStream bos =
new BufferedOutputStream(new FileOutputStream(indexFile,false));
DataOutputStream dos = new DataOutputStream(bos);
@@ -159,8 +176,17 @@
prevLogLength = getTaskLogFile(firstTaskid, LogName.SYSLOG).length();
}
private volatile static TaskAttemptID currentTaskid = null;
+
+ public synchronized static void syncLogs(TaskAttemptID firstTaskid,
+ TaskAttemptID taskid)
+ throws IOException {
+ syncLogs(firstTaskid, taskid, false);
+ }
+
@SuppressWarnings("unchecked")
- public synchronized static void syncLogs(TaskAttemptID firstTaskid, TaskAttemptID taskid)
+ public synchronized static void syncLogs(TaskAttemptID firstTaskid,
+ TaskAttemptID taskid,
+ boolean isCleanup)
throws IOException {
System.out.flush();
System.err.flush();
@@ -179,10 +205,9 @@
currentTaskid = taskid;
resetPrevLengths(firstTaskid);
}
- writeToIndexFile(firstTaskid);
+ writeToIndexFile(firstTaskid, isCleanup);
}
-
/**
* The filter for userlogs.
*/
@@ -249,6 +274,12 @@
static class Reader extends InputStream {
private long bytesRemaining;
private FileInputStream file;
+
+ public Reader(TaskAttemptID taskid, LogName kind,
+ long start, long end) throws IOException {
+ this(taskid, kind, start, end, false);
+ }
+
/**
* Read a log file from start to end positions. The offsets may be negative,
* in which case they are relative to the end of the file. For example,
@@ -258,12 +289,13 @@
* @param kind the kind of log to read
* @param start the offset to read from (negative is relative to tail)
* @param end the offset to read upto (negative is relative to tail)
+ * @param isCleanup whether the attempt is a cleanup attempt or not
* @throws IOException
*/
public Reader(TaskAttemptID taskid, LogName kind,
- long start, long end) throws IOException {
+ long start, long end, boolean isCleanup) throws IOException {
// find the right log file
- LogFileDetail fileDetail = getTaskLogFileDetail(taskid, kind);
+ LogFileDetail fileDetail = getLogFileDetail(taskid, kind, isCleanup);
// calculate the start and stop
long size = fileDetail.length;
if (start < 0) {
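
The TaskLog changes above give a cleanup attempt its own index file, so it does not overwrite the offsets recorded by the original attempt under "log.index". A minimal sketch of the naming convention, with a placeholder directory in place of TaskLog's real LOG_DIR constant:

// Minimal sketch of the index-file naming convention introduced above.
// LOG_DIR here is a placeholder path, not the actual TaskLog.LOG_DIR.
import java.io.File;

public class IndexFileNamingSketch {
  private static final File LOG_DIR = new File("/tmp/userlogs");  // placeholder

  static File indexFile(String taskid, boolean isCleanup) {
    File base = new File(LOG_DIR, taskid);
    return new File(base, isCleanup ? "log.index.cleanup" : "log.index");
  }

  public static void main(String[] args) {
    System.out.println(indexFile("attempt_200902051739_0001_m_000000_0", false));
    System.out.println(indexFile("attempt_200902051739_0001_m_000000_0", true));
  }
}
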
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskLogServlet.java Thu Feb 5 17:39:27 2009
@@ -104,7 +104,8 @@
private void printTaskLog(HttpServletResponse response,
OutputStream out, TaskAttemptID taskId,
long start, long end, boolean plainText,
- TaskLog.LogName filter) throws IOException {
+ TaskLog.LogName filter, boolean isCleanup)
+ throws IOException {
if (!plainText) {
out.write(("<br><b><u>" + filter + " logs</u></b><br>\n" +
"<pre>\n").getBytes());
@@ -112,7 +113,7 @@
try {
InputStream taskLogReader =
- new TaskLog.Reader(taskId, filter, start, end);
+ new TaskLog.Reader(taskId, filter, start, end, isCleanup);
byte[] b = new byte[65536];
int result;
while (true) {
@@ -159,6 +160,7 @@
long end = -1;
boolean plainText = false;
TaskLog.LogName filter = null;
+ boolean isCleanup = false;
String taskIdStr = request.getParameter("taskid");
if (taskIdStr == null) {
@@ -193,7 +195,12 @@
if (sPlainText != null) {
plainText = Boolean.valueOf(sPlainText);
}
-
+
+ String sCleanup = request.getParameter("cleanup");
+ if (sCleanup != null) {
+ isCleanup = Boolean.valueOf(sCleanup);
+ }
+
OutputStream out = response.getOutputStream();
if( !plainText ) {
out.write(("<html>\n" +
@@ -203,21 +210,22 @@
if (filter == null) {
printTaskLog(response, out, taskId, start, end, plainText,
- TaskLog.LogName.STDOUT);
+ TaskLog.LogName.STDOUT, isCleanup);
printTaskLog(response, out, taskId, start, end, plainText,
- TaskLog.LogName.STDERR);
+ TaskLog.LogName.STDERR, isCleanup);
printTaskLog(response, out, taskId, start, end, plainText,
- TaskLog.LogName.SYSLOG);
+ TaskLog.LogName.SYSLOG, isCleanup);
if (haveTaskLog(taskId, TaskLog.LogName.DEBUGOUT)) {
printTaskLog(response, out, taskId, start, end, plainText,
- TaskLog.LogName.DEBUGOUT);
+ TaskLog.LogName.DEBUGOUT, isCleanup);
}
if (haveTaskLog(taskId, TaskLog.LogName.PROFILE)) {
printTaskLog(response, out, taskId, start, end, plainText,
- TaskLog.LogName.PROFILE);
+ TaskLog.LogName.PROFILE, isCleanup);
}
} else {
- printTaskLog(response, out, taskId, start, end, plainText, filter);
+ printTaskLog(response, out, taskId, start, end, plainText, filter,
+ isCleanup);
}
out.write("</body></html>\n".getBytes());
@@ -226,7 +234,8 @@
response.sendError(HttpServletResponse.SC_BAD_REQUEST,
"You must supply a value for `filter' (STDOUT, STDERR, or SYSLOG) if you set plainText = true");
} else {
- printTaskLog(response, out, taskId, start, end, plainText, filter);
+ printTaskLog(response, out, taskId, start, end, plainText, filter,
+ isCleanup);
}
}
}
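
With the TaskLogServlet change above, a client can ask for the logs of the task-cleanup attempt by adding cleanup=true to the query string. A hypothetical client-side illustration follows; the host, port, /tasklog path and attempt id are assumptions made for illustration, and only the taskid, filter and cleanup parameters visible in this diff are used:

// Hypothetical client for the cleanup=true query parameter parsed above.
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URL;

public class FetchCleanupLogs {
  public static void main(String[] args) throws Exception {
    String attempt = "attempt_200902051739_0001_m_000000_0";   // made-up id
    // Assumed tasktracker host/port and servlet path, for illustration only.
    URL url = new URL("http://tt-host:50060/tasklog?taskid=" + attempt
        + "&filter=SYSLOG&cleanup=true");
    try (BufferedReader in =
             new BufferedReader(new InputStreamReader(url.openStream()))) {
      String line;
      while ((line = in.readLine()) != null) {
        System.out.println(line);
      }
    }
  }
}
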
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskMemoryManagerThread.java Thu Feb 5 17:39:27 2009
@@ -18,7 +18,7 @@
package org.apache.hadoop.mapred;
-import java.io.IOException;
+import java.io.File;
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;
@@ -27,9 +27,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.LocalDirAllocator;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.TaskTracker;
import org.apache.hadoop.mapred.TaskTracker.TaskInProgress;
import org.apache.hadoop.util.ProcfsBasedProcessTree;
@@ -71,11 +68,11 @@
ProcfsBasedProcessTree.DEFAULT_SLEEPTIME_BEFORE_SIGKILL);
}
- public void addTask(TaskAttemptID tid, long memLimit) {
+ public void addTask(TaskAttemptID tid, long memLimit, String pidFile) {
synchronized (tasksToBeAdded) {
LOG.debug("Tracking ProcessTree " + tid + " for the first time");
ProcessTreeInfo ptInfo = new ProcessTreeInfo(tid, null, null, memLimit,
- sleepTimeBeforeSigKill);
+ sleepTimeBeforeSigKill, pidFile);
tasksToBeAdded.put(tid, ptInfo);
}
}
@@ -91,9 +88,11 @@
private String pid;
private ProcfsBasedProcessTree pTree;
private long memLimit;
+ private String pidFile;
public ProcessTreeInfo(TaskAttemptID tid, String pid,
- ProcfsBasedProcessTree pTree, long memLimit, long sleepTimeBeforeSigKill) {
+ ProcfsBasedProcessTree pTree, long memLimit,
+ long sleepTimeBeforeSigKill, String pidFile) {
this.tid = tid;
this.pid = pid;
this.pTree = pTree;
@@ -101,6 +100,7 @@
this.pTree.setSigKillInterval(sleepTimeBeforeSigKill);
}
this.memLimit = memLimit;
+ this.pidFile = pidFile;
}
public TaskAttemptID getTID() {
@@ -171,7 +171,8 @@
// Initialize any uninitialized processTrees
if (pId == null) {
- pId = getPid(tid); // get pid from pid-file
+ // get pid from pid-file
+ pId = getPid(ptInfo.pidFile);
if (pId != null) {
// PID will be null, either if the pid file is yet to be created
// or if the tip is finished and we removed pidFile, but the TIP
@@ -309,47 +310,13 @@
/**
* Load pid of the task from the pidFile.
*
- * @param tipID
+ * @param pidFileName
* @return the pid of the task process.
*/
- private String getPid(TaskAttemptID tipID) {
- Path pidFileName = getPidFilePath(tipID, taskTracker.getJobConf());
- if (pidFileName == null) {
- return null;
- }
- return ProcfsBasedProcessTree.getPidFromPidFile(pidFileName.toString());
- }
-
- private static LocalDirAllocator lDirAlloc =
- new LocalDirAllocator("mapred.local.dir");
-
- /**
- * Get the pidFile path of a Task
- * @param tipID
- * @return pidFile's Path
- */
- public static Path getPidFilePath(TaskAttemptID tipID, JobConf conf) {
- Path pidFileName = null;
- try {
- //this actually need not use a localdirAllocator since the PID
- //files are really small..
- pidFileName = lDirAlloc.getLocalPathToRead(
- (TaskTracker.getPidFilesSubdir() + Path.SEPARATOR + tipID),
- conf);
- } catch (IOException i) {
- // PID file is not there
- LOG.debug("Failed to get pidFile name for " + tipID);
- }
- return pidFileName;
- }
- public void removePidFile(TaskAttemptID tid) {
- if (taskTracker.isTaskMemoryManagerEnabled()) {
- Path pidFilePath = getPidFilePath(tid, taskTracker.getJobConf());
- if (pidFilePath != null) {
- try {
- FileSystem.getLocal(taskTracker.getJobConf()).delete(pidFilePath, false);
- } catch(IOException ie) {}
- }
- }
+ private String getPid(String pidFileName) {
+ if ((new File(pidFileName)).exists()) {
+ return ProcfsBasedProcessTree.getPidFromPidFile(pidFileName);
+ }
+ return null;
}
}
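
getPid() above now takes the caller-supplied pid-file path directly and delegates parsing to ProcfsBasedProcessTree.getPidFromPidFile(). Below is a standalone sketch of the same behaviour (read the pid if the file exists, otherwise return null); it is not the ProcfsBasedProcessTree helper itself:

// Standalone sketch of reading a task pid from its pid file.
import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;

public class PidFileSketch {
  static String getPid(String pidFileName) {
    File pidFile = new File(pidFileName);
    if (!pidFile.exists()) {
      return null;                       // task JVM has not written its pid yet
    }
    try (BufferedReader in = new BufferedReader(new FileReader(pidFile))) {
      String line = in.readLine();
      return (line == null) ? null : line.trim();
    } catch (IOException e) {
      return null;                       // treat an unreadable file like a missing one
    }
  }

  public static void main(String[] args) {
    System.out.println(getPid("/tmp/pidfile-that-does-not-exist"));  // prints null
  }
}
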
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskRunner.java?rev=741203&r1=741202&r2=741203&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/TaskRunner.java Thu Feb 5 17:39:27 2009
@@ -113,9 +113,10 @@
new Path(conf.getJar()).getParent().toString());
}
File workDir = new File(lDirAlloc.getLocalPathToRead(
- TaskTracker.getJobCacheSubdir()
- + Path.SEPARATOR + t.getJobID()
- + Path.SEPARATOR + t.getTaskID()
+ TaskTracker.getLocalTaskDir(
+ t.getJobID().toString(),
+ t.getTaskID().toString(),
+ t.isTaskCleanupTask())
+ Path.SEPARATOR + MRConstants.WORKDIR,
conf). toString());
@@ -374,12 +375,12 @@
vargs.add(Integer.toString(address.getPort()));
vargs.add(taskid.toString()); // pass task identifier
- String pidFile = null;
- if (tracker.isTaskMemoryManagerEnabled()) {
- pidFile = lDirAlloc.getLocalPathForWrite(
- (TaskTracker.getPidFilesSubdir() + Path.SEPARATOR + taskid),
+ String pidFile = lDirAlloc.getLocalPathForWrite(
+ (TaskTracker.getPidFile(t.getJobID().toString(),
+ taskid.toString(), t.isTaskCleanupTask())),
this.conf).toString();
- }
+ t.setPidFile(pidFile);
+ tracker.addToMemoryManager(t.getTaskID(), conf, pidFile);
// set memory limit using ulimit if feasible and necessary ...
String[] ulimitCmd = Shell.getUlimitMemoryCommand(conf);
@@ -458,7 +459,7 @@
}catch(IOException ie){
LOG.warn("Error releasing caches : Cache files might not have been cleaned up");
}
- tracker.reportTaskFinished(t.getTaskID(), false);
+ tip.reportTaskFinished();
}
}
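
The TaskRunner hunk above resolves the working directory through TaskTracker.getLocalTaskDir(jobId, taskId, isTaskCleanupTask), so a task-cleanup attempt gets a directory of its own instead of reusing the failed attempt's one. getLocalTaskDir's body is not part of this diff; the jobcache layout and the ".cleanup" suffix in the sketch below are assumptions made for illustration only:

// Hedged sketch of the per-attempt local directory layout assumed above.
public class LocalTaskDirSketch {
  private static final String SEP = "/";

  static String localTaskDir(String jobId, String attemptId, boolean taskCleanup) {
    String dir = "taskTracker" + SEP + "jobcache" + SEP + jobId + SEP + attemptId;
    return taskCleanup ? dir + ".cleanup" : dir;   // suffix is an assumption
  }

  public static void main(String[] args) {
    String job = "job_200902051739_0001";                       // made-up ids
    String attempt = "attempt_200902051739_0001_m_000000_0";
    System.out.println(localTaskDir(job, attempt, false) + "/work");
    System.out.println(localTaskDir(job, attempt, true) + "/work");
  }
}
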