You are viewing a plain-text version of this content. The hyperlink to its canonical version was not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2008/10/15 22:17:46 UTC

svn commit: r705020 - in /hadoop/core/branches/branch-0.19: CHANGES.txt src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

Author: acmurthy
Date: Wed Oct 15 13:17:45 2008
New Revision: 705020

URL: http://svn.apache.org/viewvc?rev=705020&view=rev
Log:
Merge -r 705010:705011 from trunk to branch-0.19 to fix HADOOP-4373.

Modified:
    hadoop/core/branches/branch-0.19/CHANGES.txt
    hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

Modified: hadoop/core/branches/branch-0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=705020&r1=705019&r2=705020&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.19/CHANGES.txt Wed Oct 15 13:17:45 2008
@@ -870,6 +870,9 @@
     HADOOP-4236. Ensure un-initialized jobs are killed correctly on
     user-demand. (Sharad Agarwal via acmurthy) 
 
+    HADOOP-4373. Fix calculation of Guaranteed Capacity for the
+    capacity-scheduler. (Hemanth Yamijala via acmurthy) 
+
 Release 0.18.2 - Unreleased
 
   BUG FIXES

Modified: hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=705020&r1=705019&r2=705020&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Wed Oct 15 13:17:45 2008
@@ -282,12 +282,26 @@
           return -1;
         }
         else if ((0 == q1.reclaimList.size()) && (0 == q2.reclaimList.size())){
-          // neither needs to reclaim. look at how much capacity they've filled
-          double r1 = (double)q1.numRunningTasks/(double)q1.guaranteedCapacity;
-          double r2 = (double)q2.numRunningTasks/(double)q2.guaranteedCapacity;
-          if (r1<r2) return -1;
-          else if (r1>r2) return 1;
-          else return 0;
+          // neither needs to reclaim. If either doesn't have a capacity yet,
+          // it comes at the end of the queue.
+          if ((q1.guaranteedCapacity == 0) &&
+                (q2.guaranteedCapacity != 0)) {
+            return 1;
+          } else if ((q1.guaranteedCapacity != 0) &&
+                      (q2.guaranteedCapacity == 0)) {
+            return -1;
+          } else if ((q1.guaranteedCapacity == 0) &&
+                      (q2.guaranteedCapacity == 0)) {
+            // both don't have capacities, treat them as equal.
+            return 0;
+          } else {
+            // look at how much capacity they've filled
+            double r1 = (double)q1.numRunningTasks/(double)q1.guaranteedCapacity;
+            double r2 = (double)q2.numRunningTasks/(double)q2.guaranteedCapacity;
+            if (r1<r2) return -1;
+            else if (r1>r2) return 1;
+            else return 0;
+          }
         }
         else {
           // both have to reclaim. Look at which one needs to reclaim earlier
@@ -335,6 +349,10 @@
         qsiForAssigningTasks.get(qsiForAssigningTasks.size()-1);
       long currentTime = scheduler.clock.getTime();
       for (QueueSchedulingInfo qsi: queueInfoMap.values()) {
+        if (qsi.guaranteedCapacity <= 0) {
+          // no capacity, hence nothing can be reclaimed.
+          continue;
+        }
         // is there any resource that needs to be reclaimed? 
         if ((!qsi.reclaimList.isEmpty()) &&  
             (qsi.reclaimList.getFirst().whenToKill < 
@@ -484,8 +502,8 @@
       for (QueueSchedulingInfo qsi: queueInfoMap.values()) {
         // compute new GCs and ACs, if TT slots have changed
         if (slotsDiff != 0) {
-          qsi.guaranteedCapacity +=
-            (qsi.guaranteedCapacityPercent*slotsDiff/100);
+          qsi.guaranteedCapacity =
+            (int)(qsi.guaranteedCapacityPercent*numSlots/100);
         }
         qsi.numRunningTasks = 0;
         qsi.numPendingTasks = 0;
@@ -729,6 +747,13 @@
        */
       updateCollectionOfQSIs();
       for (QueueSchedulingInfo qsi: qsiForAssigningTasks) {
+        if (qsi.guaranteedCapacity <= 0.0f) {
+          // No capacity is guaranteed yet for this queue.
+          // Queues are sorted so that ones without capacities
+          // come towards the end. Hence, we can simply return
+          // from here without considering any further queues.
+          return null;
+        }
         t = getTaskFromQueue(taskTracker, qsi);
         if (t!= null) {
           // we have a task. Update reclaimed resource info

Modified: hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=705020&r1=705019&r2=705020&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Wed Oct 15 13:17:45 2008
@@ -539,6 +539,62 @@
     assertEquals(1, scheduler.jobQueuesManager.
                         getWaitingJobQueue("default").size());
   }
+
+  // Tests how GC is computed and assignment of tasks done
+  // on the basis of the GC.
+  public void testCapacityBasedAllocation() throws Exception {
+    // set up some queues
+    String[] qs = {"default", "q2"};
+    taskTrackerManager.addQueues(qs);
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    // set the gc % as 10%, so that gc will be zero initially as 
+    // the cluster capacity increase slowly.
+    queues.add(new FakeQueueInfo("default", 10.0f, 5000, true, 25));
+    queues.add(new FakeQueueInfo("q2", 90.0f, 5000, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+   
+    // submit a job to the default queue
+    submitJob(JobStatus.PREP, 10, 0, "default", "u1");
+    
+    // submit a job to the second queue
+    submitJob(JobStatus.PREP, 10, 0, "q2", "u1");
+    
+    // job from q2 runs first because it has some non-zero capacity.
+    checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+    verifyGuaranteedCapacity("0", "default");
+    verifyGuaranteedCapacity("3", "q2");
+    
+    // add another tt to increase tt slots
+    taskTrackerManager.addTaskTracker("tt3");
+    checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
+    verifyGuaranteedCapacity("0", "default");
+    verifyGuaranteedCapacity("5", "q2");
+    
+    // add another tt to increase tt slots
+    taskTrackerManager.addTaskTracker("tt4");
+    checkAssignment("tt3", "attempt_test_0002_m_000003_0 on tt3");
+    verifyGuaranteedCapacity("0", "default");
+    verifyGuaranteedCapacity("7", "q2");
+    
+    // add another tt to increase tt slots
+    taskTrackerManager.addTaskTracker("tt5");
+    // now job from default should run, as it is furthest away
+    // in terms of runningMaps / gc.
+    checkAssignment("tt4", "attempt_test_0001_m_000001_0 on tt4");
+    verifyGuaranteedCapacity("1", "default");
+    verifyGuaranteedCapacity("9", "q2");
+  }
+  
+  private void verifyGuaranteedCapacity(String expectedCapacity, 
+                                          String queue) throws IOException {
+    String schedInfo = taskTrackerManager.getQueueManager().
+                          getSchedulerInfo(queue).toString();
+    assertTrue(schedInfo.contains("Current Capacity Maps : " 
+                                    + expectedCapacity));
+  }
   
   // test capacity transfer
   public void testCapacityTransfer() throws Exception {