You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yh...@apache.org on 2009/02/20 12:12:48 UTC

svn commit: r746206 - in /hadoop/core/trunk: CHANGES.txt src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java

Author: yhemanth
Date: Fri Feb 20 11:12:48 2009
New Revision: 746206

URL: http://svn.apache.org/viewvc?rev=746206&view=rev
Log:
HADOOP-5214. Fixes a ConcurrentModificationException while the Fairshare Scheduler accesses the tasktrackers stored by the JobTracker. Contributed by Rahul Kumar Singh.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=746206&r1=746205&r2=746206&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Fri Feb 20 11:12:48 2009
@@ -804,6 +804,10 @@
     HADOOP-5269. Fixes a problem to do with tasktracker holding on to FAILED_UNCLEAN
     or KILLED_UNCLEAN tasks forever. (Amareshwari Sriramadasu via ddas) 
 
+    HADOOP-5214. Fixes a ConcurrentModificationException while the Fairshare
+    Scheduler accesses the tasktrackers stored by the JobTracker.
+    (Rahul Kumar Singh via yhemanth)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=746206&r1=746205&r2=746206&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Fri Feb 20 11:12:48 2009
@@ -233,11 +233,12 @@
       runnableReduces += runnableTasks(job, TaskType.REDUCE);
     }
 
+    ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
     // Compute total map/reduce slots
     // In the future we can precompute this if the Scheduler becomes a 
     // listener of tracker join/leave events.
-    int totalMapSlots = getTotalSlots(TaskType.MAP);
-    int totalReduceSlots = getTotalSlots(TaskType.REDUCE);
+    int totalMapSlots = getTotalSlots(TaskType.MAP, clusterStatus);
+    int totalReduceSlots = getTotalSlots(TaskType.REDUCE, clusterStatus);
     
     // Scan to see whether any job needs to run a map, then a reduce
     ArrayList<Task> tasks = new ArrayList<Task>();
@@ -331,31 +332,36 @@
    * fair shares, deficits, minimum slot allocations, and numbers of running
    * and needed tasks of each type. 
    */
-  protected synchronized void update() {
+  protected void update() {
+    // Use more granular locking so that clusterStatus can be fetched from the JobTracker.
+    ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
+    // Got clusterStatus hence acquiring scheduler lock now
     // Remove non-running jobs
-    List<JobInProgress> toRemove = new ArrayList<JobInProgress>();
-    for (JobInProgress job: infos.keySet()) { 
-      int runState = job.getStatus().getRunState();
-      if (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED
+    synchronized(this){
+      List<JobInProgress> toRemove = new ArrayList<JobInProgress>();
+      for (JobInProgress job: infos.keySet()) { 
+        int runState = job.getStatus().getRunState();
+        if (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED
           || runState == JobStatus.KILLED) {
-        toRemove.add(job);
+            toRemove.add(job);
+        }
       }
+      for (JobInProgress job: toRemove) {
+        infos.remove(job);
+        poolMgr.removeJob(job);
+      }
+      // Update running jobs with deficits since last update, and compute new
+      // slot allocations, weight, shares and task counts
+      long now = clock.getTime();
+      long timeDelta = now - lastUpdateTime;
+      updateDeficits(timeDelta);
+      updateRunnability();
+      updateTaskCounts();
+      updateWeights();
+      updateMinSlots();
+      updateFairShares(clusterStatus);
+      lastUpdateTime = now;
     }
-    for (JobInProgress job: toRemove) {
-      infos.remove(job);
-      poolMgr.removeJob(job);
-    }
-    // Update running jobs with deficits since last update, and compute new
-    // slot allocations, weight, shares and task counts
-    long now = clock.getTime();
-    long timeDelta = now - lastUpdateTime;
-    updateDeficits(timeDelta);
-    updateRunnability();
-    updateTaskCounts();
-    updateWeights();
-    updateMinSlots();
-    updateFairShares();
-    lastUpdateTime = now;
   }
   
   private void updateDeficits(long timeDelta) {
@@ -594,7 +600,7 @@
     return slotsLeft;
   }
 
-  private void updateFairShares() {
+  private void updateFairShares(ClusterStatus clusterStatus) {
     // Clear old fairShares
     for (JobInfo info: infos.values()) {
       info.mapFairShare = 0;
@@ -618,7 +624,7 @@
           jobsLeft.add(info);
         }
       }
-      double slotsLeft = getTotalSlots(type);
+      double slotsLeft = getTotalSlots(type, clusterStatus);
       while (!jobsLeft.isEmpty()) {
         double totalWeight = 0;
         for (JobInfo info: jobsLeft) {
@@ -697,13 +703,9 @@
     return poolMgr;
   }
 
-  public int getTotalSlots(TaskType type) {
-    int slots = 0;
-    for (TaskTrackerStatus tt: taskTrackerManager.taskTrackers()) {
-      slots += (type == TaskType.MAP ?
-          tt.getMaxMapTasks() : tt.getMaxReduceTasks());
-    }
-    return slots;
+  private int getTotalSlots(TaskType type, ClusterStatus clusterStatus) {
+    return (type == TaskType.MAP ?
+      clusterStatus.getMaxMapTasks() : clusterStatus.getMaxReduceTasks());
   }
 
   public boolean getUseFifo() {