You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yh...@apache.org on 2008/12/04 14:58:23 UTC

svn commit: r723326 - in /hadoop/core/trunk: CHANGES.txt src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

Author: yhemanth
Date: Thu Dec  4 05:58:22 2008
New Revision: 723326

URL: http://svn.apache.org/viewvc?rev=723326&view=rev
Log:
HADOOP-4558. Fix capacity reclamation in capacity scheduler. Contributed by Amar Kamat.

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=723326&r1=723325&r2=723326&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Dec  4 05:58:22 2008
@@ -256,6 +256,9 @@
     HADOOP-4732. Pass connection and read timeouts in the correct order when
     setting up fetch in reduce. (Amareshwari Sriramadasu via cdouglas)
 
+    HADOOP-4558. Fix capacity reclamation in capacity scheduler.
+    (Amar Kamat via yhemanth)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=723326&r1=723325&r2=723326&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Thu Dec  4 05:58:22 2008
@@ -378,6 +378,11 @@
       if (queueInfoMap.size() < 2) {
         return;
       }
+      
+      // make sure we always get the latest values
+      updateQSIObjects();
+      updateCollectionOfQSIs();
+      
       QueueSchedulingInfo lastQsi = 
         qsiForAssigningTasks.get(qsiForAssigningTasks.size()-1);
       long currentTime = scheduler.clock.getTime();
@@ -554,7 +559,7 @@
           qsi.numRunningTasksByUser.put(j.getProfile().getUser(), 
               i+getRunningTasks(j));
           qsi.numPendingTasks += getPendingTasks(j);
-          LOG.debug("updateQSI: job " + j.toString() + ": run(m) = " + 
+          LOG.debug("updateQSI: job " + j.getJobID().toString() + ": run(m) = " +
               j.runningMaps() + ", run(r) = " + j.runningReduces() + 
               ", finished(m) = " + j.finishedMaps() + ", finished(r)= " + 
               j.finishedReduces() + ", failed(m) = " + j.failedMapTasks + 
@@ -1134,7 +1139,7 @@
         totalCapacity += gc;
       }
       int ulMin = rmConf.getMinimumUserLimitPercent(queueName); 
-      long reclaimTimeLimit = rmConf.getReclaimTimeLimit(queueName);
+      long reclaimTimeLimit = rmConf.getReclaimTimeLimit(queueName) * 1000;
       // reclaimTimeLimit is the time(in millisec) within which we need to
       // reclaim capacity. 
       // create queue scheduling objects for Map and Reduce

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=723326&r1=723325&r2=723326&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Thu Dec  4 05:58:22 2008
@@ -1168,9 +1168,9 @@
     taskTrackerManager.addQueues(qs);
     resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 50.0f, 1000000, true, 25));
-    queues.add(new FakeQueueInfo("q2", 25.0f, 1000000, true, 25));
-    queues.add(new FakeQueueInfo("q3", 25.0f, 1000000, true, 25));
+    queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
+    queues.add(new FakeQueueInfo("q2", 25.0f, 1000, true, 25));
+    queues.add(new FakeQueueInfo("q3", 25.0f, 1000, true, 25));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
@@ -1185,8 +1185,6 @@
     checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
     // now submit a job to q2
     FakeJobInProgress j3 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
-    // update our structures
-    scheduler.updateQSIInfo();
     // get scheduler to notice that q2 needs to reclaim
     scheduler.reclaimCapacity();
     // our queue reclaim time is 1000s, heartbeat interval is 5 sec, so 
@@ -1211,10 +1209,10 @@
     taskTrackerManager.addQueues(qs);
     resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 50.0f, 1000000, true, 25));
-    queues.add(new FakeQueueInfo("q2", 20.0f, 1000000, true, 25));
-    queues.add(new FakeQueueInfo("q3", 20.0f, 1000000, true, 25));
-    queues.add(new FakeQueueInfo("q4", 10.0f, 1000000, true, 25));
+    queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
+    queues.add(new FakeQueueInfo("q2", 20.0f, 1000, true, 25));
+    queues.add(new FakeQueueInfo("q3", 20.0f, 1000, true, 25));
+    queues.add(new FakeQueueInfo("q4", 10.0f, 1000, true, 25));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
@@ -1242,8 +1240,6 @@
     // running 3 tasks (with a cap of 1). 
     // If we submit a job to 'default', we need to get 3 slots back. 
     FakeJobInProgress j4 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
-    // update our structures
-    scheduler.updateQSIInfo();
     // get scheduler to notice that q2 needs to reclaim
     scheduler.reclaimCapacity();
     // our queue reclaim time is 1000s, heartbeat interval is 5 sec, so 
@@ -1262,6 +1258,42 @@
     
   }
 
+  // test code to reclaim capacity when the cluster is completely occupied
+  public void testReclaimCapacityWithFullCluster() throws Exception {
+    // set up some queues
+    String[] qs = {"default", "queue"};
+    taskTrackerManager.addQueues(qs);
+    int maxSlots = taskTrackerManager.maxMapTasksPerTracker 
+                   * taskTrackerManager.taskTrackers().size();
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
+    queues.add(new FakeQueueInfo("queue", 50.0f, 1000, true, 25));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+    
+    // now submit 1 job to queue "default" which should take up the cluster
+    FakeJobInProgress j1 = 
+      submitJobAndInit(JobStatus.PREP, maxSlots, 0, "default", "u1");
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+    checkAssignment("tt1", "attempt_test_0001_m_000003_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
+    
+    // now submit a job to queue "queue"
+    submitJobAndInit(JobStatus.PREP, maxSlots, 0, "queue", "u2");
+    // first pass; no explicit updateQSIInfo() call is made before it
+    scheduler.reclaimCapacity();
+    // the reclaim time limit is configured in seconds, the clock is in millisec
+    clock.advance(scheduler.rmConf.getReclaimTimeLimit("default") * 1000);
+    // second pass, after the reclaim time limit has elapsed
+    scheduler.reclaimCapacity();
+    
+    // only 2 of j1's 4 maps should still be running (expected value goes first)
+    assertEquals("Failed to reclaim tasks", 2, j1.runningMapTasks);
+  }
+  
   // test code to reclaim capacity in steps
   public void testReclaimCapacityInSteps() throws Exception {
     // set up some queues
@@ -1269,8 +1301,8 @@
     taskTrackerManager.addQueues(qs);
     resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 50.0f, 1000000, true, 25));
-    queues.add(new FakeQueueInfo("q2", 50.0f, 1000000, true, 25));
+    queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
+    queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
@@ -1284,8 +1316,6 @@
     checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
     // now submit a job to q2
     FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 1, 1, "q2", "u1");
-    // update our structures
-    scheduler.updateQSIInfo();
     // get scheduler to notice that q2 needs to reclaim
     scheduler.reclaimCapacity();
     // our queue reclaim time is 1000s, heartbeat interval is 5 sec, so 
@@ -1293,8 +1323,6 @@
     clock.advance(400000);
     // submit another job to q2 which causes more capacity to be reclaimed
     j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
-    // update our structures
-    scheduler.updateQSIInfo();
     clock.advance(200000);
     scheduler.reclaimCapacity();
     // one task from j1 will be killed
@@ -1315,8 +1343,8 @@
     taskTrackerManager.addQueues(qs);
     resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 50.0f, 1000000, true, 25));
-    queues.add(new FakeQueueInfo("q2", 50.0f, 1000000, true, 25));
+    queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
+    queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();