You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sh...@apache.org on 2009/04/07 14:12:02 UTC

svn commit: r762732 - in /hadoop/core/branches/branch-0.20: CHANGES.txt src/mapred/org/apache/hadoop/mapred/JobInProgress.java src/mapred/org/apache/hadoop/mapred/JobTracker.java

Author: sharad
Date: Tue Apr  7 12:12:02 2009
New Revision: 762732

URL: http://svn.apache.org/viewvc?rev=762732&view=rev
Log:
HADOOP-5548. Add synchronization for JobTracker methods in RecoveryManager. Contributed by Amareshwari Sriramadasu.

Modified:
    hadoop/core/branches/branch-0.20/CHANGES.txt
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java

Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=762732&r1=762731&r2=762732&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Tue Apr  7 12:12:02 2009
@@ -848,6 +848,9 @@
     HADOOP-5305. Increase number of files and print debug messages in
     TestCopyFiles.  (szetszwo)
 
+    HADOOP-5548. Add synchronization for JobTracker methods in RecoveryManager.
+    (Amareshwari Sriramadasu via sharad)
+
 Release 0.19.2 - Unreleased
 
   BUG FIXES

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=762732&r1=762731&r2=762732&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Tue Apr  7 12:12:02 2009
@@ -736,6 +736,10 @@
   ////////////////////////////////////////////////////
   // Status update methods
   ////////////////////////////////////////////////////
