You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:50:07 UTC
svn commit: r1077189 - in
/hadoop/common/branches/branch-0.20-security-patches/src:
contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/
contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
contrib/fairscheduler/src/java/org/apache/ha...
Author: omalley
Date: Fri Mar 4 03:50:07 2011
New Revision: 1077189
URL: http://svn.apache.org/viewvc?rev=1077189&view=rev
Log:
commit b6cfbae5bac6c8d50eb3085781adee81b11c5fa6
Author: Hemanth Yamijala <yh...@yahoo-inc.com>
Date: Mon Feb 22 17:11:21 2010 +0530
MAPREDUCE:1316 from https://issues.apache.org/jira/secure/attachment/12436563/mapreduce-1316-y20s.patch
+++ b/YAHOO-CHANGES.txt
+ MAPREDUCE-1316. Fixes a memory leak of TaskInProgress instances in
+ the jobtracker. (Amar Kamat via yhemanth)
+
Modified:
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobRetire.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLostTracker.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java
hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java
hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobdetails.jsp
hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobfailures.jsp
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=1077189&r1=1077188&r2=1077189&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Fri Mar 4 03:50:07 2011
@@ -832,7 +832,7 @@ class CapacityTaskScheduler extends Task
//Check if job supports speculative map execution first then
//check if job has speculative maps.
return (job.getJobConf().getMapSpeculativeExecution())&& (
- hasSpeculativeTask(job.getMapTasks(),
+ hasSpeculativeTask(job.getTasks(TaskType.MAP),
job.getStatus().mapProgress(), tts));
}
@@ -895,7 +895,7 @@ class CapacityTaskScheduler extends Task
//check if the job supports reduce speculative execution first then
//check if the job has speculative tasks.
return (job.getJobConf().getReduceSpeculativeExecution()) && (
- hasSpeculativeTask(job.getReduceTasks(),
+ hasSpeculativeTask(job.getTasks(TaskType.REDUCE),
job.getStatus().reduceProgress(), tts));
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1077189&r1=1077188&r2=1077189&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri Mar 4 03:50:07 2011
@@ -172,7 +172,6 @@ public class TestCapacityScheduler exten
super(jId, jobConf, jt);
this.taskTrackerManager = taskTrackerManager;
this.startTime = System.currentTimeMillis();
- this.status = new JobStatus(jId, 0f, 0f, JobStatus.PREP);
this.status.setJobPriority(JobPriority.NORMAL);
this.status.setStartTime(startTime);
if (null == jobConf.getQueueName()) {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=1077189&r1=1077188&r2=1077189&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Fri Mar 4 03:50:07 2011
@@ -405,7 +405,8 @@ public class FairScheduler extends TaskS
int totalMaps = job.numMapTasks;
int finishedMaps = 0;
int runningMaps = 0;
- for (TaskInProgress tip: job.getMapTasks()) {
+ for (TaskInProgress tip :
+ job.getTasks(org.apache.hadoop.mapreduce.TaskType.MAP)) {
if (tip.isComplete()) {
finishedMaps += 1;
} else if (tip.isRunning()) {
@@ -419,7 +420,8 @@ public class FairScheduler extends TaskS
int totalReduces = job.numReduceTasks;
int finishedReduces = 0;
int runningReduces = 0;
- for (TaskInProgress tip: job.getReduceTasks()) {
+ for (TaskInProgress tip :
+ job.getTasks(org.apache.hadoop.mapreduce.TaskType.REDUCE)) {
if (tip.isComplete()) {
finishedReduces += 1;
} else if (tip.isRunning()) {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=1077189&r1=1077188&r2=1077189&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Fri Mar 4 03:50:07 2011
@@ -504,7 +504,8 @@ public class TestFairScheduler extends T
// Finish up the tasks and advance time again. Note that we must finish
// the task since FakeJobInProgress does not properly maintain running
// tasks, so the scheduler will always get an empty task list from
- // the JobInProgress's getMapTasks/getReduceTasks and think they finished.
+ // the JobInProgress's getTasks(TaskType.MAP)/getTasks(TaskType.REDUCE) and
+ // think they finished.
taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0");
taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0");
taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000003_0");
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077189&r1=1077188&r2=1077189&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar 4 03:50:07 2011
@@ -297,6 +297,8 @@ public class JobInProgress {
this.runningReduces = new LinkedHashSet<TaskInProgress>();
this.resourceEstimator = new ResourceEstimator(this);
this.status = new JobStatus(jobid, 0.0f, 0.0f, JobStatus.PREP);
+ this.profile = new JobProfile(conf.getUser(), jobid, "", "",
+ conf.getJobName(), conf.getQueueName());
this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
(numMapTasks + numReduceTasks + 10);
try {
@@ -665,6 +667,10 @@ public class JobInProgress {
tasksInited.set(true);
JobHistory.JobInfo.logInited(profile.getJobID(), this.launchTime,
numMapTasks, numReduceTasks);
+
+ // Log the number of map and reduce tasks
+ LOG.info("Job " + jobId + " initialized successfully with " + numMapTasks
+ + " map tasks and " + numReduceTasks + " reduce tasks.");
}
TaskSplitMetaInfo[] createSplits(org.apache.hadoop.mapreduce.JobID jobId)
@@ -770,39 +776,46 @@ public class JobInProgress {
return launchedSetup;
}
- /**
- * Get the list of map tasks
- * @return the raw array of maps for this job
- */
- TaskInProgress[] getMapTasks() {
- return maps;
- }
+ /**
+ * Get all the tasks of the desired type in this job.
+ * @param type {@link TaskType} of the tasks required
+ * @return An array of {@link TaskInProgress} matching the given type.
+ * Returns an empty array if no tasks are found for the given type.
+ */
+ TaskInProgress[] getTasks(TaskType type) {
+ TaskInProgress[] tasks = null;
+ switch (type) {
+ case MAP:
+ {
+ tasks = maps;
+ }
+ break;
+ case REDUCE:
+ {
+ tasks = reduces;
+ }
+ break;
+ case JOB_SETUP:
+ {
+ tasks = setup;
+ }
+ break;
+ case JOB_CLEANUP:
+ {
+ tasks = cleanup;
+ }
+ break;
+ default:
+ {
+ tasks = new TaskInProgress[0];
+ }
+ break;
+ }
- /**
- * Get the list of cleanup tasks
- * @return the array of cleanup tasks for the job
- */
- TaskInProgress[] getCleanupTasks() {
- return cleanup;
- }
-
- /**
- * Get the list of setup tasks
- * @return the array of setup tasks for the job
- */
- TaskInProgress[] getSetupTasks() {
- return setup;
+ return tasks;
}
/**
- * Get the list of reduce tasks
- * @return the raw array of reduce tasks for this job
- */
- TaskInProgress[] getReduceTasks() {
- return reduces;
- }
-
- /**
* Return the nonLocalRunningMaps
* @return
*/
@@ -3109,11 +3122,11 @@ public class JobInProgress {
"submitTime" + EQUALS + job.getStartTime() + StringUtils.COMMA +
"launchTime" + EQUALS + job.getLaunchTime() + StringUtils.COMMA +
"finishTime" + EQUALS + job.getFinishTime() + StringUtils.COMMA +
- "numMaps" + EQUALS + job.getMapTasks().length +
+ "numMaps" + EQUALS + job.getTasks(TaskType.MAP).length +
StringUtils.COMMA +
"numSlotsPerMap" + EQUALS + job.getNumSlotsPerMap() +
StringUtils.COMMA +
- "numReduces" + EQUALS + job.getReduceTasks().length +
+ "numReduces" + EQUALS + job.getTasks(TaskType.REDUCE).length +
StringUtils.COMMA +
"numSlotsPerReduce" + EQUALS + job.getNumSlotsPerReduce() +
StringUtils.COMMA +
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077189&r1=1077188&r2=1077189&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar 4 03:50:07 2011
@@ -2446,8 +2446,7 @@ public class JobTracker implements MRCon
// and TaskInProgress
///////////////////////////////////////////////////////
void createTaskEntry(TaskAttemptID taskid, String taskTracker, TaskInProgress tip) {
- LOG.info("Adding task " +
- (tip.isCleanupAttempt(taskid) ? "(cleanup)" : "") +
+ LOG.info("Adding task (" + tip.getAttemptType(taskid) + ") " +
"'" + taskid + "' to tip " +
tip.getTIPId() + ", for tracker '" + taskTracker + "'");
@@ -2480,9 +2479,9 @@ public class JobTracker implements MRCon
}
// taskid --> TIP
- taskidToTIPMap.remove(taskid);
-
- LOG.debug("Removing task '" + taskid + "'");
+ if (taskidToTIPMap.remove(taskid) != null) {
+ LOG.info("Removing task '" + taskid + "'");
+ }
}
/**
@@ -2511,7 +2510,7 @@ public class JobTracker implements MRCon
* @param job the completed job
*/
void markCompletedJob(JobInProgress job) {
- for (TaskInProgress tip : job.getSetupTasks()) {
+ for (TaskInProgress tip : job.getTasks(TaskType.JOB_SETUP)) {
for (TaskStatus taskStatus : tip.getTaskStatuses()) {
if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
@@ -2521,7 +2520,7 @@ public class JobTracker implements MRCon
}
}
}
- for (TaskInProgress tip : job.getMapTasks()) {
+ for (TaskInProgress tip : job.getTasks(TaskType.MAP)) {
for (TaskStatus taskStatus : tip.getTaskStatuses()) {
if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
@@ -2533,7 +2532,7 @@ public class JobTracker implements MRCon
}
}
}
- for (TaskInProgress tip : job.getReduceTasks()) {
+ for (TaskInProgress tip : job.getTasks(TaskType.REDUCE)) {
for (TaskStatus taskStatus : tip.getTaskStatuses()) {
if (taskStatus.getRunState() != TaskStatus.State.RUNNING &&
taskStatus.getRunState() != TaskStatus.State.COMMIT_PENDING &&
@@ -2561,8 +2560,10 @@ public class JobTracker implements MRCon
if (markedTaskSet != null) {
for (TaskAttemptID taskid : markedTaskSet) {
removeTaskEntry(taskid);
- LOG.info("Removed completed task '" + taskid + "' from '" +
- taskTracker + "'");
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Removed marked completed task '" + taskid + "' from '" +
+ taskTracker + "'");
+ }
}
// Clear
trackerToMarkedTasksMap.remove(taskTracker);
@@ -2580,15 +2581,16 @@ public class JobTracker implements MRCon
*
* @param job the job about to be 'retired'
*/
- synchronized private void removeJobTasks(JobInProgress job) {
- for (TaskInProgress tip : job.getMapTasks()) {
- for (TaskStatus taskStatus : tip.getTaskStatuses()) {
- removeTaskEntry(taskStatus.getTaskID());
- }
- }
- for (TaskInProgress tip : job.getReduceTasks()) {
- for (TaskStatus taskStatus : tip.getTaskStatuses()) {
- removeTaskEntry(taskStatus.getTaskID());
+ synchronized void removeJobTasks(JobInProgress job) {
+ // iterate over all the task types
+ for (TaskType type : TaskType.values()) {
+ // iterate over all the tips of the type under consideration
+ for (TaskInProgress tip : job.getTasks(type)) {
+ // iterate over all the task-ids in the tip under consideration
+ for (TaskAttemptID id : tip.getAllTaskAttemptIDs()) {
+ // remove the task-id entry from the jobtracker
+ removeTaskEntry(id);
+ }
}
}
}
@@ -3697,6 +3699,9 @@ public class JobTracker implements MRCon
}
}
myInstrumentation.submitJob(job.getJobConf(), jobId);
+ LOG.info("Job " + jobId + " added successfully for user '"
+ + job.getJobConf().getUser() + "' to queue '"
+ + job.getJobConf().getQueueName() + "'");
return job.getStatus();
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java?rev=1077189&r1=1077188&r2=1077189&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java Fri Mar 4 03:50:07 2011
@@ -21,6 +21,7 @@ package org.apache.hadoop.mapred;
import java.io.IOException;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
@@ -32,6 +33,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.mapred.SortedRanges.Range;
import org.apache.hadoop.mapreduce.split.JobSplit.TaskSplitMetaInfo;
+import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.net.Node;
@@ -291,6 +293,32 @@ class TaskInProgress {
}
/**
+ * Returns the type of the {@link TaskAttemptID} passed.
+ * The type of an attempt is determined by the nature of the task and not its
+ * id.
+ * For example,
+ * - Attempt 'attempt_123_01_m_01_0' might be a job-setup task even though it
+ * has a _m_ in its id. Hence the task type of this attempt is JOB_SETUP
+ * instead of MAP.
+ * - Similarly reduce attempt 'attempt_123_01_r_01_0' might have failed and is
+ * now supposed to do the task-level cleanup. In such a case this attempt
+ * will be of type TASK_CLEANUP instead of REDUCE.
+ */
+ TaskType getAttemptType (TaskAttemptID id) {
+ if (isCleanupAttempt(id)) {
+ return TaskType.TASK_CLEANUP;
+ } else if (isJobSetupTask()) {
+ return TaskType.JOB_SETUP;
+ } else if (isJobCleanupTask()) {
+ return TaskType.JOB_CLEANUP;
+ } else if (isMapTask()) {
+ return TaskType.MAP;
+ } else {
+ return TaskType.REDUCE;
+ }
+ }
+
+ /**
* Is the Task associated with taskid is the first attempt of the tip?
* @param taskId
* @return Returns true if the Task is the first attempt of the tip
@@ -763,6 +791,13 @@ class TaskInProgress {
}
/**
+ * Get all the {@link TaskAttemptID}s in this {@link TaskInProgress}
+ */
+ TaskAttemptID[] getAllTaskAttemptIDs() {
+ return tasks.toArray(new TaskAttemptID[tasks.size()]);
+ }
+
+ /**
* Get the status of the specified task
* @param taskid
* @return
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java?rev=1077189&r1=1077188&r2=1077189&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskTracker.java Fri Mar 4 03:50:07 2011
@@ -1466,7 +1466,7 @@ public class TaskTracker
* @return false if the tracker was unknown
* @throws IOException
*/
- private HeartbeatResponse transmitHeartBeat(long now) throws IOException {
+ HeartbeatResponse transmitHeartBeat(long now) throws IOException {
// Send Counters in the status once every COUNTER_UPDATE_INTERVAL
boolean sendCounters;
if (now > (previousUpdate + COUNTER_UPDATE_INTERVAL)) {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java?rev=1077189&r1=1077188&r2=1077189&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/MiniMRCluster.java Fri Mar 4 03:50:07 2011
@@ -188,7 +188,7 @@ public class MiniMRCluster {
try {
tt = ugi.doAs(new PrivilegedExceptionAction<TaskTracker>() {
public TaskTracker run() throws InterruptedException, IOException {
- return new TaskTracker(conf);
+ return createTaskTracker(conf);
}
});
isInitialized = true;
@@ -198,7 +198,14 @@ public class MiniMRCluster {
LOG.error("task tracker " + trackerId + " crashed", e);
}
}
-
+
+ /**
+ * Creates a default {@link TaskTracker} using the conf passed.
+ */
+ TaskTracker createTaskTracker(JobConf conf) throws InterruptedException, IOException {
+ return new TaskTracker(conf);
+ }
+
/**
* Create and run the task tracker.
*/
@@ -679,6 +686,13 @@ public class MiniMRCluster {
TaskTrackerRunner taskTracker;
taskTracker = new TaskTrackerRunner(idx, numDir, host, conf);
+ addTaskTracker(taskTracker);
+ }
+
+ /**
+ * Add a tasktracker to the Mini-MR cluster.
+ */
+ void addTaskTracker(TaskTrackerRunner taskTracker) {
Thread taskTrackerThread = new Thread(taskTracker);
taskTrackerList.add(taskTracker);
taskTrackerThreadList.add(taskTrackerThread);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java?rev=1077189&r1=1077188&r2=1077189&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobHistory.java Fri Mar 4 03:50:07 2011
@@ -38,6 +38,7 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.permission.FsPermission;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.JobHistory.*;
+import org.apache.hadoop.mapreduce.TaskType;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.security.UserGroupInformation;
@@ -604,7 +605,7 @@ public class TestJobHistory extends Test
TaskID mapTaskId = new TaskID(job.getID(), true, 0);
TaskID reduceTaskId = new TaskID(job.getID(), false, 0);
- TaskInProgress cleanups[] = jip.getCleanupTasks();
+ TaskInProgress cleanups[] = jip.getTasks(TaskType.JOB_CLEANUP);
TaskID cleanupTaskId;
if (cleanups[0].isComplete()) {
cleanupTaskId = cleanups[0].getTIPId();
@@ -613,7 +614,7 @@ public class TestJobHistory extends Test
cleanupTaskId = cleanups[1].getTIPId();
}
- TaskInProgress setups[] = jip.getSetupTasks();
+ TaskInProgress setups[] = jip.getTasks(TaskType.JOB_SETUP);
TaskID setupTaskId;
if (setups[0].isComplete()) {
setupTaskId = setups[0].getTIPId();
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java?rev=1077189&r1=1077188&r2=1077189&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobQueueTaskScheduler.java Fri Mar 4 03:50:07 2011
@@ -50,7 +50,6 @@ public class TestJobQueueTaskScheduler e
super(new JobID("test", ++jobCounter), jobConf, jt);
this.taskTrackerManager = taskTrackerManager;
this.startTime = System.currentTimeMillis();
- this.status = new JobStatus(getJobID(), 0f, 0f, JobStatus.PREP);
this.status.setJobPriority(JobPriority.NORMAL);
this.status.setStartTime(startTime);
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobRetire.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobRetire.java?rev=1077189&r1=1077188&r2=1077189&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobRetire.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobRetire.java Fri Mar 4 03:50:07 2011
@@ -25,27 +25,42 @@ import java.net.URL;
import junit.framework.TestCase;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobTracker.RetireJobInfo;
+import org.apache.hadoop.mapred.MiniMRCluster.TaskTrackerRunner;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.mapreduce.split.JobSplit;
/**
* Test if the job retire works fine.
*/
public class TestJobRetire extends TestCase {
+ static final Log LOG = LogFactory.getLog(TestJobRetire.class);
static final Path testDir =
new Path(System.getProperty("test.build.data","/tmp"),
"job-expiry-testing");
+ private MiniMRCluster startCluster(JobConf conf, int numTrackers)
+ throws IOException {
+ conf.setLong("mapred.job.tracker.retiredjobs.cache.size", 1);
+ conf.setLong("mapred.jobtracker.retirejob.interval", 0);
+ conf.setLong("mapred.jobtracker.retirejob.check", 0);
+ conf.getLong("mapred.jobtracker.completeuserjobs.maximum", 0);
+
+ return new MiniMRCluster(0, 0, numTrackers, "file:///", 1, null, null, null,
+ conf, 0);
+ }
+
public void testJobRetire() throws Exception {
MiniMRCluster mr = null;
try {
JobConf conf = new JobConf();
-
- conf.setLong("mapred.job.tracker.retiredjobs.cache.size", 1);
- conf.setLong("mapred.jobtracker.retirejob.interval", 0);
- conf.setLong("mapred.jobtracker.retirejob.check", 0);
- conf.getLong("mapred.jobtracker.completeuserjobs.maximum", 0);
- mr = new MiniMRCluster(0, 0, 1, "file:///", 1, null, null, null, conf, 0);
+ mr = startCluster(conf, 1);
+
JobConf jobConf = mr.createJobConf();
JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
@@ -63,6 +78,7 @@ public class TestJobRetire extends TestC
1, jobtracker.getAllJobs().length);
} finally {
if (mr != null) { mr.shutdown();}
+ FileUtil.fullyDelete(new File(testDir.toString()));
}
}
@@ -74,13 +90,8 @@ public class TestJobRetire extends TestC
assertTrue(rj.isSuccessful());
JobID id = rj.getID();
- JobInProgress job = jobtracker.getJob(id);
//wait for job to get retired
- for (int i = 0; i < 10 && job != null; i++) {
- UtilsForTests.waitFor(1000);
- job = jobtracker.getJob(id);
- }
- assertNull("Job did not retire", job);
+ waitTillRetire(id, jobtracker);
RetireJobInfo retired = jobtracker.retireJobs.get(id);
assertTrue("History url not set", retired.getHistoryFile() != null &&
retired.getHistoryFile().length() > 0);
@@ -107,5 +118,242 @@ public class TestJobRetire extends TestC
return id;
}
+
+ // wait till the job retires
+ private void waitTillRetire(JobID id, JobTracker jobtracker) {
+ //wait for job to get retired
+ JobInProgress job = jobtracker.getJob(id);
+ for (int i = 0; i < 10 && job != null; i++) {
+ UtilsForTests.waitFor(1000);
+ job = jobtracker.getJob(id);
+ }
+ assertNull("Job did not retire", job);
+ }
+
+ /**
+ * Custom TaskTracker which waits forever after a successful contact to
+ * the JobTracker.
+ */
+ class WaitingTaskTracker extends TaskTracker {
+
+ WaitingTaskTracker(JobConf conf) throws InterruptedException, IOException {
+ super(conf);
+ }
+
+ @Override
+ HeartbeatResponse transmitHeartBeat(long now) throws IOException {
+ HeartbeatResponse response = super.transmitHeartBeat(now);
+ LOG.info("WaitingTaskTracker waiting");
+ // wait forever
+ UtilsForTests.waitFor(Long.MAX_VALUE);
+ throw new IOException ("WaitingTaskTracker interrupted. Bailing out");
+ }
+ }
+
+ /**
+ * Test job retire with tasks that report their *first* status only after the
+ * job retires.
+ * Steps :
+ * - Start a mini-mr cluster with 1 task-tracker having only map slots.
+ * Note that this task-tracker will take care of setup/cleanup and the map
+ * tasks.
+ * - Submit a job with 1 map task and 1 reduce task
+ * - Wait for the job to finish the map task
+ * - Start a 2nd tracker that waits for a long time after contacting the JT.
+ * - Wait for the 2nd tracker to get stuck
+ * - Kill the job
+ * - Wait for the job to retire
+ * - Check if the tip mappings are cleaned up.
+ */
+ public void testJobRetireWithUnreportedTasks() throws Exception {
+ MiniMRCluster mr = null;
+ try {
+ JobConf conf = new JobConf();
+ // set the num-map-slots to 1 so that no reduce tasks but setup/cleanup
+ // can run on it
+ conf.setInt("mapred.tasktracker.map.tasks.maximum", 1);
+ conf.setInt("mapred.tasktracker.reduce.tasks.maximum", 0);
+
+ mr = startCluster(conf, 1);
+ JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
+
+ RunningJob job =
+ UtilsForTests.runJob(mr.createJobConf(), new Path(testDir, "in-1"),
+ new Path(testDir, "out-1"), 1, 1);
+ JobID id = JobID.downgrade(job.getID());
+ JobInProgress jip = jobtracker.getJob(id);
+
+ // wait 100 secs for the job to complete its map task
+ for (int i = 0; i < 1000 && jip.finishedMaps() < 1; i++) {
+ UtilsForTests.waitFor(100);
+ }
+ assertEquals(jip.finishedMaps(), 1);
+
+ // start a tracker that will wait
+ LOG.info("Adding a waiting tracker");
+ TaskTrackerRunner testTrackerRunner =
+ mr.new TaskTrackerRunner(1, 1, null, mr.createJobConf()) {
+ @Override
+ TaskTracker createTaskTracker(JobConf conf) throws InterruptedException, IOException {
+ return new WaitingTaskTracker(conf);
+ }
+ };
+ mr.addTaskTracker(testTrackerRunner);
+ LOG.info("Waiting tracker added");
+
+ WaitingTaskTracker testTT =
+ (WaitingTaskTracker)testTrackerRunner.getTaskTracker();
+
+ // wait 100 secs for the newly started task-tracker to join
+ for (int i = 0; i < 1000 && jobtracker.taskTrackers().size() < 2; i++) {
+ UtilsForTests.waitFor(100);
+ }
+ assertEquals(jobtracker.taskTrackers().size(), 2);
+ LOG.info("Cluster is now up with 2 trackers");
+ // stop the test-tt as its no longer required
+ mr.stopTaskTracker(mr.getTaskTrackerID(testTT.getName()));
+
+ // 1 reduce task should be scheduled
+ assertEquals("TestTT contacted but no reduce task scheduled on it",
+ 1, jip.runningReduces());
+
+ // kill the job
+ LOG.info("Killing job " + id);
+ job.killJob();
+
+ // check if the reduce task attempt status is missing
+ TaskInProgress tip = jip.getTasks(TaskType.REDUCE)[0];
+ assertNull(tip.getTaskStatus(tip.getAllTaskAttemptIDs()[0]));
+
+ // wait for the job to retire
+ waitTillRetire(id, jobtracker);
+
+ // check the taskidToTIPMap
+ for (TaskAttemptID tid : jobtracker.taskidToTIPMap.keySet()) {
+ LOG.info("TaskidToTIP : " + tid);
+ }
+ assertEquals("'taskid' to TIP mapping still exists",
+ 0, jobtracker.taskidToTIPMap.size());
+ } finally {
+ if (mr != null) { mr.shutdown();}
+ // cleanup
+ FileUtil.fullyDelete(new File(testDir.toString()));
+ }
+ }
+ /**
+ * (Mock)Test JobTracker.removeJobTasks() which is called only when the job
+ * retires.
+ */
+ public void testJobRemoval() throws Exception {
+ MiniMRCluster mr = null;
+ try {
+ JobConf conf = new JobConf();
+ mr = startCluster(conf, 0);
+ JobTracker jobtracker = mr.getJobTrackerRunner().getJobTracker();
+
+ // test map task removal
+ testRemoveJobTasks(jobtracker, conf, TaskType.MAP);
+ // test reduce task removal
+ testRemoveJobTasks(jobtracker, conf, TaskType.REDUCE);
+ // test job setup removal
+ testRemoveJobTasks(jobtracker, conf, TaskType.JOB_SETUP);
+ // test job cleanup removal
+ testRemoveJobTasks(jobtracker, conf, TaskType.JOB_CLEANUP);
+ } finally {
+ if (mr != null) { mr.shutdown();}
+ // cleanup
+ FileUtil.fullyDelete(new File(testDir.toString()));
+ }
+ }
+
+ // create a new job and add it to the jobtracker
+ private JobInProgress createAndAddJob(JobTracker jobtracker, JobConf conf) {
+ // submit a job in a fake manner
+ // get the new job-id
+ JobID id =
+ new JobID(jobtracker.getTrackerIdentifier(), jobtracker.jobs.size() + 1);
+ // create a JobInProgress for this fake job
+ JobInProgress jip = new JobInProgress(id, conf, jobtracker);
+
+ // insert this fake completed job in the jobtracker
+ jobtracker.jobs.put(id, jip);
+
+ return jip;
+ }
+
+ // create a new TaskInProgress and make it running by adding it to jobtracker
+ private TaskInProgress createAndAddTIP(JobTracker jobtracker,
+ JobInProgress jip, TaskType type) {
+ JobConf conf = jip.getJobConf();
+ JobID id = jip.getJobID();
+ // now create a fake tip for this fake job
+ TaskInProgress tip = null;
+ if (type == TaskType.MAP) {
+ tip = new TaskInProgress(id, "dummy", JobSplit.EMPTY_TASK_SPLIT,
+ jobtracker, conf, jip, 0, 1);
+ jip.maps = new TaskInProgress[] {tip};
+ } else if (type == TaskType.REDUCE) {
+ tip = new TaskInProgress(id, "dummy", jip.desiredMaps(), 0,
+ jobtracker, conf, jip, 1);
+ jip.reduces = new TaskInProgress[] {tip};
+ } else if (type == TaskType.JOB_SETUP) {
+ tip =
+ new TaskInProgress(id, "dummy", JobSplit.EMPTY_TASK_SPLIT,
+ jobtracker, conf, jip, 0, 1);
+ jip.setup = new TaskInProgress[] {tip};
+ } else if (type == TaskType.JOB_CLEANUP) {
+ tip =
+ new TaskInProgress(id, "dummy", JobSplit.EMPTY_TASK_SPLIT,
+ jobtracker, conf, jip, 0, 1);
+ jip.cleanup = new TaskInProgress[] {tip};
+ }
+ return tip;
+ }
+
+ // create a new Task for the given tip and make it running
+ private TaskAttemptID createAndAddAttempt(TaskInProgress tip, int attemptId) {
+ // create a fake attempt for this fake task
+ TaskAttemptID taskid = new TaskAttemptID(tip.getTIPId(), attemptId);
+
+ // insert this fake task into the jobtracker by making it running
+ tip.addRunningTask(taskid, "test-tt");
+
+ return taskid;
+ }
+
+ // Mock a job run such that the jobtracker is in a state similar to that
+ // resulting from an actual job run.
+ // Steps :
+ // - generate a new job-id
+ // - create and add a JobInProgress object using the fake job-id
+ // - create and add a fake tip of the passed type 't' under the fake job
+ // Note that t can be a MAP or a REDUCE or a JOB_SETUP or a JOB_CLEANUP.
+ // - create and add a fake attempt under the fake tip
+ // - remove the job from the jobtracker
+ // - check if the fake attempt is removed from the jobtracker
+ private void testRemoveJobTasks(JobTracker jobtracker, JobConf conf,
+ TaskType type) {
+ // create and submit a job
+ JobInProgress jip = createAndAddJob(jobtracker, conf);
+ // create and add a tip
+ TaskInProgress tip = createAndAddTIP(jobtracker, jip, type);
+ // create and add an attempt
+ TaskAttemptID taskid = createAndAddAttempt(tip, 0);
+
+ // this fake attempt should not have any status
+ assertNull(tip.getTaskStatus(taskid));
+
+ // remove the job tasks for this fake job from the jobtracker
+ jobtracker.removeJobTasks(jip);
+
+ // check the taskidToTIPMap
+ for (TaskAttemptID tid : jobtracker.taskidToTIPMap.keySet()) {
+ LOG.info("TaskidToTIP : " + tid);
+ }
+
+ // check if the fake attempt is removed from the jobtracker
+ assertEquals("'taskid' to TIP mapping still exists",
+ 0, jobtracker.taskidToTIPMap.size());
+ }
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLostTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLostTracker.java?rev=1077189&r1=1077188&r2=1077189&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLostTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestLostTracker.java Fri Mar 4 03:50:07 2011
@@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.mapreduce.TaskType;
import junit.framework.TestCase;
import java.io.*;
@@ -96,8 +97,8 @@ public class TestLostTracker extends Tes
// check if the task statuses for the tasks are sane
JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
- for (TaskInProgress taskInProgress : jt.getJob(id).getMapTasks()) {
- testTaskStatuses(taskInProgress.getTaskStatuses());
+ for (TaskInProgress mtip : jt.getJob(id).getTasks(TaskType.MAP)) {
+ testTaskStatuses(mtip.getTaskStatuses());
}
// validate the history file
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java?rev=1077189&r1=1077188&r2=1077189&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestParallelInitialization.java Fri Mar 4 03:50:07 2011
@@ -48,7 +48,6 @@ public class TestParallelInitialization
super(new JobID("test", ++jobCounter), jobConf,
jt);
this.startTime = System.currentTimeMillis();
- this.status = new JobStatus(getJobID(), 0f, 0f, JobStatus.PREP);
this.status.setJobPriority(JobPriority.NORMAL);
this.status.setStartTime(startTime);
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java?rev=1077189&r1=1077188&r2=1077189&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestSetupAndCleanupFailure.java Fri Mar 4 03:50:07 2011
@@ -28,6 +28,7 @@ import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.mapred.lib.IdentityMapper;
import org.apache.hadoop.mapred.lib.IdentityReducer;
+import org.apache.hadoop.mapreduce.TaskType;
/**
* Tests various failures in setup/cleanup of job, like
@@ -184,7 +185,8 @@ public class TestSetupAndCleanupFailure
JobTracker jt = mr.getJobTrackerRunner().getJobTracker();
JobInProgress jip = jt.getJob(job.getID());
// get the running setup task id
- TaskAttemptID setupID = getRunningTaskID(jip.getSetupTasks());
+ TaskAttemptID setupID =
+ getRunningTaskID(jip.getTasks(TaskType.JOB_SETUP));
if (commandLineKill) {
killTaskFromCommandLine(job, setupID, jt);
} else {
@@ -201,7 +203,8 @@ public class TestSetupAndCleanupFailure
} catch (InterruptedException ie) {}
}
// get the running cleanup task id
- TaskAttemptID cleanupID = getRunningTaskID(jip.getCleanupTasks());
+ TaskAttemptID cleanupID =
+ getRunningTaskID(jip.getTasks(TaskType.JOB_CLEANUP));
if (commandLineKill) {
killTaskFromCommandLine(job, cleanupID, jt);
} else {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobdetails.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobdetails.jsp?rev=1077189&r1=1077188&r2=1077189&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobdetails.jsp (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobdetails.jsp Fri Mar 4 03:50:07 2011
@@ -7,6 +7,7 @@
import="java.util.*"
import="java.text.DecimalFormat"
import="org.apache.hadoop.mapred.*"
+ import="org.apache.hadoop.mapreduce.TaskType"
import="org.apache.hadoop.util.*"
%>
@@ -216,7 +217,8 @@
out.print("<b>Job File:</b> <a href=\"jobconf.jsp?jobid=" + jobId + "\">"
+ profile.getJobFile() + "</a><br>\n");
out.print("<b>Job Setup:</b>");
- printJobLevelTaskSummary(out, jobId, "setup", job.getSetupTasks());
+ printJobLevelTaskSummary(out, jobId, "setup",
+ job.getTasks(TaskType.JOB_SETUP));
out.print("<br>\n");
if (runState == JobStatus.RUNNING) {
out.print("<b>Status:</b> Running<br>\n");
@@ -248,7 +250,8 @@
}
}
out.print("<b>Job Cleanup:</b>");
- printJobLevelTaskSummary(out, jobId, "cleanup", job.getCleanupTasks());
+ printJobLevelTaskSummary(out, jobId, "cleanup",
+ job.getTasks(TaskType.JOB_CLEANUP));
out.print("<br>\n");
if (flakyTaskTrackers > 0) {
out.print("<b>Black-listed TaskTrackers:</b> " +
@@ -267,9 +270,9 @@
"<th><a href=\"jobfailures.jsp?jobid=" + jobId +
"\">Failed/Killed<br>Task Attempts</a></th></tr>\n");
printTaskSummary(out, jobId, "map", status.mapProgress(),
- job.getMapTasks());
+ job.getTasks(TaskType.MAP));
printTaskSummary(out, jobId, "reduce", status.reduceProgress(),
- job.getReduceTasks());
+ job.getTasks(TaskType.REDUCE));
out.print("</table>\n");
%>
@@ -344,7 +347,7 @@ if("off".equals(session.getAttribute("ma
style="width:100%" type="image/svg+xml" pluginspage="http://www.adobe.com/svg/viewer/install/" />
<%}%>
-<%if(job.getReduceTasks().length > 0) { %>
+<%if(job.getTasks(TaskType.REDUCE).length > 0) { %>
<hr>Reduce Completion Graph -
<%if("off".equals(session.getAttribute("reduce.graph"))) { %>
<a href="/jobdetails.jsp?jobid=<%=jobId%>&refresh=<%=refresh%>&reduce.graph=on" > open </a>
Modified: hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobfailures.jsp
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobfailures.jsp?rev=1077189&r1=1077188&r2=1077189&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobfailures.jsp (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/webapps/job/jobfailures.jsp Fri Mar 4 03:50:07 2011
@@ -5,6 +5,7 @@
import="java.io.*"
import="java.util.*"
import="org.apache.hadoop.mapred.*"
+ import="org.apache.hadoop.mapreduce.TaskType"
import="org.apache.hadoop.util.*"
%>
@@ -125,13 +126,13 @@
out.print("<tr><th>Attempt</th><th>Task</th><th>Machine</th><th>State</th>" +
"<th>Error</th><th>Logs</th></tr>\n");
if (includeMap) {
- TaskInProgress[] tips = job.getMapTasks();
+ TaskInProgress[] tips = job.getTasks(TaskType.MAP);
for(int i=0; i < tips.length; ++i) {
printFailedAttempts(out, tracker, jobId, tips[i], state);
}
}
if (includeReduce) {
- TaskInProgress[] tips = job.getReduceTasks();
+ TaskInProgress[] tips = job.getTasks(TaskType.REDUCE);
for(int i=0; i < tips.length; ++i) {
printFailedAttempts(out, tracker, jobId, tips[i], state);
}