You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:25:22 UTC

svn commit: r1076951 - in /hadoop/common/branches/branch-0.20-security-patches: conf/ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/docs/src/documentation/content...

Author: omalley
Date: Fri Mar  4 03:25:21 2011
New Revision: 1076951

URL: http://svn.apache.org/viewvc?rev=1076951&view=rev
Log:
commit 2a7c1121115819f918adb47c4aebd4daa693a0ea
Author: Lee Tucker <lt...@yahoo-inc.com>
Date:   Thu Jul 30 17:40:40 2009 -0700

    Applying patch 2855407.mr532.patch

Modified:
    hadoop/common/branches/branch-0.20-security-patches/conf/capacity-scheduler.xml.template
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/capacity_scheduler.xml

Modified: hadoop/common/branches/branch-0.20-security-patches/conf/capacity-scheduler.xml.template
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/conf/capacity-scheduler.xml.template?rev=1076951&r1=1076950&r2=1076951&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/conf/capacity-scheduler.xml.template (original)
+++ hadoop/common/branches/branch-0.20-security-patches/conf/capacity-scheduler.xml.template Fri Mar  4 03:25:21 2011
@@ -45,6 +45,44 @@
     of the job queue.
     </description>
   </property>
+
+<property>
+  <name>mapred.capacity-scheduler.queue.default.max.map.slots</name>
+  <value>-1</value>
+  <description>
+    This value is the maximum map slots that can be used in a
+    queue at any point of time. So for example assuming above config value
+    is 100 , not more than 100 tasks would be in the queue at any point of
+    time, assuming each task takes one slot.
+
+    Default value of -1 would disable this capping feature
+
+    Typically the queue capacity should be equal to this limit.
+    If queue capacity is more than this limit, excess capacity will be
+    used by the other queues. If queue capacity is less than the above
+    limit , then the limit would be the queue capacity - as in the current
+    implementation
+  </description>
+</property>
+
+<property>
+  <name>mapred.capacity-scheduler.queue.default.max.reduce.slots</name>
+  <value>-1</value>
+  <description>
+    This value is the maximum reduce slots that can be used in a
+    queue at any point of time. So for example assuming above config value
+      is 100 , not more than 100 reduce tasks would be in the queue at any point
+      of time, assuming each task takes one slot.
+
+    Default value of -1 would disable this capping feature
+
+    Typically the queue capacity should be equal to this limit.
+    If queue capacity is more than this limit, excess capacity will be
+    used by the other queues. If queue capacity is less than the above
+    limit , then the limit would be the queue capacity - as in the current
+    implementation
+  </description>
+</property>
   
   <!-- The default configuration settings for the capacity task scheduler -->
   <!-- The default values would be applied to all the queues which don't have -->

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java?rev=1076951&r1=1076950&r2=1076951&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java Fri Mar  4 03:25:21 2011
@@ -76,6 +76,18 @@ class CapacitySchedulerConf {
     "mapred.capacity-scheduler.task.limit.maxpmem";
 
   /**
+   *  Configuration that provides the maximum cap for the map task in a queue
+   *  at any given point of time.
+   */
+  static final String MAX_MAP_CAP_PROPERTY = "max.map.slots";
+
+  /**
+   *  Configuration that provides the maximum cap for the reduce task in a queue
+   *  at any given point of time.
+   */
+  static final String MAX_REDUCE_CAP_PROPERTY = "max.reduce.slots";
+
+  /**
    * The constant which defines the default initialization thread
    * polling interval, denoted in milliseconds.
    */
@@ -357,4 +369,40 @@ class CapacitySchedulerConf {
     rmConf.setInt(
         "mapred.capacity-scheduler.init-worker-threads", poolSize);
   }
+
+  /**
+   * get the max map slots cap
+   * @param queue
+   * @return
+   */
+  public int getMaxMapCap(String queue) {
+    return rmConf.getInt(toFullPropertyName(queue,MAX_MAP_CAP_PROPERTY),-1);
+  }
+
+  /**
+   * Used for testing
+   * @param queue
+   * @param val
+   */
+  public void setMaxMapCap(String queue,int val) {
+    rmConf.setInt(toFullPropertyName(queue,MAX_MAP_CAP_PROPERTY),val);
+  }
+
+  /**
+   * get the max reduce slots cap
+   * @param queue
+   * @return
+   */
+  public int getMaxReduceCap(String queue) {
+    return rmConf.getInt(toFullPropertyName(queue,MAX_REDUCE_CAP_PROPERTY),-1);    
+  }
+
+  /**
+   * Used for testing
+   * @param queue
+   * @param val
+   */
+  public void setMaxReduceCap(String queue,int val) {
+    rmConf.setInt(toFullPropertyName(queue,MAX_REDUCE_CAP_PROPERTY),val);
+  }
 }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=1076951&r1=1076950&r2=1076951&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Fri Mar  4 03:25:21 2011
