You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:48:14 UTC

svn commit: r1077722 - in /hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred: CapacitySchedulerConf.java CapacityTaskScheduler.java

Author: omalley
Date: Fri Mar  4 04:48:13 2011
New Revision: 1077722

URL: http://svn.apache.org/viewvc?rev=1077722&view=rev
Log:
commit cefe153b23be6d76c1b7803bb0af0290e23f93aa
Author: Arun C Murthy <ac...@apache.org>
Date:   Tue Sep 21 12:11:04 2010 -0700

    Added a knob to control off-switch scheduling.

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java?rev=1077722&r1=1077721&r2=1077722&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java Fri Mar  4 04:48:13 2011
@@ -109,6 +109,8 @@ class CapacitySchedulerConf {
   
   static final int DEFAULT_MAX_SYSTEM_JOBS = 5000;
   
+  static final int DEFAULT_MAX_TASKS_TO_SCHEDULE_AFTER_OFFSWITCH = 0;
+  
   /**
    * Create a new Capacity scheduler conf.
    * This method reads from the default configuration file mentioned in
@@ -509,7 +511,7 @@ class CapacitySchedulerConf {
   public int getMaxTasksPerHeartbeat() {
     return rmConf.getInt(
         "mapred.capacity-scheduler.maximum-tasks-per-heartbeat", 
-        Integer.MAX_VALUE);
+        Short.MAX_VALUE);
   }
 
   /**
@@ -521,4 +523,33 @@ class CapacitySchedulerConf {
     rmConf.setInt("mapred.capacity-scheduler.maximum-tasks-per-heartbeat", 
         maxTasksPerHeartbeat);
   }
+  
+  /**
+   * Get the maximum number of tasks to schedule, per heartbeat, after an
+   * off-switch task has been assigned.
+   * 
+   * @return the maximum number of tasks to schedule, per heartbeat, after an
+   *         off-switch task has been assigned
+   */
+  public int getMaxTasksToAssignAfterOffSwitch() {
+    return rmConf.getInt(
+        "mapred.capacity-scheduler.maximum-tasks-after-offswitch", 
+        DEFAULT_MAX_TASKS_TO_SCHEDULE_AFTER_OFFSWITCH);
+  }
+  
+  /**
+   * Set the maximum number of tasks to schedule, per heartbeat, after an
+   * off-switch task has been assigned.
+   * 
+   * @param maxTasksToAssignAfterOffSwitch the maximum number of tasks to 
+   *                                       schedule, per heartbeat, after an
+   *                                       off-switch task has been assigned
+   */
+  public void setMaxTasksToAssignAfterOffSwitch(
+      int maxTasksToAssignAfterOffSwitch) {
+    rmConf.setInt(
+        "mapred.capacity-scheduler.maximum-tasks-after-offswitch", 
+        maxTasksToAssignAfterOffSwitch);
+
+  }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=1077722&r1=1077721&r2=1077722&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Fri Mar  4 04:48:13 2011
@@ -654,6 +654,7 @@ class CapacityTaskScheduler extends Task
   private long limitMaxMemForReduceTasks;
   
   private volatile int maxTasksPerHeartbeat;
+  private volatile int maxTasksToAssignAfterOffSwitch;
 
   public CapacityTaskScheduler() {
     this(new Clock());
@@ -880,7 +881,10 @@ class CapacityTaskScheduler extends Task
     mapScheduler.initialize(queueInfoMap);
     reduceScheduler.initialize(queueInfoMap);
     
+    // scheduling tunables
     maxTasksPerHeartbeat = schedConf.getMaxTasksPerHeartbeat();
+    maxTasksToAssignAfterOffSwitch = 
+      schedConf.getMaxTasksToAssignAfterOffSwitch();
   }
   
   Map<String, CapacitySchedulerQueue> 
@@ -1064,6 +1068,7 @@ class CapacityTaskScheduler extends Task
                     throws IOException {
     int availableSlots = maxMapSlots - currentMapSlots;
     boolean assignOffSwitch = true;
+    int tasksToAssignAfterOffSwitch = this.maxTasksToAssignAfterOffSwitch;
     while (availableSlots > 0) {
       mapScheduler.sortQueues();
       TaskLookupResult tlr = mapScheduler.assignTasks(taskTracker, 
@@ -1091,6 +1096,19 @@ class CapacityTaskScheduler extends Task
         assignOffSwitch = false;
       }
       
+      // Respect limits on #tasks to assign after an off-switch task is assigned
+      if (!assignOffSwitch) {
+        if (tasksToAssignAfterOffSwitch == 0) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug("Hit limit of max tasks after off-switch: " + 
+                this.maxTasksToAssignAfterOffSwitch + " after " + 
+                tasks.size() + " maps.");
+          }
+          return;
+        }
+        --tasksToAssignAfterOffSwitch;
+      }
+      
       // Assigned some slots
       availableSlots -= t.getNumSlotsRequired();