You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:40:52 UTC

svn commit: r1077655 - in /hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred: CapacitySchedulerConf.java CapacitySchedulerQueue.java CapacityTaskScheduler.java

Author: omalley
Date: Fri Mar  4 04:40:52 2011
New Revision: 1077655

URL: http://svn.apache.org/viewvc?rev=1077655&view=rev
Log:
commit 6207d0c9e9039448a367080533398e2edf3dbc69
Author: Arun C Murthy <ac...@apache.org>
Date:   Thu Aug 19 10:50:37 2010 -0700

    MAPREDUCE-1872. Fixed a corner case where user limits were applied unfairly, and added a knob to limit the number of tasks scheduled per heartbeat.

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java?rev=1077655&r1=1077654&r2=1077655&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java Fri Mar  4 04:40:52 2011
@@ -501,4 +501,24 @@ class CapacitySchedulerConf {
     rmConf.setInt(
         "mapred.capacity-scheduler.init-worker-threads", poolSize);
   }
+  
+  /**
+   * Get the maximum number of tasks which can be scheduled in a heartbeat.
+   * @return the configured limit; defaults to Integer.MAX_VALUE (no limit)
+   */
+  public int getMaxTasksPerHeartbeat() {
+    return rmConf.getInt(
+        "mapred.capacity-scheduler.maximum-tasks-per-heartbeat", 
+        Integer.MAX_VALUE);
+  }
+
+  /**
+   * Set the maximum number of tasks which can be scheduled in a heartbeat.
+   * @param maxTasksPerHeartbeat the maximum number of tasks which can be 
+   *                             scheduled in a heartbeat
+   */
+  public void setMaxTasksPerHeartbeat(int maxTasksPerHeartbeat) {
+    rmConf.setInt("mapred.capacity-scheduler.maximum-tasks-per-heartbeat", 
+        maxTasksPerHeartbeat);
+  }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java?rev=1077655&r1=1077654&r2=1077655&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java Fri Mar  4 04:40:52 2011
@@ -22,8 +22,10 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.Map;
+import java.util.Set;
 import java.util.TreeMap;
 
 import org.apache.commons.logging.Log;
@@ -69,6 +71,9 @@ class CapacitySchedulerQueue {
     //in cluster at any given time.
     private int maxCapacity = -1;
   
+    // Active users: those seen with pending tasks since the last reset()
+    Set<String> users = new HashSet<String>();
+    
     /**
      * for each user, we need to keep track of number of slots occupied by
      * running tasks
@@ -82,6 +87,7 @@ class CapacitySchedulerQueue {
     void reset() {
       numRunningTasks = 0;
       numSlotsOccupied = 0;
+      users.clear();
       numSlotsOccupiedByUser.clear();
     }
   
@@ -120,6 +126,13 @@ class CapacitySchedulerQueue {
     }
   
     /**
+     * @return number of users with pending tasks recorded since the last reset()
+     */
+    int getNumActiveUsers() {
+      return users.size();
+    }
+    
+    /**
      * return information about the tasks
      */
     @Override
@@ -184,12 +197,15 @@ class CapacitySchedulerQueue {
       }
     }
     
-    void updateSlotsUsage(String user, int numRunningTasks, int numSlotsOccupied) {
+    void updateSlotsUsage(String user, int pendingTasks, int numRunningTasks, int numSlotsOccupied) {
       this.numRunningTasks += numRunningTasks;
       this.numSlotsOccupied += numSlotsOccupied;
       Integer i = this.numSlotsOccupiedByUser.get(user);
       int slots = numSlotsOccupied + ((i == null) ? 0 : i.intValue());
       this.numSlotsOccupiedByUser.put(user, slots);
+      if (pendingTasks > 0) { // only users still wanting slots count as active
+        users.add(user);
+      }
     }
   }
 
@@ -489,6 +505,16 @@ class CapacitySchedulerQueue {
     throw new IllegalArgumentException("Illegal taskType=" + taskType);
   }
   
+  int getNumActiveUsersByTaskType(TaskType taskType) {
+    if (taskType == TaskType.MAP) {
+      return mapSlots.getNumActiveUsers();
+    } else if (taskType == TaskType.REDUCE) {
+      return reduceSlots.getNumActiveUsers();
+    }
+    // Only MAP and REDUCE slots are tracked; any other type is rejected.
+    throw new IllegalArgumentException("Illegal taskType=" + taskType);
+  }
+  
   /**
    * A new job is added to the 
    * @param job
@@ -553,12 +579,14 @@ class CapacitySchedulerQueue {
    * @param numRunningTasks
    * @param numSlotsOccupied
    */