@@ -76,23 +76,36 @@ class CapacityTaskScheduler extends Task
    **********************************************************************/
 
   private static class TaskSchedulingInfo {
+    
+    private static final String LIMIT_NORMALIZED_CAPACITY_STRING
+      = "(Capacity is restricted to max limit of %d slots.\n" +
+        "Remaining %d slots will be used by other queues.)\n";
     /** 
      * the actual capacity, which depends on how many slots are available
      * in the cluster at any given time. 
      */
-    int capacity = 0;
+    private int capacity = 0;
     // number of running tasks
     int numRunningTasks = 0;
     // number of slots occupied by running tasks
     int numSlotsOccupied = 0;
 
     /**
+     * max task limit
+     * This value is the maximum slots that can be used in a
+     * queue at any point of time. So for example assuming above config value
+     * is 100 , not more than 100 tasks would be in the queue at any point of
+     * time, assuming each task takes one slot.
+     */
+    private int maxTaskLimit = -1;
+
+    /**
      * for each user, we need to keep track of number of slots occupied by
      * running tasks
      */
     Map<String, Integer> numSlotsOccupiedByUser = 
       new HashMap<String, Integer>();
-    
+
     /**
      * reset the variables associated with tasks
      */
@@ -104,15 +117,53 @@ class CapacityTaskScheduler extends Task
       }
     }
 
