You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2009/04/07 14:12:02 UTC
svn commit: r762732 - in /hadoop/core/branches/branch-0.20: CHANGES.txt
src/mapred/org/apache/hadoop/mapred/JobInProgress.java
src/mapred/org/apache/hadoop/mapred/JobTracker.java
Author: sharad
Date: Tue Apr 7 12:12:02 2009
New Revision: 762732
URL: http://svn.apache.org/viewvc?rev=762732&view=rev
Log:
HADOOP-5548. Add synchronization for JobTracker methods in RecoveryManager. Contributed by Amareshwari Sriramadasu.
Modified:
hadoop/core/branches/branch-0.20/CHANGES.txt
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java
Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=762732&r1=762731&r2=762732&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Tue Apr 7 12:12:02 2009
@@ -848,6 +848,9 @@
HADOOP-5305. Increase number of files and print debug messages in
TestCopyFiles. (szetszwo)
+ HADOOP-5548. Add synchronization for JobTracker methods in RecoveryManager.
+ (Amareshwari Sriramadasu via sharad)
+
Release 0.19.2 - Unreleased
BUG FIXES
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=762732&r1=762731&r2=762732&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Tue Apr 7 12:12:02 2009
@@ -736,6 +736,10 @@
////////////////////////////////////////////////////
// Status update methods
////////////////////////////////////////////////////
+
+ /**
+ * Assuming {@link JobTracker} is locked on entry.
+ */
public synchronized void updateTaskStatus(TaskInProgress tip,
TaskStatus status) {
@@ -1258,6 +1262,9 @@
/**
* Populate the data structures as a task is scheduled.
+ *
+ * Assuming {@link JobTracker} is locked on entry.
+ *
* @param tip The tip for which the task is added
* @param id The attempt-id for the task
* @param tts task-tracker status
@@ -2421,6 +2428,9 @@
/**
* Fail a task with a given reason, but without a status object.
+ *
+ * Assuming {@link JobTracker} is locked on entry.
+ *
* @param tip The task's tip
* @param taskid The task id
* @param reason The reason that the task failed
Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=762732&r1=762731&r2=762732&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java Tue Apr 7 12:12:02 2009
@@ -824,8 +824,10 @@
// Apply the final (job-level) updates
JobStatusChangeEvent event = updateJob(jip, job);
- // Update the job listeners
- updateJobInProgressListeners(event);
+ synchronized (JobTracker.this) {
+ // Update the job listeners
+ updateJobInProgressListeners(event);
+ }
}
}
@@ -943,10 +945,12 @@
// This means that the this is a FAILED events
TaskAttemptID id = TaskAttemptID.forName(cause);
TaskStatus status = tip.getTaskStatus(id);
- // This will add the tip failed event in the new log
- tip.getJob().failedTask(tip, id, status.getDiagnosticInfo(),
- status.getPhase(), status.getRunState(),
- status.getTaskTracker());
+ synchronized (JobTracker.this) {
+ // This will add the tip failed event in the new log
+ tip.getJob().failedTask(tip, id, status.getDiagnosticInfo(),
+ status.getPhase(), status.getRunState(),
+ status.getTaskTracker());
+ }
}
}
@@ -996,23 +1000,30 @@
0 , 0, 0);
ttStatus.setLastSeen(System.currentTimeMillis());
- // IV. Register a new tracker
- boolean isTrackerRegistered = getTaskTracker(trackerName) != null;
- if (!isTrackerRegistered) {
- markTracker(trackerName); // add the tracker to recovery-manager
- addNewTracker(ttStatus);
- }
-
- // V. Update the tracker status
- // This will update the meta info of the jobtracker and also add the
- // tracker status if missing i.e register it
- updateTaskTrackerStatus(trackerName, ttStatus);
-
- // VI. Register the attempt
- // a) In the job
- job.addRunningTaskToTIP(tip, attemptId, ttStatus, false);
- // b) In the tip
- tip.updateStatus(taskStatus);
+ synchronized (JobTracker.this) {
+ synchronized (taskTrackers) {
+ synchronized (trackerExpiryQueue) {
+ // IV. Register a new tracker
+ boolean isTrackerRegistered = getTaskTracker(trackerName) != null;
+ if (!isTrackerRegistered) {
+ markTracker(trackerName); // add the tracker to recovery-manager
+ addNewTracker(ttStatus);
+ }
+
+ // V. Update the tracker status
+ // This will update the meta info of the jobtracker and also add the
+ // tracker status if missing i.e register it
+ updateTaskTrackerStatus(trackerName, ttStatus);
+ }
+ }
+ // Register the attempt with the job and the tip under the JobTracker
+ // lock, since today the heartbeat path updates both of them atomically.
+ // VI. Register the attempt
+ // a) In the job
+ job.addRunningTaskToTIP(tip, attemptId, ttStatus, false);
+ // b) In the tip
+ tip.updateStatus(taskStatus);
+ }
// VII. Make an entry in the launched tasks
expireLaunchingTasks.addNewTask(attemptId);
@@ -1060,8 +1071,10 @@
}
taskStatus.setCounters(counter);
- // II. Replay the status
- job.updateTaskStatus(tip, taskStatus);
+ synchronized (JobTracker.this) {
+ // II. Replay the status
+ job.updateTaskStatus(tip, taskStatus);
+ }
// III. Prevent the task from expiry
expireLaunchingTasks.removeTask(attemptId);
@@ -1097,8 +1110,10 @@
String diagInfo = attempt.get(Keys.ERROR);
taskStatus.setDiagnosticInfo(diagInfo); // diag info
- // II. Update the task status
- job.updateTaskStatus(tip, taskStatus);
+ synchronized (JobTracker.this) {
+ // II. Update the task status
+ job.updateTaskStatus(tip, taskStatus);
+ }
// III. Prevent the task from expiry
expireLaunchingTasks.removeTask(attemptId);
@@ -1221,22 +1236,24 @@
hasRecovered = true;
// III. Finalize the recovery
- // Make sure that the tracker statuses in the expiry-tracker queue
- // are updated
- long now = System.currentTimeMillis();
- int size = trackerExpiryQueue.size();
- for (int i = 0; i < size ; ++i) {
- // Get the first status
- TaskTrackerStatus status = trackerExpiryQueue.first();
+ synchronized (trackerExpiryQueue) {
+ // Make sure that the tracker statuses in the expiry-tracker queue
+ // are updated
+ long now = System.currentTimeMillis();
+ int size = trackerExpiryQueue.size();
+ for (int i = 0; i < size ; ++i) {
+ // Get the first status
+ TaskTrackerStatus status = trackerExpiryQueue.first();
- // Remove it
- trackerExpiryQueue.remove(status);
+ // Remove it
+ trackerExpiryQueue.remove(status);
- // Set the new time
- status.setLastSeen(now);
+ // Set the new time
+ status.setLastSeen(now);
- // Add back to get the sorted list
- trackerExpiryQueue.add(status);
+ // Add back to get the sorted list
+ trackerExpiryQueue.add(status);
+ }
}
LOG.info("Restoration complete");
@@ -2181,8 +2198,10 @@
/**
* Adds a new node to the jobtracker. It involves adding it to the expiry
* thread and adding it for resolution
+ *
+ * Assuming trackerExpiryQueue is locked on entry
+ *
* @param status Task Tracker's status
- * @param resolveInline Should the resolution happen inline?
*/
private void addNewTracker(TaskTrackerStatus status) {
trackerExpiryQueue.add(status);
@@ -2266,6 +2285,7 @@
}
// Update the listeners about the job
+ // Assuming JobTracker is locked on entry.
private void updateJobInProgressListeners(JobChangeEvent event) {
for (JobInProgressListener listener : jobInProgressListeners) {
listener.jobUpdated(event);