-  void update(TaskType type, String user, 
+  void update(TaskType type, JobInProgress job, String user, // job supplies pending counts
       int numRunningTasks, int numSlotsOccupied) {
     if (type == TaskType.MAP) {
-      mapSlots.updateSlotsUsage(user, numRunningTasks, numSlotsOccupied);
+      mapSlots.updateSlotsUsage(user, job.pendingMaps(), 
+          numRunningTasks, numSlotsOccupied);
     } else if (type == TaskType.REDUCE) {
-      reduceSlots.updateSlotsUsage(user, numRunningTasks, numSlotsOccupied);
+      reduceSlots.updateSlotsUsage(user, job.pendingReduces(), 
+          numRunningTasks, numSlotsOccupied);
     }
   }
   
@@ -611,9 +639,9 @@ class CapacitySchedulerQueue {
               numRunningReduceSlots,
               numReservedReduceSlotsForThisJob));
 
-      update(TaskType.MAP, j.getProfile().getUser(), 
+      update(TaskType.MAP, j, j.getProfile().getUser(), 
           numMapsRunningForThisJob, numMapSlotsForThisJob);
-      update(TaskType.REDUCE, j.getProfile().getUser(), 
+      update(TaskType.REDUCE, j, j.getProfile().getUser(), 
           numReducesRunningForThisJob, numReduceSlotsForThisJob);
 
       if (LOG.isDebugEnabled()) {
@@ -1127,9 +1155,13 @@ class CapacitySchedulerQueue {
     // queue's configured capacity * user-limit-factor.
     // Also, the queue's configured capacity should be higher than 
     // queue-hard-limit * ulMin
+    
+    // All users in this queue might not need any slots of type 'taskType'
+    int activeUsers = Math.max(1, getNumActiveUsersByTaskType(taskType));  
+    
     int limit = 
       Math.min(
-          Math.max(divideAndCeil(currentCapacity, numJobsByUser.size()), 
+          Math.max(divideAndCeil(currentCapacity, activeUsers), 
                    divideAndCeil(ulMin*currentCapacity, 100)),
           (int)(queueCapacity * ulMinFactor)
           );
@@ -1138,9 +1170,11 @@ class CapacitySchedulerQueue {
         limit) {
       if (LOG.isDebugEnabled()) {
         LOG.debug("User " + user + " is over limit for queue=" + queueName + 
+            " queueCapacity=" + queueCapacity +
             " num slots occupied=" + getNumSlotsOccupiedByUser(user, taskType) + 
             " limit=" + limit +" numSlotsRequested=" + numSlotsRequested + 
-            " currentCapacity=" + currentCapacity);
+            " currentCapacity=" + currentCapacity + 
+            " numActiveUsers=" + getNumActiveUsersByTaskType(taskType));
       }
       return false;
     }
@@ -1303,4 +1337,4 @@ class CapacitySchedulerQueue {
     return true;
   }
 
-}
\ No newline at end of file
+}

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=1077655&r1=1077654&r2=1077655&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Fri Mar  4 04:40:52 2011
@@ -652,6 +652,8 @@ class CapacityTaskScheduler extends Task
   private long memSizeForReduceSlotOnJT;
   private long limitMaxMemForMapTasks;
   private long limitMaxMemForReduceTasks;
+  // Per-heartbeat task cap, loaded from schedConf.getMaxTasksPerHeartbeat()
+  private volatile int maxTasksPerHeartbeat;
 
   public CapacityTaskScheduler() {
     this(new Clock());
@@ -877,6 +879,8 @@ class CapacityTaskScheduler extends Task
     // let our mgr objects know about the queues
     mapScheduler.initialize(queueInfoMap);
     reduceScheduler.initialize(queueInfoMap);
+    
+    maxTasksPerHeartbeat = schedConf.getMaxTasksPerHeartbeat();
   }
   
   Map<String, CapacitySchedulerQueue> 
@@ -1077,6 +1081,10 @@ class CapacityTaskScheduler extends Task
 
       tasks.add(t);
 
+      if (tasks.size() >= maxTasksPerHeartbeat) {
+        return;
+      }
+      
       if (TaskLookupResult.LookUpStatus.OFF_SWITCH_TASK_FOUND == 
         tlr.getLookUpStatus()) {
         // Atmost 1 off-switch task per-heartbeat
@@ -1089,7 +1097,7 @@ class CapacityTaskScheduler extends Task
       // Update the queue
       CapacitySchedulerQueue queue = 
         queueInfoMap.get(job.getProfile().getQueueName());
-      queue.update(TaskType.MAP, 
+      queue.update(TaskType.MAP, job,
           job.getProfile().getUser(), 1, t.getNumSlotsRequired());
     }
   }