+
+    int getMaxTaskLimit() {
+      return maxTaskLimit;
+    }
+
+    void setMaxTaskLimit(int maxTaskCap) {
+      this.maxTaskLimit = maxTaskCap;
+    }
+
+    /**
+     * This method checks for maxTaskLimit and sends minimum of maxTaskLimit and
+     * capacity.
+     * @return
+     */
+    int getCapacity() {
+      return ((maxTaskLimit >= 0) && (maxTaskLimit < capacity)) ? maxTaskLimit :
+        capacity;
+    }
+
+    /**
+     * Mutator method for capacity
+     * @param capacity
+     */
+    void setCapacity(int capacity) {
+        this.capacity = capacity;
+    }
+
+
     /**
      * return information about the tasks
      */
     @Override
     public String toString() {
       float occupiedSlotsAsPercent =
-          capacity != 0 ? ((float) numSlotsOccupied * 100 / capacity) : 0;
+          getCapacity() != 0 ?
+            ((float) numSlotsOccupied * 100 / getCapacity()) : 0;
       StringBuffer sb = new StringBuffer();
+      
       sb.append("Capacity: " + capacity + " slots\n");
+      //If maxTaskLimit is less than the capacity
+      if (maxTaskLimit >= 0 && maxTaskLimit < capacity) {
+        sb.append(String.format(LIMIT_NORMALIZED_CAPACITY_STRING,   
+                        maxTaskLimit, (capacity-maxTaskLimit)));
+      }
+      if (maxTaskLimit >= 0) {
+        sb.append(String.format("Maximum Slots Limit: %d\n", maxTaskLimit));
+      }
       sb.append(String.format("Used capacity: %d (%.1f%% of Capacity)\n",
           Integer.valueOf(numSlotsOccupied), Float
               .valueOf(occupiedSlotsAsPercent)));
@@ -166,14 +217,17 @@ class CapacityTaskScheduler extends Task
     TaskSchedulingInfo mapTSI;
     TaskSchedulingInfo reduceTSI;
     
-    public QueueSchedulingInfo(String queueName, float capacityPercent, 
-        int ulMin, JobQueuesManager jobQueuesManager) {
+    public QueueSchedulingInfo(String queueName, float capacityPercent,
+                               int ulMin, JobQueuesManager jobQueuesManager,
+                               int mapCap, int reduceCap) {
       this.queueName = new String(queueName);
       this.capacityPercent = capacityPercent;
       this.ulMin = ulMin;
       this.jobQueuesManager = jobQueuesManager;
       this.mapTSI = new TaskSchedulingInfo();
       this.reduceTSI = new TaskSchedulingInfo();
+      this.mapTSI.setMaxTaskLimit(mapCap);
+      this.reduceTSI.setMaxTaskLimit(reduceCap);
     }
     
     /**
@@ -194,7 +248,7 @@ class CapacityTaskScheduler extends Task
           (jobQueuesManager.doesQueueSupportPriorities(queueName))?
               "YES":"NO"));
       sb.append("-------------\n");
-      
+
       sb.append("Map tasks\n");
       sb.append(mapTSI.toString());
       sb.append("-------------\n");
@@ -339,8 +393,9 @@ class CapacityTaskScheduler extends Task
      * capacity. This ordered list is iterated over, when assigning tasks.
      */  
     private List<QueueSchedulingInfo> qsiForAssigningTasks = 
-      new ArrayList<QueueSchedulingInfo>();  
-    /** 
+      new ArrayList<QueueSchedulingInfo>();
+
+    /**
      * Comparator to sort queues.
      * For maps, we need to sort on QueueSchedulingInfo.mapTSI. For 
      * reducers, we use reduceTSI. So we'll need separate comparators.  
@@ -353,10 +408,10 @@ class CapacityTaskScheduler extends Task
         TaskSchedulingInfo t2 = getTSI(q2);
         // look at how much capacity they've filled. Treat a queue with
         // capacity=0 equivalent to a queue running at capacity
-        double r1 = (0 == t1.capacity)? 1.0f:
-          (double)t1.numSlotsOccupied/(double)t1.capacity;
-        double r2 = (0 == t2.capacity)? 1.0f:
-          (double)t2.numSlotsOccupied/(double)t2.capacity;
+        double r1 = (0 == t1.getCapacity())? 1.0f:
+          (double)t1.numSlotsOccupied/(double) t1.getCapacity();
+        double r2 = (0 == t2.getCapacity())? 1.0f:
+          (double)t2.numSlotsOccupied/(double) t2.getCapacity();
         if (r1<r2) return -1;
         else if (r1>r2) return 1;
         else return 0;
@@ -412,8 +467,8 @@ class CapacityTaskScheduler extends Task
       // slots we're getting).
       int currentCapacity;
       TaskSchedulingInfo tsi = getTSI(qsi);
-      if (tsi.numSlotsOccupied < tsi.capacity) {
-        currentCapacity = tsi.capacity;
+      if (tsi.numSlotsOccupied < tsi.getCapacity()) {
+        currentCapacity = tsi.getCapacity();
       }
       else {
         currentCapacity = tsi.numSlotsOccupied + getSlotsPerTask(j);
@@ -597,7 +652,11 @@ class CapacityTaskScheduler extends Task
       for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
         // we may have queues with capacity=0. We shouldn't look at jobs from 
         // these queues
-        if (0 == getTSI(qsi).capacity) {
+        if (0 == getTSI(qsi).getCapacity()) {
+          continue;
+        }
+        
+        if(this.areTasksInQueueOverLimit(qsi)) {
           continue;
         }
         TaskLookupResult tlr = getTaskFromQueue(taskTracker, qsi);
@@ -622,6 +681,31 @@ class CapacityTaskScheduler extends Task
       return TaskLookupResult.getNoTaskFoundResult();
     }
 
+
+    /**
+     * Check if the max task limit is set for this queue
+     * if set , ignore this qsi if current num of occupied
+     * slots  of a TYPE in the queue is >= getMaxTaskCap().
+     * @param qsi
+     * @return
+     */
+
+    private boolean areTasksInQueueOverLimit(QueueSchedulingInfo qsi) {
+      TaskSchedulingInfo tsi = getTSI(qsi);
+      if (tsi.getMaxTaskLimit() >= 0) {
+        if (tsi.numSlotsOccupied >= tsi.getCapacity()) {
+          if (LOG.isDebugEnabled()) {
+            LOG.debug(
+              "Queue " + qsi.queueName + " has reached its  max " + type +
+                " limit ");
+            LOG.debug("Current running tasks " + tsi.getCapacity());
+          }
+          return true;
+        }
+      }
+      return false;
+    }
+
     // for debugging.
     private void printQSIs() {
       if (LOG.isDebugEnabled()) {
@@ -629,12 +713,16 @@ class CapacityTaskScheduler extends Task
         for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
           TaskSchedulingInfo tsi = getTSI(qsi);
           Collection<JobInProgress> runJobs =
-              scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName);
-          s.append(String.format(" Queue '%s'(%s): runningTasks=%d, "
-              + "occupiedSlots=%d, capacity=%d, runJobs=%d", qsi.queueName,
+            scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName);
+          s.append(
+            String.format(
+              " Queue '%s'(%s): runningTasks=%d, "
+                + "occupiedSlots=%d, capacity=%d, runJobs=%d  maxTaskLimit=%d ",
+              qsi.queueName,
               this.type, Integer.valueOf(tsi.numRunningTasks), Integer
-                  .valueOf(tsi.numSlotsOccupied), Integer
-                  .valueOf(tsi.capacity), Integer.valueOf(runJobs.size())));
+                .valueOf(tsi.numSlotsOccupied), Integer
+                .valueOf(tsi.getCapacity()), Integer.valueOf(runJobs.size()),
+              Integer.valueOf(tsi.getMaxTaskLimit())));
         }
         LOG.debug(s);
       }