+
+  /**
+   * Assuming {@link JobTracker} is locked on entry.
+   */
   public synchronized void updateTaskStatus(TaskInProgress tip, 
                                             TaskStatus status) {
 
@@ -1258,6 +1262,9 @@
 
   /**
    * Populate the data structures as a task is scheduled.
+   * 
+   * Assuming {@link JobTracker} is locked on entry.
+   * 
    * @param tip The tip for which the task is added
    * @param id The attempt-id for the task
    * @param tts task-tracker status
@@ -2421,6 +2428,9 @@
 
   /**
    * Fail a task with a given reason, but without a status object.
+   * 
+   * Assuming {@link JobTracker} is locked on entry.
+   * 
    * @param tip The task's tip
    * @param taskid The task id
    * @param reason The reason that the task failed

Modified: hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=762732&r1=762731&r2=762732&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/core/branches/branch-0.20/src/mapred/org/apache/hadoop/mapred/JobTracker.java Tue Apr  7 12:12:02 2009
@@ -824,8 +824,10 @@
           // Apply the final (job-level) updates
           JobStatusChangeEvent event = updateJob(jip, job);
           
-          // Update the job listeners
-          updateJobInProgressListeners(event);
+          synchronized (JobTracker.this) {
+            // Update the job listeners
+            updateJobInProgressListeners(event);
+          }
         }
       }
       
@@ -943,10 +945,12 @@
         // This means that this is a FAILED event
         TaskAttemptID id = TaskAttemptID.forName(cause);
         TaskStatus status = tip.getTaskStatus(id);
-        // This will add the tip failed event in the new log
-        tip.getJob().failedTask(tip, id, status.getDiagnosticInfo(), 
-                                status.getPhase(), status.getRunState(), 
-                                status.getTaskTracker());
+        synchronized (JobTracker.this) {
+          // This will add the tip failed event in the new log
+          tip.getJob().failedTask(tip, id, status.getDiagnosticInfo(), 
+                                  status.getPhase(), status.getRunState(), 
+                                  status.getTaskTracker());
+        }
       }
     }
     
@@ -996,23 +1000,30 @@
                               0 , 0, 0);
       ttStatus.setLastSeen(System.currentTimeMillis());
 
-      // IV. Register a new tracker
-      boolean isTrackerRegistered = getTaskTracker(trackerName) != null;
-      if (!isTrackerRegistered) {
-        markTracker(trackerName); // add the tracker to recovery-manager
-        addNewTracker(ttStatus);
-      }
-      
-      // V. Update the tracker status
-      //    This will update the meta info of the jobtracker and also add the
-      //    tracker status if missing i.e register it
-      updateTaskTrackerStatus(trackerName, ttStatus);
-      
-      // VI. Register the attempt
-      //   a) In the job
-      job.addRunningTaskToTIP(tip, attemptId, ttStatus, false);
-      //   b) In the tip
-      tip.updateStatus(taskStatus);
+      synchronized (JobTracker.this) {
+        synchronized (taskTrackers) {
+          synchronized (trackerExpiryQueue) {
+            // IV. Register a new tracker
+            boolean isTrackerRegistered = getTaskTracker(trackerName) != null;
+            if (!isTrackerRegistered) {
+              markTracker(trackerName); // add the tracker to recovery-manager
+              addNewTracker(ttStatus);
+            }
+      
+            // V. Update the tracker status
+            // This will update the meta info of the jobtracker and also add the
+            // tracker status if missing i.e register it
+            updateTaskTrackerStatus(trackerName, ttStatus);
+          }
+        }
+        // Register the attempt with the job and the tip under the JobTracker
+        // lock since, as of today, they are atomic through heartbeat.
+        // VI. Register the attempt
+        //   a) In the job
+        job.addRunningTaskToTIP(tip, attemptId, ttStatus, false);
+        //   b) In the tip
+        tip.updateStatus(taskStatus);
+      }
       
       // VII. Make an entry in the launched tasks
       expireLaunchingTasks.addNewTask(attemptId);
@@ -1060,8 +1071,10 @@
       }
       taskStatus.setCounters(counter);
       
-      // II. Replay the status
-      job.updateTaskStatus(tip, taskStatus);
+      synchronized (JobTracker.this) {
+        // II. Replay the status
+        job.updateTaskStatus(tip, taskStatus);
+      }
       
       // III. Prevent the task from expiry
       expireLaunchingTasks.removeTask(attemptId);
@@ -1097,8 +1110,10 @@
       String diagInfo = attempt.get(Keys.ERROR);
       taskStatus.setDiagnosticInfo(diagInfo); // diag info
 
-      // II. Update the task status
-     job.updateTaskStatus(tip, taskStatus);
+      synchronized (JobTracker.this) {
+        // II. Update the task status
+        job.updateTaskStatus(tip, taskStatus);
+      }
 
      // III. Prevent the task from expiry
      expireLaunchingTasks.removeTask(attemptId);
@@ -1221,22 +1236,24 @@
       hasRecovered = true;
 
       // III. Finalize the recovery
-      // Make sure that the tracker statuses in the expiry-tracker queue
-      // are updated
-      long now = System.currentTimeMillis();
-      int size = trackerExpiryQueue.size();
-      for (int i = 0; i < size ; ++i) {
-        // Get the first status
-        TaskTrackerStatus status = trackerExpiryQueue.first();
+      synchronized (trackerExpiryQueue) {
+        // Make sure that the tracker statuses in the expiry-tracker queue
+        // are updated
+        long now = System.currentTimeMillis();
+        int size = trackerExpiryQueue.size();
+        for (int i = 0; i < size ; ++i) {
+          // Get the first status
+          TaskTrackerStatus status = trackerExpiryQueue.first();
 
-        // Remove it
-        trackerExpiryQueue.remove(status);
+          // Remove it
+          trackerExpiryQueue.remove(status);
 
-        // Set the new time
-        status.setLastSeen(now);
+          // Set the new time
+          status.setLastSeen(now);
 
-        // Add back to get the sorted list
-        trackerExpiryQueue.add(status);
+          // Add back to get the sorted list
+          trackerExpiryQueue.add(status);
+        }
       }
 
       LOG.info("Restoration complete");
@@ -2181,8 +2198,10 @@
   /**
    * Adds a new node to the jobtracker. It involves adding it to the expiry
    * thread and adding it for resolution
+   * 
+   * Assuming trackerExpiryQueue is locked on entry
+   * 
    * @param status Task Tracker's status
-   * @param resolveInline Should the resolution happen inline?
    */
   private void addNewTracker(TaskTrackerStatus status) {
     trackerExpiryQueue.add(status);
@@ -2266,6 +2285,7 @@
   }
   
   // Update the listeners about the job
+  // Assuming JobTracker is locked on entry.
   private void updateJobInProgressListeners(JobChangeEvent event) {
     for (JobInProgressListener listener : jobInProgressListeners) {
       listener.jobUpdated(event);