You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yh...@apache.org on 2008/12/19 04:49:01 UTC

svn commit: r727910 - in /hadoop/core/trunk: CHANGES.txt src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

Author: yhemanth
Date: Thu Dec 18 19:49:01 2008
New Revision: 727910

URL: http://svn.apache.org/viewvc?rev=727910&view=rev
Log:
HADOOP-4876. Fix capacity scheduler reclamation by updating count of pending tasks correctly. Contributed by Sreekanth Ramakrishnan

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=727910&r1=727909&r2=727910&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Dec 18 19:49:01 2008
@@ -475,6 +475,9 @@
     items which would otherwise not work on a fresh checkout.
     (Sreekanth Ramakrishnan via yhemanth)
 
+    HADOOP-4876. Fix capacity scheduler reclamation by updating count of
+    pending tasks correctly. (Sreekanth Ramakrishnan via yhemanth)
+
 Release 0.19.1 - Unreleased
 
   IMPROVEMENTS

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=727910&r1=727909&r2=727910&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Thu Dec 18 19:49:01 2008
@@ -553,6 +553,10 @@
      * to make scheduling decisions. For example, we don't need an exact count
      * of numRunningTasks. Once we count upto the grid capacity (gcSum), any
      * number beyond that will make no difference.
+     * 
+     * The pending task count is only required in reclaim capacity. So 
+     * if the computation becomes expensive, we can add a boolean to 
+     * denote if pending task computation is required or not.
      * */
     private synchronized void updateQSIObjects() {
       // if # of slots have changed since last time, update. 
@@ -597,7 +601,29 @@
            * consider the first few jobs per user.
            */ 
         }
-        //TODO do we need to update stats on waiting jobs
+        
+        //update stats on waiting jobs
+        for(JobInProgress j : 
+          scheduler.jobQueuesManager.getJobs(qsi.queueName)) {
+          // pending tasks
+          if(qsi.numPendingTasks > getClusterCapacity()) {
+            // that's plenty. no need for more computation
+            break;
+          }
+          /*
+           * Consider only the waiting jobs in the job queue. Job queue can
+           * contain:
+           * 1. Jobs which are in running state but not scheduled
+           * (these would also be present in running queue), the pending 
+           * task count of these jobs is computed when scheduler walks
+           * through running job queue.
+           * 2. Jobs which are killed by user, but waiting job initialization
+           * poller to walk through the job queue to clean up killed jobs.
+           */
+          if (j.getStatus().getRunState() == JobStatus.PREP) {
+            qsi.numPendingTasks += getPendingTasks(j);
+          }
+        }
       }
     }
 

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=727910&r1=727909&r2=727910&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Thu Dec 18 19:49:01 2008
@@ -1339,6 +1339,68 @@
     assertEquals(j1.runningMapTasks, 2);
     
   }
+  
+  /*
+   * Test case for checking reclaim capacity with uninitialized jobs.
+   * 
+   * Configure two queues in the capacity scheduler.
+   * 
+   * Submit a single job to the default queue and let it grow beyond the gc
+   * of the queue.
+   * 
+   * Then submit another job to the second queue but don't initialize it.
+   * 
+   * Call scheduler.reclaimCapacity() once, so that the scheduler notices
+   * that it has to reclaim capacity.
+   * 
+   * Advance the scheduler clock to the point where tasks should be killed.
+   * 
+   * Call scheduler.reclaimCapacity() again to kill the appropriate tasks.
+   * 
+   * Check the running map task count of the running job.
+   * 
+   */
+  public void testReclaimCapacityWithUninitializedJobs() throws IOException {
+    String[] qs = {"default", "q2"};
+    taskTrackerManager.addQueues(qs);
+    resConf = new FakeResourceManagerConf();
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
+    queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25));
+    resConf.setFakeQueues(queues);
+    
+    ControlledInitializationPoller p = new ControlledInitializationPoller(
+        scheduler.jobQueuesManager,
+        resConf,
+        resConf.getQueues());
+    scheduler.setInitializationPoller(p);
+    
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+    // Submit one job to the default queue and push its usage over the
+    // gc of that queue.
+    FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
+    checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
+    
+    // Submit another job to the second queue, but do not initialize it.
+    submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+    
+    // Call reclaimCapacity() once so that the scheduler starts the
+    // reclaim capacity process.
+    scheduler.reclaimCapacity();
+    // Advance the clock to the point at which two tasks of the running
+    // job would be killed.
+    clock.advance(600000);
+    // Run reclaim capacity again.
+    scheduler.reclaimCapacity();
+    // Two of the four map tasks assigned to j1 should have been killed.
+    assertEquals(2, j1.runningMapTasks);
+    
+  }
+  
   /*
    * Following is the testing strategy for testing scheduling information.
    * - start capacity scheduler with two queues.