@@ -970,8 +1058,9 @@ class CapacityTaskScheduler extends Task
       }
       int ulMin = schedConf.getMinimumUserLimitPercent(queueName);
       // create our QSI and add to our hashmap
-      QueueSchedulingInfo qsi = new QueueSchedulingInfo(queueName, capacity, 
-                                                    ulMin, jobQueuesManager);
+      QueueSchedulingInfo qsi = new QueueSchedulingInfo(
+        queueName, capacity, ulMin, jobQueuesManager, schedConf.getMaxMapCap(
+          queueName), schedConf.getMaxReduceCap(queueName));
       queueInfoMap.put(queueName, qsi);
 
       // create the queues of job objects
@@ -1068,12 +1157,12 @@ class CapacityTaskScheduler extends Task
     for (QueueSchedulingInfo qsi: queueInfoMap.values()) {
       // compute new capacities, if TT slots have changed
       if (mapClusterCapacity != prevMapClusterCapacity) {
-        qsi.mapTSI.capacity =
-          (int)(qsi.capacityPercent*mapClusterCapacity/100);
+        qsi.mapTSI.setCapacity((int)
+          (qsi.capacityPercent*mapClusterCapacity/100));
       }
       if (reduceClusterCapacity != prevReduceClusterCapacity) {
-        qsi.reduceTSI.capacity =
-          (int)(qsi.capacityPercent*reduceClusterCapacity/100);
+        qsi.reduceTSI.setCapacity((int)
+          (qsi.capacityPercent*reduceClusterCapacity/100));
       }
       // reset running/pending tasks, tasks per user
       qsi.mapTSI.resetTaskVars();

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1076951&r1=1076950&r2=1076951&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri Mar  4 03:25:21 2011
@@ -862,6 +862,86 @@ public class TestCapacityScheduler exten
                  1, rqueue.size());
 
   }
