You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/01/08 20:24:10 UTC
svn commit: r494158 - in /lucene/hadoop/trunk: ./
src/java/org/apache/hadoop/mapred/
Author: cutting
Date: Mon Jan 8 11:24:08 2007
New Revision: 494158
URL: http://svn.apache.org/viewvc?view=rev&rev=494158
Log:
HADOOP-815. Fix memory leaks in JobTracker. Contributed by Arun.
Modified:
lucene/hadoop/trunk/CHANGES.txt
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java
Modified: lucene/hadoop/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/CHANGES.txt?view=diff&rev=494158&r1=494157&r2=494158
==============================================================================
--- lucene/hadoop/trunk/CHANGES.txt (original)
+++ lucene/hadoop/trunk/CHANGES.txt Mon Jan 8 11:24:08 2007
@@ -9,6 +9,8 @@
2. HADOOP-863. Reduce logging verbosity introduced by HADOOP-813.
(Devaraj Das via cutting)
+ 3. HADOOP-815. Fix memory leaks in JobTracker. (Arun C Murthy via cutting)
+
Release 0.10.0 - 2007-01-05
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=494158&r1=494157&r2=494158
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobInProgress.java Mon Jan 8 11:24:08 2007
@@ -454,40 +454,52 @@
TaskStatus status,
JobTrackerMetrics metrics) {
String taskid = status.getTaskId();
+
+ // Sanity check: is the TIP already complete?
if (tip.isComplete()) {
LOG.info("Already complete TIP " + tip.getTIPId() +
- " has completed task " + taskid);
- return;
- } else {
- LOG.info("Task '" + taskid + "' has completed " + tip.getTIPId() +
- " successfully.");
-
- String taskTrackerName = status.getTaskTracker();
+ " has completed task " + taskid);
+
+ // Just mark this 'task' as complete
+ tip.completedTask(taskid);
- if(status.getIsMap()){
- JobHistory.MapAttempt.logStarted(profile.getJobId(),
- tip.getTIPId(), status.getTaskId(), status.getStartTime(),
- taskTrackerName);
- JobHistory.MapAttempt.logFinished(profile.getJobId(),
- tip.getTIPId(), status.getTaskId(), status.getFinishTime(),
- taskTrackerName);
- JobHistory.Task.logFinished(profile.getJobId(), tip.getTIPId(),
- Values.MAP.name(), status.getFinishTime());
- }else{
- JobHistory.ReduceAttempt.logStarted(profile.getJobId(),
- tip.getTIPId(), status.getTaskId(), status.getStartTime(),
- taskTrackerName);
- JobHistory.ReduceAttempt.logFinished(profile.getJobId(),
- tip.getTIPId(), status.getTaskId(), status.getShuffleFinishTime(),
- status.getSortFinishTime(), status.getFinishTime(),
- taskTrackerName);
- JobHistory.Task.logFinished(profile.getJobId(), tip.getTIPId(),
- Values.REDUCE.name(), status.getFinishTime());
+ // Let the JobTracker cleanup this taskid if the job isn't running
+ if (this.status.getRunState() != JobStatus.RUNNING) {
+ jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
}
+ return;
+ }
+
+ LOG.info("Task '" + taskid + "' has completed " + tip.getTIPId() +
+ " successfully.");
+
+ // Update jobhistory
+ String taskTrackerName = status.getTaskTracker();
+ if(status.getIsMap()){
+ JobHistory.MapAttempt.logStarted(profile.getJobId(),
+ tip.getTIPId(), status.getTaskId(), status.getStartTime(),
+ taskTrackerName);
+ JobHistory.MapAttempt.logFinished(profile.getJobId(),
+ tip.getTIPId(), status.getTaskId(), status.getFinishTime(),
+ taskTrackerName);
+ JobHistory.Task.logFinished(profile.getJobId(), tip.getTIPId(),
+ Values.MAP.name(), status.getFinishTime());
+ }else{
+ JobHistory.ReduceAttempt.logStarted(profile.getJobId(),
+ tip.getTIPId(), status.getTaskId(), status.getStartTime(),
+ taskTrackerName);
+ JobHistory.ReduceAttempt.logFinished(profile.getJobId(),
+ tip.getTIPId(), status.getTaskId(), status.getShuffleFinishTime(),
+ status.getSortFinishTime(), status.getFinishTime(),
+ taskTrackerName);
+ JobHistory.Task.logFinished(profile.getJobId(), tip.getTIPId(),
+ Values.REDUCE.name(), status.getFinishTime());
}
+ // Mark the TIP as complete
tip.completed(taskid);
- // updating the running/finished map/reduce counts
+
+ // Update the running/finished map/reduce counts
if (tip.isMapTask()){
runningMapTasks -= 1;
finishedMapTasks += 1;
@@ -533,6 +545,10 @@
JobHistory.JobInfo.logFinished(this.status.getJobId(), finishTime,
this.finishedMapTasks, this.finishedReduceTasks, failedMapTasks, failedReduceTasks);
metrics.completeJob();
+ } else if (this.status.getRunState() != JobStatus.RUNNING) {
+ // The job has been killed/failed,
+ // JobTracker should cleanup this task
+ jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
}
}
@@ -541,6 +557,7 @@
*/
public synchronized void kill() {
if (status.getRunState() != JobStatus.FAILED) {
+ LOG.info("Killing job '" + this.status.getJobId() + "'");
this.status = new JobStatus(status.getJobId(), 1.0f, 1.0f, JobStatus.FAILED);
this.finishTime = System.currentTimeMillis();
this.runningMapTasks = 0;
@@ -575,7 +592,9 @@
private void failedTask(TaskInProgress tip, String taskid,
TaskStatus status, String trackerName,
boolean wasRunning, boolean wasComplete) {
+ // Mark the taskid as a 'failure'
tip.failedSubTask(taskid, trackerName);
+
boolean isRunning = tip.isRunning();
boolean isComplete = tip.isComplete();
@@ -622,6 +641,11 @@
}
//
+ // Let the JobTracker know that this task has failed
+ //
+ jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
+
+ //
// Check if we need to kill the job because of too many failures
//
if (tip.isFailed()) {
@@ -633,9 +657,7 @@
System.currentTimeMillis(), this.finishedMapTasks, this.finishedReduceTasks);
kill();
}
-
- jobtracker.removeTaskEntry(taskid);
- }
+ }
/**
* Fail a task with a given reason, but without a status object.
@@ -669,6 +691,9 @@
* from the various tables.
*/
synchronized void garbageCollect() {
+ // Let the JobTracker know that a job is complete
+ jobtracker.finalizeJob(this);
+
try {
// Definitely remove the local-disk copy of the job file
if (localJobFile != null) {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=494158&r1=494157&r2=494158
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/JobTracker.java Mon Jan 8 11:24:08 2007
@@ -47,6 +47,12 @@
static final int MIN_CLUSTER_SIZE_FOR_PADDING = 3;
/**
+ * The maximum no. of 'completed' (successful/failed/killed)
+ * jobs kept in memory per-user.
+ */
+ static final int MAX_COMPLETE_USER_JOBS_IN_MEMORY = 100;
+
+ /**
* Used for formatting the id numbers
*/
private static NumberFormat idFormat = NumberFormat.getInstance();
@@ -289,10 +295,26 @@
if (job.getStatus().getRunState() != JobStatus.RUNNING &&
job.getStatus().getRunState() != JobStatus.PREP &&
(job.getFinishTime() + RETIRE_JOB_INTERVAL < System.currentTimeMillis())) {
+ // Ok, this call to removeJobTasks
+ // is dangerous in some very very obscure
+ // cases; e.g. when job completed, exceeded
+ // RETIRE_JOB_INTERVAL time-limit and yet
+ // some task (taskid) wasn't complete!
+ removeJobTasks(job);
+
it.remove();
-
+ synchronized (userToJobsMap) {
+ ArrayList<JobInProgress> userJobs =
+ userToJobsMap.get(job.getProfile().getUser());
+ synchronized (userJobs) {
+ userJobs.remove(job);
+ }
+ }
jobInitQueue.remove(job);
jobsByArrival.remove(job);
+
+ LOG.info("Retired job with id: '" +
+ job.getProfile().getJobId() + "'");
}
}
}
@@ -418,6 +440,9 @@
TreeMap jobs = new TreeMap();
Vector jobsByArrival = new Vector();
+ // (user -> list of JobInProgress)
+ TreeMap<String, ArrayList<JobInProgress>> userToJobsMap = new TreeMap();
+
// All the known TaskInProgress items, mapped to by taskids (taskid->TIP)
Map<String, TaskInProgress> taskidToTIPMap = new TreeMap();
@@ -427,8 +452,12 @@
// (trackerID->TreeSet of taskids running at that tracker)
TreeMap trackerToTaskMap = new TreeMap();
- // (trackerID --> last sent HeartBeatResponseID)
- Map<String, Short> trackerToHeartbeatResponseIDMap = new TreeMap();
+ // (trackerID -> TreeSet of completed taskids running at that tracker)
+ TreeMap<String, Set<String>> trackerToMarkedTasksMap = new TreeMap();
+
+ // (trackerID --> last sent HeartBeatResponse)
+ Map<String, HeartbeatResponse> trackerToHeartbeatResponseMap =
+ new TreeMap();
//
// Watch and expire TaskTracker objects using these structures.
@@ -644,18 +673,181 @@
// taskid --> TIP
taskidToTIPMap.put(taskid, tip);
}
+
void removeTaskEntry(String taskid) {
// taskid --> tracker
String tracker = (String) taskidToTrackerMap.remove(taskid);
// tracker --> taskid
- TreeSet trackerSet = (TreeSet) trackerToTaskMap.get(tracker);
- if (trackerSet != null) {
- trackerSet.remove(taskid);
+ if (tracker != null) {
+ TreeSet trackerSet = (TreeSet) trackerToTaskMap.get(tracker);
+ if (trackerSet != null) {
+ trackerSet.remove(taskid);
+ }
}
// taskid --> TIP
taskidToTIPMap.remove(taskid);
+
+ LOG.debug("Removing task '" + taskid + "'");
+ }
+
+ /**
+ * Mark a 'task' for removal later.
+ * This function assumes that the JobTracker is locked on entry.
+ *
+ * @param taskTracker the tasktracker at which the 'task' was running
+ * @param taskid completed (success/failure/killed) task
+ */
+ void markCompletedTaskAttempt(String taskTracker, String taskid) {
+ // tracker --> taskid
+ TreeSet taskset = (TreeSet) trackerToMarkedTasksMap.get(taskTracker);
+ if (taskset == null) {
+ taskset = new TreeSet();
+ trackerToMarkedTasksMap.put(taskTracker, taskset);
+ }
+ taskset.add(taskid);
+
+ LOG.debug("Marked '" + taskid + "' from '" + taskTracker + "'");
+ }
+
+ /**
+ * Mark all 'non-running' tasks of the job for pruning.
+ * This function assumes that the JobTracker is locked on entry.
+ *
+ * @param job the completed job
+ */
+ void markCompletedJob(JobInProgress job) {
+ for (TaskInProgress tip : job.getMapTasks()) {
+ for (TaskStatus taskStatus : tip.getTaskStatuses()) {
+ if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
+ markCompletedTaskAttempt(taskStatus.getTaskTracker(),
+ taskStatus.getTaskId());
+ }
+ }
+ }
+ for (TaskInProgress tip : job.getReduceTasks()) {
+ for (TaskStatus taskStatus : tip.getTaskStatuses()) {
+ if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
+ markCompletedTaskAttempt(taskStatus.getTaskTracker(),
+ taskStatus.getTaskId());
+ }
+ }
+ }
+ }
+
+ /**
+ * Remove all 'marked' tasks running on a given {@link TaskTracker}
+ * from the {@link JobTracker}'s data-structures.
+ * This function assumes that the JobTracker is locked on entry.
+ *
+ * @param taskTracker tasktracker whose 'non-running' tasks are to be purged
+ */
+ private void removeMarkedTasks(String taskTracker) {
+ // Purge all the 'marked' tasks which were running at taskTracker
+ TreeSet<String> markedTaskSet =
+ (TreeSet<String>) trackerToMarkedTasksMap.get(taskTracker);
+ if (markedTaskSet != null) {
+ for (String taskid : markedTaskSet) {
+ removeTaskEntry(taskid);
+ LOG.info("Removed completed task '" + taskid + "' from '" +
+ taskTracker + "'");
+ }
+ // Clear
+ trackerToMarkedTasksMap.remove(taskTracker);
+ }
+ }
+
+ /**
+ * Call {@link #removeTaskEntry(String)} for each of the
+ * job's tasks.
+ * When the JobTracker is retiring the long-completed
+ * job, either because it has outlived {@link #RETIRE_JOB_INTERVAL}
+ * or the limit of {@link #MAX_COMPLETE_USER_JOBS_IN_MEMORY} jobs
+ * has been reached, we can afford to nuke all its tasks; a little
+ * unsafe, but practically feasible.
+ *
+ * @param job the job about to be 'retired'
+ */
+ synchronized private void removeJobTasks(JobInProgress job) {
+ for (TaskInProgress tip : job.getMapTasks()) {
+ for (TaskStatus taskStatus : tip.getTaskStatuses()) {
+ removeTaskEntry(taskStatus.getTaskId());
+ }
+ }
+ for (TaskInProgress tip : job.getReduceTasks()) {
+ for (TaskStatus taskStatus : tip.getTaskStatuses()) {
+ removeTaskEntry(taskStatus.getTaskId());
+ }
+ }
+ }
+
+ /**
+ * Safely clean up all data structures at the end of the
+ * job (success/failure/killed).
+ * Here we also ensure that for a given user we maintain
+ * information for only MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs
+ * on the JobTracker.
+ *
+ * @param job completed job.
+ */
+ synchronized void finalizeJob(JobInProgress job) {
+ // Mark the 'non-running' tasks for pruning
+ markCompletedJob(job);
+
+ // Purge oldest jobs and keep at-most MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs of a given user
+ // in memory; information about the purged jobs is available via
+ // JobHistory.
+ synchronized (jobs) {
+ synchronized (jobsByArrival) {
+ synchronized (jobInitQueue) {
+ String jobUser = job.getProfile().getUser();
+ synchronized (userToJobsMap) {
+ ArrayList<JobInProgress> userJobs =
+ userToJobsMap.get(jobUser);
+ synchronized (userJobs) {
+ while (userJobs.size() >
+ MAX_COMPLETE_USER_JOBS_IN_MEMORY) {
+ JobInProgress rjob = userJobs.get(0);
+
+ // Do not delete 'current'
+ // finished job just yet.
+ if (rjob == job) {
+ break;
+ }
+
+ // Cleanup all datastructures
+ int rjobRunState =
+ rjob.getStatus().getRunState();
+ if (rjobRunState == JobStatus.SUCCEEDED ||
+ rjobRunState == JobStatus.FAILED) {
+ // Ok, this call to removeJobTasks
+ // is dangerous in some very very obscure
+ // cases; e.g. when rjob completed, hit
+ // MAX_COMPLETE_USER_JOBS_IN_MEMORY job
+ // limit and yet some task (taskid)
+ // wasn't complete!
+ removeJobTasks(rjob);
+
+ userJobs.remove(0);
+ jobs.remove(rjob.getProfile().getJobId());
+ jobInitQueue.remove(rjob);
+ jobsByArrival.remove(rjob);
+
+ LOG.info("Retired job with id: '" +
+ rjob.getProfile().getJobId() + "'");
+ } else {
+ // Do not remove jobs that aren't complete.
+ // Stop here, and let the next pass take
+ // care of purging jobs.
+ break;
+ }
+ }
+ }
+ }
+ }
+ }
+ }
}
///////////////////////////////////////////////////////
@@ -736,26 +928,46 @@
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
boolean initialContact, boolean acceptNewTasks, short responseId)
throws IOException {
- LOG.debug("Got heartbeat from: " + status.getTrackerName() +
+ LOG.debug("Got heartbeat from: " + status.getTrackerName() +
" (initialContact: " + initialContact +
" acceptNewTasks: " + acceptNewTasks + ")" +
" with responseId: " + responseId);
// First check if the last heartbeat response got through
String trackerName = status.getTrackerName();
- Short oldResponseId = trackerToHeartbeatResponseIDMap.get(trackerName);
-
- short newResponseId = (short)(responseId + 1);
- if (!initialContact && oldResponseId != null &&
- oldResponseId.shortValue() != responseId) {
- newResponseId = oldResponseId.shortValue();
+ HeartbeatResponse prevHeartbeatResponse =
+ trackerToHeartbeatResponseMap.get(trackerName);
+
+ if (initialContact != true) {
+ // If this isn't the 'initial contact' from the tasktracker,
+ // there is something seriously wrong if the JobTracker has
+ // no record of the 'previous heartbeat'; if so, ask the
+ // tasktracker to re-initialize itself.
+ if (prevHeartbeatResponse == null) {
+ LOG.warn("Serious problem, cannot find record of 'previous' " +
+ "heartbeat for '" + trackerName +
+ "'; reinitializing the tasktracker");
+ return new HeartbeatResponse(responseId,
+ new TaskTrackerAction[] {new ReinitTrackerAction()});
+
+ }
+
+ // It is completely safe to ignore a 'duplicate' from a tracker
+ // since we are guaranteed that the tracker sends the same
+ // 'heartbeat' when rpcs are lost.
+ // {@see TaskTracker.transmitHeartBeat()}
+ if (prevHeartbeatResponse.getResponseId() != responseId) {
+ LOG.info("Ignoring 'duplicate' heartbeat from '" +
+ trackerName + "'");
+ return prevHeartbeatResponse;
+ }
}
// Process this heartbeat
- if (!processHeartbeat(status, initialContact,
- (newResponseId != responseId))) {
- if (oldResponseId != null) {
- trackerToHeartbeatResponseIDMap.remove(trackerName);
+ short newResponseId = (short)(responseId + 1);
+ if (!processHeartbeat(status, initialContact)) {
+ if (prevHeartbeatResponse != null) {
+ trackerToHeartbeatResponseMap.remove(trackerName);
}
return new HeartbeatResponse(newResponseId,
@@ -784,12 +996,12 @@
response.setActions(
actions.toArray(new TaskTrackerAction[actions.size()]));
- // Update the trackerToHeartbeatResponseIDMap
- if (newResponseId != responseId) {
- trackerToHeartbeatResponseIDMap.put(trackerName,
- new Short(newResponseId));
- }
+ // Update the trackerToHeartbeatResponseMap
+ trackerToHeartbeatResponseMap.put(trackerName, response);
+ // Done processing the heartbeat, now remove 'marked' tasks
+ removeMarkedTasks(trackerName);
+
return response;
}
@@ -824,12 +1036,9 @@
* Process incoming heartbeat messages from the task trackers.
*/
private synchronized boolean processHeartbeat(
- TaskTrackerStatus trackerStatus,
- boolean initialContact, boolean updateStatusTimestamp) {
+ TaskTrackerStatus trackerStatus, boolean initialContact) {
String trackerName = trackerStatus.getTrackerName();
- if (initialContact || updateStatusTimestamp) {
- trackerStatus.setLastSeen(System.currentTimeMillis());
- }
+ trackerStatus.setLastSeen(System.currentTimeMillis());
synchronized (taskTrackers) {
synchronized (trackerExpiryQueue) {
@@ -857,7 +1066,7 @@
}
updateTaskStatuses(trackerStatus);
- //LOG.info("Got heartbeat from "+trackerName);
+
return true;
}
@@ -1028,7 +1237,6 @@
killList.add(new KillTaskAction(killTaskId));
LOG.debug(taskTracker + " -> KillTaskAction: " + killTaskId);
} else {
- //killTasksList.add(new KillJobAction(taskId));
String killJobId = tip.getJob().getStatus().getJobId();
killJobIds.add(killJobId);
}
@@ -1051,14 +1259,28 @@
* map task outputs.
*/
public synchronized MapOutputLocation[]
- locateMapOutputs(String jobId, int[] mapTasksNeeded, int reduce) {
- ArrayList result = new ArrayList(mapTasksNeeded.length);
+ locateMapOutputs(String jobId, int[] mapTasksNeeded, int reduce)
+ throws IOException {
+ // Check to make sure that the job hasn't 'completed'.
JobInProgress job = getJob(jobId);
+ if (job.status.getRunState() != JobStatus.RUNNING) {
+ return new MapOutputLocation[0];
+ }
+
+ ArrayList result = new ArrayList(mapTasksNeeded.length);
for (int i = 0; i < mapTasksNeeded.length; i++) {
TaskStatus status = job.findFinishedMap(mapTasksNeeded[i]);
if (status != null) {
String trackerId =
(String) taskidToTrackerMap.get(status.getTaskId());
+ // Safety check, if we can't find the taskid in
+ // taskidToTrackerMap and job isn't 'running', then just
+ // return an empty array
+ if (trackerId == null &&
+ job.status.getRunState() != JobStatus.RUNNING) {
+ return new MapOutputLocation[0];
+ }
+
TaskTrackerStatus tracker;
synchronized (taskTrackers) {
tracker = (TaskTrackerStatus) taskTrackers.get(trackerId);
@@ -1108,10 +1330,22 @@
synchronized (jobs) {
synchronized (jobsByArrival) {
synchronized (jobInitQueue) {
- jobs.put(job.getProfile().getJobId(), job);
- jobsByArrival.add(job);
- jobInitQueue.add(job);
- jobInitQueue.notifyAll();
+ synchronized (userToJobsMap) {
+ jobs.put(job.getProfile().getJobId(), job);
+ String jobUser = job.getProfile().getUser();
+ if (!userToJobsMap.containsKey(jobUser)) {
+ userToJobsMap.put(jobUser,
+ new ArrayList<JobInProgress>());
+ }
+ ArrayList<JobInProgress> userJobs =
+ userToJobsMap.get(jobUser);
+ synchronized (userJobs) {
+ userJobs.add(job);
+ }
+ jobsByArrival.add(job);
+ jobInitQueue.add(job);
+ jobInitQueue.notifyAll();
+ }
}
}
}
@@ -1271,8 +1505,7 @@
* jobs that might be affected.
*/
void updateTaskStatuses(TaskTrackerStatus status) {
- for (Iterator it = status.taskReports(); it.hasNext(); ) {
- TaskStatus report = (TaskStatus) it.next();
+ for (TaskStatus report : status.getTaskReports()) {
report.setTaskTracker(status.getTrackerName());
String taskId = report.getTaskId();
TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId);
@@ -1310,8 +1543,16 @@
TaskStatus.Phase.MAP, hostname, trackerName,
myMetrics);
}
+ } else if (!tip.isMapTask() && tip.isComplete()) {
+ // Completed 'reduce' task, not failed;
+ // only removed from data-structures.
+ markCompletedTaskAttempt(trackerName, taskId);
}
}
+
+ // Purge 'marked' tasks, needs to be done
+ // here to prevent hanging references!
+ removeMarkedTasks(trackerName);
}
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java?view=diff&rev=494158&r1=494157&r2=494158
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskInProgress.java Mon Jan 8 11:24:08 2007
@@ -57,7 +57,6 @@
private int partition;
private JobTracker jobtracker;
private String id;
- private String totalTaskIds[];
private JobInProgress job;
// Status of the TIP
@@ -70,7 +69,13 @@
private int completes = 0;
private boolean failed = false;
private boolean killed = false;
- private TreeSet usableTaskIds = new TreeSet();
+
+ // The 'unique' prefix for taskids of this tip
+ String taskIdPrefix;
+
+ // The 'next' usable taskid of this tip
+ int nextTaskId = 0;
+
// Map from task Id -> TaskTracker Id, contains tasks that are
// currently runnings
private TreeMap<String, String> activeTasks = new TreeMap();
@@ -139,13 +144,8 @@
void init(String jobUniqueString) {
this.startTime = System.currentTimeMillis();
this.runSpeculative = conf.getSpeculativeExecution();
- String uniqueString = makeUniqueString(jobUniqueString);
- this.id = "tip_" + uniqueString;
- this.totalTaskIds = new String[MAX_TASK_EXECS + MAX_TASK_FAILURES];
- for (int i = 0; i < totalTaskIds.length; i++) {
- totalTaskIds[i] = "task_" + uniqueString + "_" + i;
- usableTaskIds.add(totalTaskIds[i]);
- }
+ this.taskIdPrefix = makeUniqueString(jobUniqueString);
+ this.id = "tip_" + this.taskIdPrefix;
}
////////////////////////////////////
@@ -180,11 +180,19 @@
}
/**
+ * Is this tip complete?
+ *
+ * @return <code>true</code> if the tip is complete, else <code>false</code>
*/
public boolean isComplete() {
return (completes > 0);
}
+
/**
+ * Is the given taskid in this tip complete?
+ *
+ * @param taskid taskid of attempt to check for completion
+ * @return <code>true</code> if taskid is complete, else <code>false</code>
*/
public boolean isComplete(String taskid) {
TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
@@ -194,7 +202,11 @@
return ((completes > 0) &&
(status.getRunState() == TaskStatus.State.SUCCEEDED));
}
+
/**
+ * Is the tip a failure?
+ *
+ * @return <code>true</code> if tip has failed, else <code>false</code>
*/
public boolean isFailed() {
return failed;
@@ -293,6 +305,17 @@
TaskStatus.State oldState = oldStatus.getRunState();
TaskStatus.State newState = status.getRunState();
+ // We should never receive a duplicate success/failure/killed
+ // status update for the same taskid! This is a safety check,
+ // and is addressed better at the TaskTracker to ensure this.
+ // @see {@link TaskTracker.transmitHeartBeat()}
+ if ((newState != TaskStatus.State.RUNNING) &&
+ (oldState == newState)) {
+ LOG.warn("Recieved duplicate status update of '" + newState +
+ "' for '" + taskid + "' of TIP '" + getTIPId() + "'");
+ return false;
+ }
+
// The task is not allowed to move from completed back to running.
// We have seen out of order status messagesmoving tasks from complete
// to running. This is a spot fix, but it should be addressed more
@@ -346,14 +369,29 @@
/**
* Indicate that one of the taskids in this TaskInProgress
- * has successfully completed!
+ * has successfully completed.
+ *
+ * However this may not be the first subtask in this
+ * TaskInProgress to be completed and hence we might not want to
+ * manipulate the TaskInProgress to note that it is 'complete' just-as-yet.
*/
- public void completed(String taskid) {
+ void completedTask(String taskid) {
LOG.info("Task '" + taskid + "' has completed.");
TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
status.setRunState(TaskStatus.State.SUCCEEDED);
activeTasks.remove(taskid);
-
+ }
+
+ /**
+ * Indicate that one of the taskids in this TaskInProgress
+ * has successfully completed!
+ */
+ public void completed(String taskid) {
+ //
+ // Record that this taskid is complete
+ //
+ completedTask(taskid);
+
//
// Now that the TIP is complete, the other speculative
// subtasks will be closed when the owning tasktracker
@@ -470,8 +508,17 @@
execStartTime = System.currentTimeMillis();
}
- String taskid = (String) usableTaskIds.first();
- usableTaskIds.remove(taskid);
+ // Create the 'taskid'
+ String taskid = null;
+ if (nextTaskId < (MAX_TASK_EXECS + MAX_TASK_FAILURES)) {
+ taskid = new String("task_" + taskIdPrefix + "_" + nextTaskId);
+ ++nextTaskId;
+ } else {
+ LOG.warn("Exceeded limit of " + (MAX_TASK_EXECS + MAX_TASK_FAILURES) +
+ " attempts for the tip '" + getTIPId() + "'");
+ return null;
+ }
+
String jobId = job.getProfile().getJobId();
if (isMapTask()) {
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=494158&r1=494157&r2=494158
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTracker.java Mon Jan 8 11:24:08 2007
@@ -74,6 +74,16 @@
// last heartbeat response recieved
short heartbeatResponseId = -1;
+ /*
+ * This is the last 'status' report sent by this tracker to the JobTracker.
+ *
+ * If the rpc call succeeds, this 'status' is cleared-out by this tracker;
+ * indicating that a 'fresh' status report be generated; in the event the
+ * rpc calls fails for whatever reason, the previous status report is sent
+ * again.
+ */
+ TaskTrackerStatus status = null;
+
StatusHttpServer server = null;
boolean shuttingDown = false;
@@ -249,6 +259,7 @@
this.mapTotal = 0;
this.reduceTotal = 0;
this.acceptNewTasks = true;
+ this.status = null;
this.minSpaceStart = this.fConf.getLong("mapred.local.dir.minspacestart", 0L);
this.minSpaceKill = this.fConf.getLong("mapred.local.dir.minspacekill", 0L);
@@ -535,20 +546,27 @@
* @throws IOException
*/
private HeartbeatResponse transmitHeartBeat() throws IOException {
+ //
+ // Check if the last heartbeat got through...
+ // if so then build the heartbeat information for the JobTracker;
+ // else resend the previous status information.
//
- // Build the heartbeat information for the JobTracker
- //
- List<TaskStatus> taskReports =
- new ArrayList<TaskStatus>(runningTasks.size());
- synchronized (this) {
- for (TaskInProgress tip: runningTasks.values()) {
- taskReports.add(tip.createStatus());
+ if (status == null) {
+ List<TaskStatus> taskReports =
+ new ArrayList<TaskStatus>(runningTasks.size());
+ synchronized (this) {
+ for (TaskInProgress tip: runningTasks.values()) {
+ taskReports.add(tip.createStatus());
+ }
}
+ status =
+ new TaskTrackerStatus(taskTrackerName, localHostname,
+ httpPort, taskReports,
+ failures);
+ } else {
+ LOG.info("Resending 'status' to '" + jobTrackAddr.getHostName() +
+ "' with reponseId '" + heartbeatResponseId);
}
- TaskTrackerStatus status =
- new TaskTrackerStatus(taskTrackerName, localHostname,
- httpPort, taskReports,
- failures);
//
// Check if we should ask for a new Task
@@ -569,10 +587,14 @@
HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
justStarted, askForNewTask,
heartbeatResponseId);
+
+ //
+ // The heartbeat got through successfully!
+ //
heartbeatResponseId = heartbeatResponse.getResponseId();
synchronized (this) {
- for (TaskStatus taskStatus : taskReports) {
+ for (TaskStatus taskStatus : status.getTaskReports()) {
if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
if (taskStatus.getIsMap()) {
mapTotal--;
@@ -584,6 +606,10 @@
}
}
}
+
+ // Force a rebuild of 'status' on the next iteration
+ status = null;
+
return heartbeatResponse;
}
Modified: lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java?view=diff&rev=494158&r1=494157&r2=494158
==============================================================================
--- lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ lucene/hadoop/trunk/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java Mon Jan 8 11:24:08 2007
@@ -98,11 +98,24 @@
* All current tasks at the TaskTracker.
*
* Tasks are tracked by a TaskStatus object.
+ *
+ * @deprecated use {@link #getTaskReports()} instead
*/
public Iterator taskReports() {
return taskReports.iterator();
}
+ /**
+ * Get the current tasks at the TaskTracker.
+ * Tasks are tracked by a {@link TaskStatus} object.
+ *
+ * @return a list of {@link TaskStatus} representing
+ * the current tasks at the TaskTracker.
+ */
+ public List<TaskStatus> getTaskReports() {
+ return taskReports;
+ }
+
/**
* Return the current MapTask count
*/