You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by cu...@apache.org on 2007/01/11 00:59:26 UTC
svn commit: r495049 [2/4] - in /lucene/hadoop/branches/branch-0.10: ./ bin/
docs/ src/docs/src/documentation/content/xdocs/
src/java/org/apache/hadoop/fs/ src/java/org/apache/hadoop/fs/s3/
src/java/org/apache/hadoop/io/compress/ src/java/org/apache/had...
Modified: lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/JobInProgress.java?view=diff&rev=495049&r1=495048&r2=495049
==============================================================================
--- lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/JobInProgress.java Wed Jan 10 15:59:23 2007
@@ -454,40 +454,52 @@
TaskStatus status,
JobTrackerMetrics metrics) {
String taskid = status.getTaskId();
+
+ // Sanity check: is the TIP already complete?
if (tip.isComplete()) {
LOG.info("Already complete TIP " + tip.getTIPId() +
- " has completed task " + taskid);
- return;
- } else {
- LOG.info("Task '" + taskid + "' has completed " + tip.getTIPId() +
- " successfully.");
-
- String taskTrackerName = status.getTaskTracker();
+ " has completed task " + taskid);
+
+ // Just mark this 'task' as complete
+ tip.completedTask(taskid);
- if(status.getIsMap()){
- JobHistory.MapAttempt.logStarted(profile.getJobId(),
- tip.getTIPId(), status.getTaskId(), status.getStartTime(),
- taskTrackerName);
- JobHistory.MapAttempt.logFinished(profile.getJobId(),
- tip.getTIPId(), status.getTaskId(), status.getFinishTime(),
- taskTrackerName);
- JobHistory.Task.logFinished(profile.getJobId(), tip.getTIPId(),
- Values.MAP.name(), status.getFinishTime());
- }else{
- JobHistory.ReduceAttempt.logStarted(profile.getJobId(),
- tip.getTIPId(), status.getTaskId(), status.getStartTime(),
- taskTrackerName);
- JobHistory.ReduceAttempt.logFinished(profile.getJobId(),
- tip.getTIPId(), status.getTaskId(), status.getShuffleFinishTime(),
- status.getSortFinishTime(), status.getFinishTime(),
- taskTrackerName);
- JobHistory.Task.logFinished(profile.getJobId(), tip.getTIPId(),
- Values.REDUCE.name(), status.getFinishTime());
+ // Let the JobTracker cleanup this taskid if the job isn't running
+ if (this.status.getRunState() != JobStatus.RUNNING) {
+ jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
}
+ return;
+ }
+
+ LOG.info("Task '" + taskid + "' has completed " + tip.getTIPId() +
+ " successfully.");
+
+ // Update jobhistory
+ String taskTrackerName = status.getTaskTracker();
+ if(status.getIsMap()){
+ JobHistory.MapAttempt.logStarted(profile.getJobId(),
+ tip.getTIPId(), status.getTaskId(), status.getStartTime(),
+ taskTrackerName);
+ JobHistory.MapAttempt.logFinished(profile.getJobId(),
+ tip.getTIPId(), status.getTaskId(), status.getFinishTime(),
+ taskTrackerName);
+ JobHistory.Task.logFinished(profile.getJobId(), tip.getTIPId(),
+ Values.MAP.name(), status.getFinishTime());
+ }else{
+ JobHistory.ReduceAttempt.logStarted(profile.getJobId(),
+ tip.getTIPId(), status.getTaskId(), status.getStartTime(),
+ taskTrackerName);
+ JobHistory.ReduceAttempt.logFinished(profile.getJobId(),
+ tip.getTIPId(), status.getTaskId(), status.getShuffleFinishTime(),
+ status.getSortFinishTime(), status.getFinishTime(),
+ taskTrackerName);
+ JobHistory.Task.logFinished(profile.getJobId(), tip.getTIPId(),
+ Values.REDUCE.name(), status.getFinishTime());
}
+ // Mark the TIP as complete
tip.completed(taskid);
- // updating the running/finished map/reduce counts
+
+ // Update the running/finished map/reduce counts
if (tip.isMapTask()){
runningMapTasks -= 1;
finishedMapTasks += 1;
@@ -533,6 +545,10 @@
JobHistory.JobInfo.logFinished(this.status.getJobId(), finishTime,
this.finishedMapTasks, this.finishedReduceTasks, failedMapTasks, failedReduceTasks);
metrics.completeJob();
+ } else if (this.status.getRunState() != JobStatus.RUNNING) {
+ // The job has been killed/failed,
+ // JobTracker should cleanup this task
+ jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
}
}
@@ -541,6 +557,7 @@
*/
public synchronized void kill() {
if (status.getRunState() != JobStatus.FAILED) {
+ LOG.info("Killing job '" + this.status.getJobId() + "'");
this.status = new JobStatus(status.getJobId(), 1.0f, 1.0f, JobStatus.FAILED);
this.finishTime = System.currentTimeMillis();
this.runningMapTasks = 0;
@@ -575,7 +592,9 @@
private void failedTask(TaskInProgress tip, String taskid,
TaskStatus status, String trackerName,
boolean wasRunning, boolean wasComplete) {
+ // Mark the taskid as a 'failure'
tip.failedSubTask(taskid, trackerName);
+
boolean isRunning = tip.isRunning();
boolean isComplete = tip.isComplete();
@@ -622,6 +641,11 @@
}
//
+ // Let the JobTracker know that this task has failed
+ //
+ jobtracker.markCompletedTaskAttempt(status.getTaskTracker(), taskid);
+
+ //
// Check if we need to kill the job because of too many failures
//
if (tip.isFailed()) {
@@ -633,9 +657,7 @@
System.currentTimeMillis(), this.finishedMapTasks, this.finishedReduceTasks);
kill();
}
-
- jobtracker.removeTaskEntry(taskid);
- }
+ }
/**
* Fail a task with a given reason, but without a status object.
@@ -669,6 +691,9 @@
* from the various tables.
*/
synchronized void garbageCollect() {
+ // Let the JobTracker know that a job is complete
+ jobtracker.finalizeJob(this);
+
try {
// Definitely remove the local-disk copy of the job file
if (localJobFile != null) {
Modified: lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/JobTracker.java?view=diff&rev=495049&r1=495048&r2=495049
==============================================================================
--- lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/JobTracker.java Wed Jan 10 15:59:23 2007
@@ -47,6 +47,12 @@
static final int MIN_CLUSTER_SIZE_FOR_PADDING = 3;
/**
+ * The maximum no. of 'completed' (successful/failed/killed)
+ * jobs kept in memory per-user.
+ */
+ static final int MAX_COMPLETE_USER_JOBS_IN_MEMORY = 100;
+
+ /**
* Used for formatting the id numbers
*/
private static NumberFormat idFormat = NumberFormat.getInstance();
@@ -215,36 +221,45 @@
//
// Loop through all expired items in the queue
//
- synchronized (taskTrackers) {
+ // Need to lock the JobTracker here since we are
+ // manipulating its data-structures via
+ // ExpireTrackers.run -> JobTracker.lostTaskTracker ->
+ // JobInProgress.failedTask -> JobTracker.markCompletedTaskAttempt
+ // Also need to lock JobTracker before locking 'taskTrackers' &
+ // 'trackerExpiryQueue' to prevent deadlock:
+ // @see {@link JobTracker.processHeartbeat(TaskTrackerStatus, boolean)}
+ synchronized (JobTracker.this) {
+ synchronized (taskTrackers) {
synchronized (trackerExpiryQueue) {
- long now = System.currentTimeMillis();
- TaskTrackerStatus leastRecent = null;
- while ((trackerExpiryQueue.size() > 0) &&
- ((leastRecent = (TaskTrackerStatus) trackerExpiryQueue.first()) != null) &&
- (now - leastRecent.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL)) {
-
- // Remove profile from head of queue
- trackerExpiryQueue.remove(leastRecent);
- String trackerName = leastRecent.getTrackerName();
-
- // Figure out if last-seen time should be updated, or if tracker is dead
- TaskTrackerStatus newProfile = (TaskTrackerStatus) taskTrackers.get(leastRecent.getTrackerName());
- // Items might leave the taskTracker set through other means; the
- // status stored in 'taskTrackers' might be null, which means the
- // tracker has already been destroyed.
- if (newProfile != null) {
- if (now - newProfile.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL) {
- // Remove completely
- updateTaskTrackerStatus(trackerName, null);
- lostTaskTracker(leastRecent.getTrackerName(),
- leastRecent.getHost());
- } else {
- // Update time by inserting latest profile
- trackerExpiryQueue.add(newProfile);
- }
- }
+ long now = System.currentTimeMillis();
+ TaskTrackerStatus leastRecent = null;
+ while ((trackerExpiryQueue.size() > 0) &&
+ ((leastRecent = (TaskTrackerStatus) trackerExpiryQueue.first()) != null) &&
+ (now - leastRecent.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL)) {
+
+ // Remove profile from head of queue
+ trackerExpiryQueue.remove(leastRecent);
+ String trackerName = leastRecent.getTrackerName();
+
+ // Figure out if last-seen time should be updated, or if tracker is dead
+ TaskTrackerStatus newProfile = (TaskTrackerStatus) taskTrackers.get(leastRecent.getTrackerName());
+ // Items might leave the taskTracker set through other means; the
+ // status stored in 'taskTrackers' might be null, which means the
+ // tracker has already been destroyed.
+ if (newProfile != null) {
+ if (now - newProfile.getLastSeen() > TASKTRACKER_EXPIRY_INTERVAL) {
+ // Remove completely
+ updateTaskTrackerStatus(trackerName, null);
+ lostTaskTracker(leastRecent.getTrackerName(),
+ leastRecent.getHost());
+ } else {
+ // Update time by inserting latest profile
+ trackerExpiryQueue.add(newProfile);
+ }
}
+ }
}
+ }
}
} catch (Exception t) {
LOG.error("Tracker Expiry Thread got exception: " +
@@ -289,10 +304,26 @@
if (job.getStatus().getRunState() != JobStatus.RUNNING &&
job.getStatus().getRunState() != JobStatus.PREP &&
(job.getFinishTime() + RETIRE_JOB_INTERVAL < System.currentTimeMillis())) {
+ // Ok, this call to removeJobTasks
+ // is dangerous in some very very obscure
+ // cases; e.g. when job completed, exceeded
+ // RETIRE_JOB_INTERVAL time-limit and yet
+ // some task (taskid) wasn't complete!
+ removeJobTasks(job);
+
it.remove();
-
+ synchronized (userToJobsMap) {
+ ArrayList<JobInProgress> userJobs =
+ userToJobsMap.get(job.getProfile().getUser());
+ synchronized (userJobs) {
+ userJobs.remove(job);
+ }
+ }
jobInitQueue.remove(job);
jobsByArrival.remove(job);
+
+ LOG.info("Retired job with id: '" +
+ job.getProfile().getJobId() + "'");
}
}
}
@@ -418,6 +449,9 @@
TreeMap jobs = new TreeMap();
Vector jobsByArrival = new Vector();
+ // (user -> list of JobInProgress)
+ TreeMap<String, ArrayList<JobInProgress>> userToJobsMap = new TreeMap();
+
// All the known TaskInProgress items, mapped to by taskids (taskid->TIP)
Map<String, TaskInProgress> taskidToTIPMap = new TreeMap();
@@ -427,8 +461,12 @@
// (trackerID->TreeSet of taskids running at that tracker)
TreeMap trackerToTaskMap = new TreeMap();
- // (trackerID --> last sent HeartBeatResponseID)
- Map<String, Short> trackerToHeartbeatResponseIDMap = new TreeMap();
+ // (trackerID -> TreeSet of completed taskids running at that tracker)
+ TreeMap<String, Set<String>> trackerToMarkedTasksMap = new TreeMap();
+
+ // (trackerID --> last sent HeartBeatResponse)
+ Map<String, HeartbeatResponse> trackerToHeartbeatResponseMap =
+ new TreeMap();
//
// Watch and expire TaskTracker objects using these structures.
@@ -644,18 +682,181 @@
// taskid --> TIP
taskidToTIPMap.put(taskid, tip);
}
+
void removeTaskEntry(String taskid) {
// taskid --> tracker
String tracker = (String) taskidToTrackerMap.remove(taskid);
// tracker --> taskid
- TreeSet trackerSet = (TreeSet) trackerToTaskMap.get(tracker);
- if (trackerSet != null) {
- trackerSet.remove(taskid);
+ if (tracker != null) {
+ TreeSet trackerSet = (TreeSet) trackerToTaskMap.get(tracker);
+ if (trackerSet != null) {
+ trackerSet.remove(taskid);
+ }
}
// taskid --> TIP
taskidToTIPMap.remove(taskid);
+
+ LOG.debug("Removing task '" + taskid + "'");
+ }
+
+ /**
+ * Mark a 'task' for removal later.
+ * This function assumes that the JobTracker is locked on entry.
+ *
+ * @param taskTracker the tasktracker at which the 'task' was running
+ * @param taskid completed (success/failure/killed) task
+ */
+ void markCompletedTaskAttempt(String taskTracker, String taskid) {
+ // tracker --> taskid
+ TreeSet taskset = (TreeSet) trackerToMarkedTasksMap.get(taskTracker);
+ if (taskset == null) {
+ taskset = new TreeSet();
+ trackerToMarkedTasksMap.put(taskTracker, taskset);
+ }
+ taskset.add(taskid);
+
+ LOG.debug("Marked '" + taskid + "' from '" + taskTracker + "'");
+ }
+
+ /**
+ * Mark all 'non-running' tasks of the job for pruning.
+ * This function assumes that the JobTracker is locked on entry.
+ *
+ * @param job the completed job
+ */
+ void markCompletedJob(JobInProgress job) {
+ for (TaskInProgress tip : job.getMapTasks()) {
+ for (TaskStatus taskStatus : tip.getTaskStatuses()) {
+ if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
+ markCompletedTaskAttempt(taskStatus.getTaskTracker(),
+ taskStatus.getTaskId());
+ }
+ }
+ }
+ for (TaskInProgress tip : job.getReduceTasks()) {
+ for (TaskStatus taskStatus : tip.getTaskStatuses()) {
+ if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
+ markCompletedTaskAttempt(taskStatus.getTaskTracker(),
+ taskStatus.getTaskId());
+ }
+ }
+ }
+ }
+
+ /**
+ * Remove all 'marked' tasks running on a given {@link TaskTracker}
+ * from the {@link JobTracker}'s data-structures.
+ * This function assumes that the JobTracker is locked on entry.
+ *
+ * @param taskTracker tasktracker whose 'non-running' tasks are to be purged
+ */
+ private void removeMarkedTasks(String taskTracker) {
+ // Purge all the 'marked' tasks which were running at taskTracker
+ TreeSet<String> markedTaskSet =
+ (TreeSet<String>) trackerToMarkedTasksMap.get(taskTracker);
+ if (markedTaskSet != null) {
+ for (String taskid : markedTaskSet) {
+ removeTaskEntry(taskid);
+ LOG.info("Removed completed task '" + taskid + "' from '" +
+ taskTracker + "'");
+ }
+ // Clear
+ trackerToMarkedTasksMap.remove(taskTracker);
+ }
+ }
+
+ /**
+ * Call {@link #removeTaskEntry(String)} for each of the
+ * job's tasks.
+ * When the JobTracker is retiring the long-completed
+ * job, either because it has outlived {@link #RETIRE_JOB_INTERVAL}
+ * or the limit of {@link #MAX_COMPLETE_USER_JOBS_IN_MEMORY} jobs
+ * has been reached, we can afford to nuke all its tasks; a little
+ * unsafe, but practically feasible.
+ *
+ * @param job the job about to be 'retired'
+ */
+ synchronized private void removeJobTasks(JobInProgress job) {
+ for (TaskInProgress tip : job.getMapTasks()) {
+ for (TaskStatus taskStatus : tip.getTaskStatuses()) {
+ removeTaskEntry(taskStatus.getTaskId());
+ }
+ }
+ for (TaskInProgress tip : job.getReduceTasks()) {
+ for (TaskStatus taskStatus : tip.getTaskStatuses()) {
+ removeTaskEntry(taskStatus.getTaskId());
+ }
+ }
+ }
+
+ /**
+ * Safely clean up all data structures at the end of the
+ * job (success/failure/killed).
+ * Here we also ensure that for a given user we maintain
+ * information for only MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs
+ * on the JobTracker.
+ *
+ * @param job completed job.
+ */
+ synchronized void finalizeJob(JobInProgress job) {
+ // Mark the 'non-running' tasks for pruning
+ markCompletedJob(job);
+
+ // Purge oldest jobs and keep at-most MAX_COMPLETE_USER_JOBS_IN_MEMORY jobs of a given user
+ // in memory; information about the purged jobs is available via
+ // JobHistory.
+ synchronized (jobs) {
+ synchronized (jobsByArrival) {
+ synchronized (jobInitQueue) {
+ String jobUser = job.getProfile().getUser();
+ synchronized (userToJobsMap) {
+ ArrayList<JobInProgress> userJobs =
+ userToJobsMap.get(jobUser);
+ synchronized (userJobs) {
+ while (userJobs.size() >
+ MAX_COMPLETE_USER_JOBS_IN_MEMORY) {
+ JobInProgress rjob = userJobs.get(0);
+
+ // Do not delete 'current'
+ // finished job just yet.
+ if (rjob == job) {
+ break;
+ }
+
+ // Cleanup all datastructures
+ int rjobRunState =
+ rjob.getStatus().getRunState();
+ if (rjobRunState == JobStatus.SUCCEEDED ||
+ rjobRunState == JobStatus.FAILED) {
+ // Ok, this call to removeJobTasks
+ // is dangerous in some very very obscure
+ // cases; e.g. when rjob completed, hit
+ // MAX_COMPLETE_USER_JOBS_IN_MEMORY job
+ // limit and yet some task (taskid)
+ // wasn't complete!
+ removeJobTasks(rjob);
+
+ userJobs.remove(0);
+ jobs.remove(rjob.getProfile().getJobId());
+ jobInitQueue.remove(rjob);
+ jobsByArrival.remove(rjob);
+
+ LOG.info("Retired job with id: '" +
+ rjob.getProfile().getJobId() + "'");
+ } else {
+ // Do not remove jobs that aren't complete.
+ // Stop here, and let the next pass take
+ // care of purging jobs.
+ break;
+ }
+ }
+ }
+ }
+ }
+ }
+ }
}
///////////////////////////////////////////////////////
@@ -736,26 +937,46 @@
public synchronized HeartbeatResponse heartbeat(TaskTrackerStatus status,
boolean initialContact, boolean acceptNewTasks, short responseId)
throws IOException {
- LOG.debug("Got heartbeat from: " + status.getTrackerName() +
+ LOG.debug("Got heartbeat from: " + status.getTrackerName() +
" (initialContact: " + initialContact +
" acceptNewTasks: " + acceptNewTasks + ")" +
" with responseId: " + responseId);
// First check if the last heartbeat response got through
String trackerName = status.getTrackerName();
- Short oldResponseId = trackerToHeartbeatResponseIDMap.get(trackerName);
-
- short newResponseId = (short)(responseId + 1);
- if (!initialContact && oldResponseId != null &&
- oldResponseId.shortValue() != responseId) {
- newResponseId = oldResponseId.shortValue();
+ HeartbeatResponse prevHeartbeatResponse =
+ trackerToHeartbeatResponseMap.get(trackerName);
+
+ if (initialContact != true) {
+ // If this isn't the 'initial contact' from the tasktracker,
+ // there is something seriously wrong if the JobTracker has
+ // no record of the 'previous heartbeat'; if so, ask the
+ // tasktracker to re-initialize itself.
+ if (prevHeartbeatResponse == null) {
+ LOG.warn("Serious problem, cannot find record of 'previous' " +
+ "heartbeat for '" + trackerName +
+ "'; reinitializing the tasktracker");
+ return new HeartbeatResponse(responseId,
+ new TaskTrackerAction[] {new ReinitTrackerAction()});
+
+ }
+
+ // It is completely safe to ignore a 'duplicate' from a tracker
+ // since we are guaranteed that the tracker sends the same
+ // 'heartbeat' when rpcs are lost.
+ // {@see TaskTracker.transmitHeartBeat()}
+ if (prevHeartbeatResponse.getResponseId() != responseId) {
+ LOG.info("Ignoring 'duplicate' heartbeat from '" +
+ trackerName + "'");
+ return prevHeartbeatResponse;
+ }
}
// Process this heartbeat
- if (!processHeartbeat(status, initialContact,
- (newResponseId != responseId))) {
- if (oldResponseId != null) {
- trackerToHeartbeatResponseIDMap.remove(trackerName);
+ short newResponseId = (short)(responseId + 1);
+ if (!processHeartbeat(status, initialContact)) {
+ if (prevHeartbeatResponse != null) {
+ trackerToHeartbeatResponseMap.remove(trackerName);
}
return new HeartbeatResponse(newResponseId,
@@ -784,12 +1005,12 @@
response.setActions(
actions.toArray(new TaskTrackerAction[actions.size()]));
- // Update the trackerToHeartbeatResponseIDMap
- if (newResponseId != responseId) {
- trackerToHeartbeatResponseIDMap.put(trackerName,
- new Short(newResponseId));
- }
+ // Update the trackerToHeartbeatResponseMap
+ trackerToHeartbeatResponseMap.put(trackerName, response);
+ // Done processing the heartbeat, now remove 'marked' tasks
+ removeMarkedTasks(trackerName);
+
return response;
}
@@ -824,12 +1045,9 @@
* Process incoming heartbeat messages from the task trackers.
*/
private synchronized boolean processHeartbeat(
- TaskTrackerStatus trackerStatus,
- boolean initialContact, boolean updateStatusTimestamp) {
+ TaskTrackerStatus trackerStatus, boolean initialContact) {
String trackerName = trackerStatus.getTrackerName();
- if (initialContact || updateStatusTimestamp) {
- trackerStatus.setLastSeen(System.currentTimeMillis());
- }
+ trackerStatus.setLastSeen(System.currentTimeMillis());
synchronized (taskTrackers) {
synchronized (trackerExpiryQueue) {
@@ -857,7 +1075,7 @@
}
updateTaskStatuses(trackerStatus);
- //LOG.info("Got heartbeat from "+trackerName);
+
return true;
}
@@ -1028,7 +1246,6 @@
killList.add(new KillTaskAction(killTaskId));
LOG.debug(taskTracker + " -> KillTaskAction: " + killTaskId);
} else {
- //killTasksList.add(new KillJobAction(taskId));
String killJobId = tip.getJob().getStatus().getJobId();
killJobIds.add(killJobId);
}
@@ -1051,14 +1268,28 @@
* map task outputs.
*/
public synchronized MapOutputLocation[]
- locateMapOutputs(String jobId, int[] mapTasksNeeded, int reduce) {
- ArrayList result = new ArrayList(mapTasksNeeded.length);
+ locateMapOutputs(String jobId, int[] mapTasksNeeded, int reduce)
+ throws IOException {
+ // Check to make sure that the job hasn't 'completed'.
JobInProgress job = getJob(jobId);
+ if (job.status.getRunState() != JobStatus.RUNNING) {
+ return new MapOutputLocation[0];
+ }
+
+ ArrayList result = new ArrayList(mapTasksNeeded.length);
for (int i = 0; i < mapTasksNeeded.length; i++) {
TaskStatus status = job.findFinishedMap(mapTasksNeeded[i]);
if (status != null) {
String trackerId =
(String) taskidToTrackerMap.get(status.getTaskId());
+ // Safety check, if we can't find the taskid in
+ // taskidToTrackerMap and job isn't 'running', then just
+ // return an empty array
+ if (trackerId == null &&
+ job.status.getRunState() != JobStatus.RUNNING) {
+ return new MapOutputLocation[0];
+ }
+
TaskTrackerStatus tracker;
synchronized (taskTrackers) {
tracker = (TaskTrackerStatus) taskTrackers.get(trackerId);
@@ -1108,10 +1339,22 @@
synchronized (jobs) {
synchronized (jobsByArrival) {
synchronized (jobInitQueue) {
- jobs.put(job.getProfile().getJobId(), job);
- jobsByArrival.add(job);
- jobInitQueue.add(job);
- jobInitQueue.notifyAll();
+ synchronized (userToJobsMap) {
+ jobs.put(job.getProfile().getJobId(), job);
+ String jobUser = job.getProfile().getUser();
+ if (!userToJobsMap.containsKey(jobUser)) {
+ userToJobsMap.put(jobUser,
+ new ArrayList<JobInProgress>());
+ }
+ ArrayList<JobInProgress> userJobs =
+ userToJobsMap.get(jobUser);
+ synchronized (userJobs) {
+ userJobs.add(job);
+ }
+ jobsByArrival.add(job);
+ jobInitQueue.add(job);
+ jobInitQueue.notifyAll();
+ }
}
}
}
@@ -1271,8 +1514,7 @@
* jobs that might be affected.
*/
void updateTaskStatuses(TaskTrackerStatus status) {
- for (Iterator it = status.taskReports(); it.hasNext(); ) {
- TaskStatus report = (TaskStatus) it.next();
+ for (TaskStatus report : status.getTaskReports()) {
report.setTaskTracker(status.getTrackerName());
String taskId = report.getTaskId();
TaskInProgress tip = (TaskInProgress) taskidToTIPMap.get(taskId);
@@ -1310,8 +1552,16 @@
TaskStatus.Phase.MAP, hostname, trackerName,
myMetrics);
}
+ } else if (!tip.isMapTask() && tip.isComplete()) {
+ // Completed 'reduce' task, not failed;
+ // only removed from data-structures.
+ markCompletedTaskAttempt(trackerName, taskId);
}
}
+
+ // Purge 'marked' tasks, needs to be done
+ // here to prevent hanging references!
+ removeMarkedTasks(trackerName);
}
}
Modified: lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/MapTask.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/MapTask.java?view=diff&rev=495049&r1=495048&r2=495049
==============================================================================
--- lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/MapTask.java (original)
+++ lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/MapTask.java Wed Jan 10 15:59:23 2007
@@ -202,7 +202,7 @@
//spawn a thread to give merge progress heartbeats
Thread sortProgress = new Thread() {
public void run() {
- LOG.info("Started thread: " + getName());
+ LOG.debug("Started thread: " + getName());
while (true) {
try {
reportProgress(umbilical);
@@ -467,26 +467,24 @@
{
Path [] filename = new Path[numSpills];
Path [] indexFileName = new Path[numSpills];
- FSDataInputStream in[] = new FSDataInputStream[numSpills];
- FSDataInputStream indexIn[] = new FSDataInputStream[numSpills];
for(int i = 0; i < numSpills; i++) {
filename[i] = mapOutputFile.getSpillFile(getTaskId(), i);
- in[i] = localFs.open(filename[i]);
indexFileName[i] = mapOutputFile.getSpillIndexFile(getTaskId(), i);
- indexIn[i] = localFs.open(indexFileName[i]);
}
//create a sorter object as we need access to the SegmentDescriptor
//class and merge methods
Sorter sorter = new Sorter(localFs, keyClass, valClass, job);
- sorter.setFactor(numSpills);
for (int parts = 0; parts < partitions; parts++){
List<SegmentDescriptor> segmentList = new ArrayList(numSpills);
for(int i = 0; i < numSpills; i++) {
- long segmentOffset = indexIn[i].readLong();
- long segmentLength = indexIn[i].readLong();
+ FSDataInputStream indexIn = localFs.open(indexFileName[i]);
+ indexIn.seek(parts * 16);
+ long segmentOffset = indexIn.readLong();
+ long segmentLength = indexIn.readLong();
+ indexIn.close();
SegmentDescriptor s = sorter.new SegmentDescriptor(segmentOffset,
segmentLength, filename[i]);
s.preserveInput(true);
@@ -513,8 +511,8 @@
finalIndexOut.close();
//cleanup
for(int i = 0; i < numSpills; i++) {
- in[i].close(); localFs.delete(filename[i]);
- indexIn[i].close(); localFs.delete(indexFileName[i]);
+ localFs.delete(filename[i]);
+ localFs.delete(indexFileName[i]);
}
}
}
Modified: lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/TaskInProgress.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/TaskInProgress.java?view=diff&rev=495049&r1=495048&r2=495049
==============================================================================
--- lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/TaskInProgress.java (original)
+++ lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/TaskInProgress.java Wed Jan 10 15:59:23 2007
@@ -57,7 +57,6 @@
private int partition;
private JobTracker jobtracker;
private String id;
- private String totalTaskIds[];
private JobInProgress job;
// Status of the TIP
@@ -70,7 +69,13 @@
private int completes = 0;
private boolean failed = false;
private boolean killed = false;
- private TreeSet usableTaskIds = new TreeSet();
+
+ // The 'unique' prefix for taskids of this tip
+ String taskIdPrefix;
+
+ // The 'next' usable taskid of this tip
+ int nextTaskId = 0;
+
// Map from task Id -> TaskTracker Id, contains tasks that are
// currently runnings
private TreeMap<String, String> activeTasks = new TreeMap();
@@ -139,13 +144,8 @@
void init(String jobUniqueString) {
this.startTime = System.currentTimeMillis();
this.runSpeculative = conf.getSpeculativeExecution();
- String uniqueString = makeUniqueString(jobUniqueString);
- this.id = "tip_" + uniqueString;
- this.totalTaskIds = new String[MAX_TASK_EXECS + MAX_TASK_FAILURES];
- for (int i = 0; i < totalTaskIds.length; i++) {
- totalTaskIds[i] = "task_" + uniqueString + "_" + i;
- usableTaskIds.add(totalTaskIds[i]);
- }
+ this.taskIdPrefix = makeUniqueString(jobUniqueString);
+ this.id = "tip_" + this.taskIdPrefix;
}
////////////////////////////////////
@@ -180,11 +180,19 @@
}
/**
+ * Is this tip complete?
+ *
+ * @return <code>true</code> if the tip is complete, else <code>false</code>
*/
public boolean isComplete() {
return (completes > 0);
}
+
/**
+ * Is the given taskid in this tip complete?
+ *
+ * @param taskid taskid of attempt to check for completion
+ * @return <code>true</code> if taskid is complete, else <code>false</code>
*/
public boolean isComplete(String taskid) {
TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
@@ -194,7 +202,11 @@
return ((completes > 0) &&
(status.getRunState() == TaskStatus.State.SUCCEEDED));
}
+
/**
+ * Is the tip a failure?
+ *
+ * @return <code>true</code> if tip has failed, else <code>false</code>
*/
public boolean isFailed() {
return failed;
@@ -293,6 +305,17 @@
TaskStatus.State oldState = oldStatus.getRunState();
TaskStatus.State newState = status.getRunState();
+ // We should never receive a duplicate success/failure/killed
+ // status update for the same taskid! This is a safety check,
+ // and is addressed better at the TaskTracker to ensure this.
+ // @see {@link TaskTracker.transmitHeartBeat()}
+ if ((newState != TaskStatus.State.RUNNING) &&
+ (oldState == newState)) {
+ LOG.warn("Recieved duplicate status update of '" + newState +
+ "' for '" + taskid + "' of TIP '" + getTIPId() + "'");
+ return false;
+ }
+
// The task is not allowed to move from completed back to running.
// We have seen out of order status messagesmoving tasks from complete
// to running. This is a spot fix, but it should be addressed more
@@ -346,14 +369,29 @@
/**
* Indicate that one of the taskids in this TaskInProgress
- * has successfully completed!
+ * has successfully completed.
+ *
+ * However this may not be the first subtask in this
+ * TaskInProgress to be completed and hence we might not want to
+ * manipulate the TaskInProgress to note that it is 'complete' just-as-yet.
*/
- public void completed(String taskid) {
+ void completedTask(String taskid) {
LOG.info("Task '" + taskid + "' has completed.");
TaskStatus status = (TaskStatus) taskStatuses.get(taskid);
status.setRunState(TaskStatus.State.SUCCEEDED);
activeTasks.remove(taskid);
-
+ }
+
+ /**
+ * Indicate that one of the taskids in this TaskInProgress
+ * has successfully completed!
+ */
+ public void completed(String taskid) {
+ //
+ // Record that this taskid is complete
+ //
+ completedTask(taskid);
+
//
// Now that the TIP is complete, the other speculative
// subtasks will be closed when the owning tasktracker
@@ -470,8 +508,17 @@
execStartTime = System.currentTimeMillis();
}
- String taskid = (String) usableTaskIds.first();
- usableTaskIds.remove(taskid);
+ // Create the 'taskid'
+ String taskid = null;
+ if (nextTaskId < (MAX_TASK_EXECS + MAX_TASK_FAILURES)) {
+ taskid = new String("task_" + taskIdPrefix + "_" + nextTaskId);
+ ++nextTaskId;
+ } else {
+ LOG.warn("Exceeded limit of " + (MAX_TASK_EXECS + MAX_TASK_FAILURES) +
+ " attempts for the tip '" + getTIPId() + "'");
+ return null;
+ }
+
String jobId = job.getProfile().getJobId();
if (isMapTask()) {
Modified: lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/TaskRunner.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/TaskRunner.java?view=diff&rev=495049&r1=495048&r2=495049
==============================================================================
--- lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/TaskRunner.java (original)
+++ lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/TaskRunner.java Wed Jan 10 15:59:23 2007
@@ -204,17 +204,18 @@
// Add classpath.
vargs.add("-classpath");
vargs.add(classPath.toString());
- // Add main class and its arguments
- vargs.add(TaskTracker.Child.class.getName()); // main of Child
- vargs.add(tracker.taskReportPort + ""); // pass umbilical port
- vargs.add(t.getTaskId()); // pass task identifier
-
+
// Add java.library.path; necessary for native-hadoop libraries
String libraryPath = System.getProperty("java.library.path");
if (libraryPath != null) {
vargs.add("-Djava.library.path=" + libraryPath);
}
-
+
+ // Add main class and its arguments
+ vargs.add(TaskTracker.Child.class.getName()); // main of Child
+ vargs.add(tracker.taskReportPort + ""); // pass umbilical port
+ vargs.add(t.getTaskId()); // pass task identifier
+
// Run java
runChild((String[])vargs.toArray(new String[0]), workDir);
} catch (FSError e) {
Modified: lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/TaskTracker.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/TaskTracker.java?view=diff&rev=495049&r1=495048&r2=495049
==============================================================================
--- lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/TaskTracker.java (original)
+++ lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/TaskTracker.java Wed Jan 10 15:59:23 2007
@@ -74,6 +74,16 @@
// last heartbeat response recieved
short heartbeatResponseId = -1;
+ /*
+ * This is the last 'status' report sent by this tracker to the JobTracker.
+ *
+ * If the rpc call succeeds, this 'status' is cleared-out by this tracker;
+ * indicating that a 'fresh' status report be generated; in the event the
+ * rpc call fails for whatever reason, the previous status report is sent
+ * again.
+ */
+ TaskTrackerStatus status = null;
+
StatusHttpServer server = null;
boolean shuttingDown = false;
@@ -249,6 +259,7 @@
this.mapTotal = 0;
this.reduceTotal = 0;
this.acceptNewTasks = true;
+ this.status = null;
this.minSpaceStart = this.fConf.getLong("mapred.local.dir.minspacestart", 0L);
this.minSpaceKill = this.fConf.getLong("mapred.local.dir.minspacekill", 0L);
@@ -535,20 +546,27 @@
* @throws IOException
*/
private HeartbeatResponse transmitHeartBeat() throws IOException {
+ //
+ // Check if the last heartbeat got through...
+ // if so then build the heartbeat information for the JobTracker;
+ // else resend the previous status information.
//
- // Build the heartbeat information for the JobTracker
- //
- List<TaskStatus> taskReports =
- new ArrayList<TaskStatus>(runningTasks.size());
- synchronized (this) {
- for (TaskInProgress tip: runningTasks.values()) {
- taskReports.add(tip.createStatus());
+ if (status == null) {
+ List<TaskStatus> taskReports =
+ new ArrayList<TaskStatus>(runningTasks.size());
+ synchronized (this) {
+ for (TaskInProgress tip: runningTasks.values()) {
+ taskReports.add(tip.createStatus());
+ }
}
+ status =
+ new TaskTrackerStatus(taskTrackerName, localHostname,
+ httpPort, taskReports,
+ failures);
+ } else {
+ LOG.info("Resending 'status' to '" + jobTrackAddr.getHostName() +
+ "' with reponseId '" + heartbeatResponseId);
}
- TaskTrackerStatus status =
- new TaskTrackerStatus(taskTrackerName, localHostname,
- httpPort, taskReports,
- failures);
//
// Check if we should ask for a new Task
@@ -569,10 +587,14 @@
HeartbeatResponse heartbeatResponse = jobClient.heartbeat(status,
justStarted, askForNewTask,
heartbeatResponseId);
+
+ //
+ // The heartbeat got through successfully!
+ //
heartbeatResponseId = heartbeatResponse.getResponseId();
synchronized (this) {
- for (TaskStatus taskStatus : taskReports) {
+ for (TaskStatus taskStatus : status.getTaskReports()) {
if (taskStatus.getRunState() != TaskStatus.State.RUNNING) {
if (taskStatus.getIsMap()) {
mapTotal--;
@@ -584,6 +606,10 @@
}
}
}
+
+ // Force a rebuild of 'status' on the next iteration
+ status = null;
+
return heartbeatResponse;
}
Modified: lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java?view=diff&rev=495049&r1=495048&r2=495049
==============================================================================
--- lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java (original)
+++ lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/mapred/TaskTrackerStatus.java Wed Jan 10 15:59:23 2007
@@ -98,11 +98,24 @@
* All current tasks at the TaskTracker.
*
* Tasks are tracked by a TaskStatus object.
+ *
+ * @deprecated use {@link #getTaskReports()} instead
*/
public Iterator taskReports() {
return taskReports.iterator();
}
+ /**
+ * Get the current tasks at the TaskTracker.
+ * Tasks are tracked by a {@link TaskStatus} object.
+ *
+ * @return a list of {@link TaskStatus} representing
+ * the current tasks at the TaskTracker.
+ */
+ public List<TaskStatus> getTaskReports() {
+ return taskReports;
+ }
+
/**
* Return the current MapTask count
*/
Modified: lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/util/NativeCodeLoader.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/util/NativeCodeLoader.java?view=diff&rev=495049&r1=495048&r2=495049
==============================================================================
--- lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/util/NativeCodeLoader.java (original)
+++ lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/util/NativeCodeLoader.java Wed Jan 10 15:59:23 2007
@@ -44,6 +44,7 @@
} catch (Throwable t) {
// Ignore failure to load
LOG.debug("Failed to load native-hadoop with error: " + t);
+ LOG.debug("java.library.path=" + System.getProperty("java.library.path"));
}
if (!nativeCodeLoaded) {
Modified: lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/util/RunJar.java
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/util/RunJar.java?view=diff&rev=495049&r1=495048&r2=495049
==============================================================================
--- lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/util/RunJar.java (original)
+++ lucene/hadoop/branches/branch-0.10/src/java/org/apache/hadoop/util/RunJar.java Wed Jan 10 15:59:23 2007
@@ -106,14 +106,18 @@
}
mainClassName = mainClassName.replaceAll("/", ".");
- final File workDir = File.createTempFile("hadoop-unjar","",
- new File( new Configuration().get("hadoop.tmp.dir")) );
+ File tmpDir = new File(new Configuration().get("hadoop.tmp.dir"));
+ tmpDir.mkdirs();
+ if (!tmpDir.isDirectory()) {
+ System.err.println("Mkdirs failed to create " + tmpDir);
+ System.exit(-1);
+ }
+ final File workDir = File.createTempFile("hadoop-unjar", "", tmpDir );
workDir.delete();
- if (!workDir.mkdirs()) {
- if (!workDir.isDirectory()) {
- System.err.println("Mkdirs failed to create " + workDir.toString());
- System.exit(-1);
- }
+ workDir.mkdirs();
+ if (!workDir.isDirectory()) {
+ System.err.println("Mkdirs failed to create " + workDir);
+ System.exit(-1);
}
Runtime.getRuntime().addShutdownHook(new Thread() {
Modified: lucene/hadoop/branches/branch-0.10/src/native/Makefile.am
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.10/src/native/Makefile.am?view=diff&rev=495049&r1=495048&r2=495049
==============================================================================
--- lucene/hadoop/branches/branch-0.10/src/native/Makefile.am (original)
+++ lucene/hadoop/branches/branch-0.10/src/native/Makefile.am Wed Jan 10 15:59:23 2007
@@ -36,7 +36,7 @@
export PLATFORM = $(shell echo $$OS_NAME | tr [A-Z] [a-z])
# List the sub-directories here
-SUBDIRS = src/org/apache/hadoop/io/compress/zlib lib
+SUBDIRS = src/org/apache/hadoop/io/compress/zlib src/org/apache/hadoop/io/compress/lzo lib
# The following export is needed to build libhadoop.so in the 'lib' directory
export SUBDIRS
Modified: lucene/hadoop/branches/branch-0.10/src/native/Makefile.in
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.10/src/native/Makefile.in?view=diff&rev=495049&r1=495048&r2=495049
==============================================================================
--- lucene/hadoop/branches/branch-0.10/src/native/Makefile.in (original)
+++ lucene/hadoop/branches/branch-0.10/src/native/Makefile.in Wed Jan 10 15:59:23 2007
@@ -207,7 +207,7 @@
target_alias = @target_alias@
# List the sub-directories here
-SUBDIRS = src/org/apache/hadoop/io/compress/zlib lib
+SUBDIRS = src/org/apache/hadoop/io/compress/zlib src/org/apache/hadoop/io/compress/lzo lib
all: config.h
$(MAKE) $(AM_MAKEFLAGS) all-recursive
Modified: lucene/hadoop/branches/branch-0.10/src/native/NEWS
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.10/src/native/NEWS?view=diff&rev=495049&r1=495048&r2=495049
==============================================================================
--- lucene/hadoop/branches/branch-0.10/src/native/NEWS (original)
+++ lucene/hadoop/branches/branch-0.10/src/native/NEWS Wed Jan 10 15:59:23 2007
@@ -1,3 +1,5 @@
2006-10-05 Arun C Murthy <ar...@yahoo-inc.com>
* Initial version of libhadoop released
+2007-01-03 Arun C Murthy <ar...@yahoo-inc.com>
+ * Added support for lzo compression library
Modified: lucene/hadoop/branches/branch-0.10/src/native/config.h.in
URL: http://svn.apache.org/viewvc/lucene/hadoop/branches/branch-0.10/src/native/config.h.in?view=diff&rev=495049&r1=495048&r2=495049
==============================================================================
--- lucene/hadoop/branches/branch-0.10/src/native/config.h.in (original)
+++ lucene/hadoop/branches/branch-0.10/src/native/config.h.in Wed Jan 10 15:59:23 2007
@@ -1,5 +1,8 @@
/* config.h.in. Generated from configure.ac by autoheader. */
+/* The 'actual' dynamic-library for '-llzo2' */
+#undef HADOOP_LZO_LIBRARY
+
/* The 'actual' dynamic-library for '-lz' */
#undef HADOOP_ZLIB_LIBRARY
@@ -18,8 +21,41 @@
/* Define to 1 if you have the `jvm' library (-ljvm). */
#undef HAVE_LIBJVM
+/* Define to 1 if you have the `lzo2' library (-llzo2). */
+#undef HAVE_LIBLZO2
+
/* Define to 1 if you have the `z' library (-lz). */
#undef HAVE_LIBZ
+
+/* Define to 1 if you have the <lzo/lzo1a.h> header file. */
+#undef HAVE_LZO_LZO1A_H
+
+/* Define to 1 if you have the <lzo/lzo1b.h> header file. */
+#undef HAVE_LZO_LZO1B_H
+
+/* Define to 1 if you have the <lzo/lzo1c.h> header file. */
+#undef HAVE_LZO_LZO1C_H
+
+/* Define to 1 if you have the <lzo/lzo1f.h> header file. */
+#undef HAVE_LZO_LZO1F_H
+
+/* Define to 1 if you have the <lzo/lzo1x.h> header file. */
+#undef HAVE_LZO_LZO1X_H
+
+/* Define to 1 if you have the <lzo/lzo1y.h> header file. */
+#undef HAVE_LZO_LZO1Y_H
+
+/* Define to 1 if you have the <lzo/lzo1z.h> header file. */
+#undef HAVE_LZO_LZO1Z_H
+
+/* Define to 1 if you have the <lzo/lzo1.h> header file. */
+#undef HAVE_LZO_LZO1_H
+
+/* Define to 1 if you have the <lzo/lzo2a.h> header file. */
+#undef HAVE_LZO_LZO2A_H
+
+/* Define to 1 if you have the <lzo/lzo_asm.h> header file. */
+#undef HAVE_LZO_LZO_ASM_H
/* Define to 1 if you have the <memory.h> header file. */
#undef HAVE_MEMORY_H