+
+  /**
+   * Test the max map limit.
+   * @throws IOException
+   */
+  public void testMaxMapCap() throws IOException {
+    this.setUp(4,1,1);
+    taskTrackerManager.addQueues(new String[] {"default"});
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, false, 1));
+    resConf.setFakeQueues(queues);
+    resConf.setMaxMapCap("default",2);
+    resConf.setMaxReduceCap("default",-1);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    //submit the Job
+    FakeJobInProgress fjob1 =
+      submitJobAndInit(JobStatus.PREP,3,1,"default","user");
+
+    List<Task> task1 = scheduler.assignTasks(tracker("tt1"));
+    List<Task> task2 = scheduler.assignTasks(tracker("tt2"));
+
+    //Once the 2 tasks are running the third assigment should be reduce.
+    checkAssignment("tt3", "attempt_test_0001_r_000001_0 on tt3");
+    //This should fail.
+    List<Task> task4 = scheduler.assignTasks(tracker("tt4"));
+    assertNull(task4);
+    //Now complete the task 1.
+        // complete the job
+    taskTrackerManager.finishTask("tt1", task1.get(0).getTaskID().toString(),
+                                  fjob1);
+    //We have completed the tt1 task which was a map task so we expect one map
+    //task to be picked up
+    checkAssignment("tt4","attempt_test_0001_m_000003_0 on tt4");
+  }
+
+  /**
+   * Test max reduce limit
+   * @throws IOException
+   */
+  public void testMaxReduceCap() throws IOException {
+    this.setUp(4, 1, 1);
+    taskTrackerManager.addQueues(new String[]{"default"});
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, false, 1));
+    resConf.setFakeQueues(queues);
+    resConf.setMaxMapCap("default", -1);
+    resConf.setMaxReduceCap("default", 2);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    //submit the Job
+    FakeJobInProgress fjob1 =
+      submitJobAndInit(JobStatus.PREP, 1, 3, "default", "user");
+
+    List<Task> task1 = scheduler.assignTasks(tracker("tt1"));
+    List<Task> task2 = scheduler.assignTasks(tracker("tt2"));
+    List<Task> task3 = scheduler.assignTasks(tracker("tt3"));
+
+    //This should fail. 1 map, 2 reduces , we have reached the limit.
+    List<Task> task4 = scheduler.assignTasks(tracker("tt4"));
+    assertNull(task4);
+    //Now complete the task 1 i.e map task.
+    // complete the job
+    taskTrackerManager.finishTask(
+      "tt1", task1.get(0).getTaskID().toString(),
+      fjob1);
+
+    //This should still fail as only map task is done
+    task4 = scheduler.assignTasks(tracker("tt4"));
+    assertNull(task4);
+
+    //Complete the reduce task
+    taskTrackerManager.finishTask(
+      "tt2", task2.get(0).getTaskID().toString(), fjob1);
+
+    //One reduce is done hence assign the new reduce.
+    checkAssignment("tt4","attempt_test_0001_r_000003_0 on tt4");
+  }
   
   // test if the queue reflects the changes
   private void testJobOrderChange(FakeJobInProgress fjob1, 
@@ -1132,6 +1212,144 @@ public class TestCapacityScheduler exten
     checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
   }
 
+  /**
+   * Creates a queue with max task limit of 2
+   * submit 1 job in the queue which is high ram(2 slots) . As 2 slots are
+   * given to high ram job and are reserved , no other tasks are accepted .
+   *
+   * @throws IOException
+   */
+  public void testHighMemoryBlockingWithMaxLimit()
+      throws IOException {
+
+    // 2 map and 1 reduce slots
+    taskTrackerManager = new FakeTaskTrackerManager(2, 2, 1);
+
+    taskTrackerManager.addQueues(new String[] { "defaultXYZ" });
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("defaultXYZ", 100.0f, true, 25));
+    resConf.setFakeQueues(queues);
+    resConf.setMaxMapCap("defaultXYZ",2);
+    scheduler.setTaskTrackerManager(taskTrackerManager);
+    // enabled memory-based scheduling
+    // Normal job in the cluster would be 1GB maps/reduces
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+        2 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+        1 * 1024);
+    scheduler.getConf().setLong(
+        JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    // The situation :  Submit 2 jobs with high memory map task
+    //Set the max limit for queue to 2 ,
+    // try submitting more map tasks to the queue , it should not happen
+
+    LOG.debug("Submit one high memory(2GB maps, 0MB reduces) job of "
+        + "2 map tasks");
+    JobConf jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(2 * 1024);
+    jConf.setMemoryForReduceTask(0);
+    jConf.setNumMapTasks(2);
+    jConf.setNumReduceTasks(0);
+    jConf.setQueueName("defaultXYZ");
+    jConf.setUser("u1");
+    FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
+
+    LOG.debug("Submit another regular memory(1GB vmem maps/reduces) job of "
+        + "2 map/red tasks");
+    jConf = new JobConf(conf);
+    jConf.setMemoryForMapTask(1 * 1024);
+    jConf.setMemoryForReduceTask(1 * 1024);
+    jConf.setNumMapTasks(2);
+    jConf.setNumReduceTasks(2);
+    jConf.setQueueName("defaultXYZ");
+    jConf.setUser("u1");
+    FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
+
+    // first, a map from j1 will run this is a high memory job so it would
+    // occupy the 2 slots
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+
+    checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1,2, 100.0f,3,1);
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+
+    // at this point, the scheduler tries to schedule another map from j1.
+    // there isn't enough space. The second job's reduce should be scheduled.
+    checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+    
+    checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1,2, 100.0f,3,1);
+    checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
+
+    //at this point , the scheduler tries to schedule another map from j2 for
+    //another task tracker.
+    // This should not happen as all the map slots are taken
+    //by the first task itself.hence reduce task from the second job is given
+
+    checkAssignment("tt2","attempt_test_0002_r_000002_0 on tt2");
+  }
+
+  /**
+   *   test if user limits automatically adjust to max map or reduce limit
+   */
+  public void testUserLimitsWithMaxLimits() throws Exception {
+    setUp(4, 4, 4);
+    // set up some queues
+    String[] qs = {"default"};
+    taskTrackerManager.addQueues(qs);
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 100.0f, true, 50));
+    resConf.setFakeQueues(queues);
+    resConf.setMaxMapCap("default", 2);
+    resConf.setMaxReduceCap("default", 2);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    // submit a job
+    FakeJobInProgress fjob1 =
+      submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u1");
+    FakeJobInProgress fjob2 =
+      submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u2");
+
+    // for queue 'default', the capacity for maps is 2.
+    // But the max map limit is 2
+    // hence user should be getting not more than 1 as it is the 50%.
+    Task t1 = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+
+    //Now we should get the task from the other job. As the
+    //first user has reached his max map limit.
+    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+
+    //Now we are done with map limit , now if we ask for task we should
+    // get reduce from 1st job
+    checkAssignment("tt3", "attempt_test_0001_r_000001_0 on tt3");
+    // Now we're at full capacity for maps. 1 done with reduces for job 1 so
+    // now we should get 1 reduces for job 2
+    Task t4 = checkAssignment("tt4", "attempt_test_0002_r_000001_0 on tt4");
+
+    taskTrackerManager.finishTask(
+      "tt1", t1.getTaskID().toString(),
+      fjob1);
+
+    //tt1 completed the task so we have 1 map slot for u1
+    // we are assigning the 2nd map task from fjob1
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+
+    taskTrackerManager.finishTask(
+      "tt4", t4.getTaskID().toString(),
+      fjob2);
+    //tt4 completed the task , so we have 1 reduce slot for u2
+    //we are assigning the 2nd reduce from fjob2
+    checkAssignment("tt4", "attempt_test_0002_r_000002_0 on tt4");
+
+  }
+
+
   // test user limits
   public void testUserLimits() throws Exception {
     // set up some queues
@@ -2707,17 +2925,25 @@ public class TestCapacityScheduler exten
 
   /**
    * Verify the number of slots of type 'type' from the queue 'queue'.
+   * incrMapIndex and incrReduceIndex are set , when expected output string is
+   * changed.these values can be set if the index of
+   * "Used capacity: %d (%.1f%% of Capacity)"
+   * is changed.
    * 
    * @param queue
    * @param type
    * @param numActiveUsers in the queue at present.
    * @param expectedOccupiedSlots
    * @param expectedOccupiedSlotsPercent
-   * @return
+   * @param incrMapIndex
+   * @param incrReduceIndex
    */
-  private void checkOccupiedSlots(String queue,
-      TaskType type, int numActiveUsers,
-      int expectedOccupiedSlots, float expectedOccupiedSlotsPercent) {
+  private void checkOccupiedSlots(
+    String queue,
+    TaskType type, int numActiveUsers,
+    int expectedOccupiedSlots, float expectedOccupiedSlotsPercent,int incrMapIndex
+    ,int incrReduceIndex
+  ) {
     scheduler.updateQSIInfoForTests();
     QueueManager queueManager = scheduler.taskTrackerManager.getQueueManager();
     String schedulingInfo =
@@ -2725,9 +2951,9 @@ public class TestCapacityScheduler exten
     String[] infoStrings = schedulingInfo.split("\n");
     int index = -1;
     if (type.equals(TaskType.MAP)) {
-      index = 7;
+      index = 7+ incrMapIndex;
     } else if (type.equals(TaskType.REDUCE)) {
-      index = (numActiveUsers == 0 ? 12 : 13 + numActiveUsers);
+      index = (numActiveUsers == 0 ? 12 : 13 + numActiveUsers)+incrReduceIndex;
     }
     LOG.info(infoStrings[index]);
     assertEquals(String.format("Used capacity: %d (%.1f%% of Capacity)",
@@ -2735,6 +2961,24 @@ public class TestCapacityScheduler exten
         infoStrings[index]);
   }
 
+  /**
+   *
+   * @param queue
+   * @param type
+   * @param numActiveUsers
+   * @param expectedOccupiedSlots
+   * @param expectedOccupiedSlotsPercent
+   */
+  private void checkOccupiedSlots(
+    String queue,
+    TaskType type, int numActiveUsers,
+    int expectedOccupiedSlots, float expectedOccupiedSlotsPercent
+  ) {
+    checkOccupiedSlots(
+      queue, type, numActiveUsers, expectedOccupiedSlots,
+      expectedOccupiedSlotsPercent,0,0);
+  }
+
   private void checkQueuesOrder(String[] expectedOrder, String[] observedOrder) {
     assertTrue("Observed and expected queues are not of same length.",
         expectedOrder.length == observedOrder.length);

Modified: hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/capacity_scheduler.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/capacity_scheduler.xml?rev=1076951&r1=1076950&r2=1076951&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/capacity_scheduler.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/capacity_scheduler.xml Fri Mar  4 03:25:21 2011
@@ -204,6 +204,38 @@
           	users, no user can use more than 25% of the queue's resources. A 
           	value of 100 implies no user limits are imposed.</td>
           </tr>
+          <tr><td>mapred.capacity-scheduler.queue.&lt;queue-name&gt;.max.map.slots</td>
+          	<td>
+		    This value is the maximum max slots that can be used in a
+		    queue at any point of time. So for example assuming above config value
+		    is 100 , not more than 100 tasks would be in the queue at any point of
+		    time, assuming each task takes one slot.
+
+		    Default value of -1 would disable this capping feature
+
+		    Typically the queue capacity should be equal to this limit.
+		    If queue capacity is more than this limit, excess capacity will be
+		    used by the other queues. If queue capacity is less than the above
+		    limit , then the limit would be the queue capacity - as in the current
+		    implementation
+                </td>
+          </tr>
+          <tr><td>mapred.capacity-scheduler.queue.&lt;queue-name&gt;.max.reduce.slots</td>
+          	<td>
+		    This value is the maximum reduce slots that can be used in a
+		    queue at any point of time. So for example assuming above config value
+		    is 100 , not more than 100 tasks would be in the queue at any point of
+		    time, assuming each task takes one slot.
+
+		    Default value of -1 would disable this capping feature
+
+		    Typically the queue capacity should be equal to this limit.
+		    If queue capacity is more than this limit, excess capacity will be
+		    used by the other queues. If queue capacity is less than the above
+		    limit , then the limit would be the queue capacity - as in the current
+		    implementation
+                </td>
+          </tr>
         </table>
       </section>