You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:36:32 UTC

svn commit: r1077614 - in /hadoop/common/branches/branch-0.20-security-patches/src: contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ mapred/org/apache/hadoop/mapred/ test/org/ap...

Author: omalley
Date: Fri Mar  4 04:36:32 2011
New Revision: 1077614

URL: http://svn.apache.org/viewvc?rev=1077614&view=rev
Log:
commit ecc38f53c5b63427cd07a731433372785428755f
Author: Arun C Murthy <ac...@apache.org>
Date:   Wed Jul 28 15:48:12 2010 -0700

    MAPREDUCE-1872. Fix CapacityTaskScheduler to ensure task limits are checked before queue limits on job submission. Also, fix the scheduler to allow jobs in queues with minuscule capacities to make progress.

Modified:
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestClusterStatus.java
    hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobRetire.java

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java?rev=1077614&r1=1077613&r2=1077614&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java Fri Mar  4 04:36:32 2011
@@ -324,7 +324,7 @@ class CapacitySchedulerQueue {
     this.maxJobsToAccept = maxJobsToAccept;
     this.maxJobsPerUserToAccept = maxJobsPerUserToAccept;
     
-    LOG.info("Initialized '" + queueName + "' queue with " +
+    LOG.info("Initializing '" + queueName + "' queue with " +
         "cap=" + capacityPercent + ", " +
         "maxCap=" + maxCapacityPercent + ", " +
         "ulMin=" + ulMin + ", " +
@@ -337,6 +337,14 @@ class CapacitySchedulerQueue {
         "maxJobsPerUserToAccept=" + maxJobsPerUserToAccept + ", " +
         "maxActiveTasksPerUser=" + maxActiveTasksPerUser
     );
+    
+    // Sanity checks
+    if (maxActiveTasks < maxActiveTasksPerUser ||
+        maxJobsToInit < maxJobsPerUserToInit || 
+        maxJobsToAccept < maxJobsPerUserToAccept) {
+      throw new IllegalArgumentException("Illegal queue configuration for " +
+      		"queue '" + queueName + "'");
+    }
   }
   
   synchronized void initializeQueue(CapacitySchedulerQueue other) {
@@ -871,23 +879,31 @@ class CapacitySchedulerQueue {
    * the requested number of slots, <code>false</code> otherwise
    */
   boolean assignSlotsToJob(TaskType taskType, JobInProgress job, String user) {
+    int numSlotsRequested = job.getNumSlotsPerTask(taskType);
+    
     // Check to ensure we will not go over the queue's max-capacity
-    if (!assignSlotsToQueue(taskType, job.getNumSlotsPerTask(taskType))) {
+    if (!assignSlotsToQueue(taskType, numSlotsRequested)) {
       return false;
     }
     
-    // what is our current capacity? It is equal to the queue-capacity if
-    // we're running below capacity. If we're running over capacity, then its
-    // #running plus slotPerTask of the job (which is the number of extra
-    // slots we're getting).
+    // What is our current capacity? 
+    // * It is equal to max(numSlotsRequested, queue-capacity) if
+    //   we're running below capacity. The 'max' ensures that jobs in queues
+    //   with minuscule capacity (< 1 slot) make progress.
+    // * If we're running over capacity, then it's
+    //   #running plus slotPerTask of the job (which is the number of extra
+    //   slots we're getting).
+    
+    // Allow progress for queues with minuscule capacity
+    int queueCapacity = Math.max(getCapacity(taskType), numSlotsRequested);
+    
+    int queueSlotsOccupied = getNumSlotsOccupied(taskType);
     int currentCapacity;
-    int queueCapacity = getCapacity(taskType);
-    if (getNumSlotsOccupied(taskType) < queueCapacity) {
+    if (queueSlotsOccupied < queueCapacity) {
       currentCapacity = queueCapacity;
     }
     else {
-      currentCapacity = 
-        getNumSlotsOccupied(taskType) + job.getNumSlotsPerTask(taskType);
+      currentCapacity = queueSlotsOccupied + numSlotsRequested;
     }
     
     // Never allow a single user to take more than the 
@@ -900,11 +916,14 @@ class CapacitySchedulerQueue {
                    divideAndCeil(ulMin*currentCapacity, 100)),
           (int)(queueCapacity * ulMinFactor)
           );
-    if (getNumSlotsOccupiedByUser(user, taskType) >= limit) {
+
+    if ((getNumSlotsOccupiedByUser(user, taskType) + numSlotsRequested) > 
+        limit) {
       if (LOG.isDebugEnabled()) {
-        LOG.debug("User " + user + " is over limit, num slots occupied=" + 
-            getNumSlotsOccupiedByUser(user, taskType) + 
-            ", limit=" + limit);
+        LOG.debug("User " + user + " is over limit for queue=" + queueName + 
+            " num slots occupied=" + getNumSlotsOccupiedByUser(user, taskType) + 
+            " limit=" + limit +" numSlotsRequested=" + numSlotsRequested + 
+            " currentCapacity=" + currentCapacity);
       }
       return false;
     }
@@ -952,6 +971,17 @@ class CapacitySchedulerQueue {
           "");
     }
     
+    // Task limits - No point accepting the job if it can never be initialized
+    if (job.desiredTasks() > maxActiveTasksPerUser) {
+      throw new IOException(
+          "Job '" + job.getJobID() + "' from user '" + user  +
+          "' rejected since it has " + job.desiredTasks() + " tasks which" +
+          " exceeds the limit of " + maxActiveTasksPerUser + 
+          " tasks per-user which can be initialized for queue '" + 
+          queueName + "'"
+          );
+    }
+    
     // Across all jobs in queue
     if ((getNumWaitingJobs() + getNumRunningJobs()) >= maxJobsToAccept) {
       throw new IOException(

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=1077614&r1=1077613&r2=1077614&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Fri Mar  4 04:36:32 2011
@@ -410,12 +410,6 @@ class CapacityTaskScheduler extends Task
       
       
       for (CapacitySchedulerQueue queue : queuesForAssigningTasks) {
-        // we may have queues with capacity=0. We shouldn't look at jobs from 
-        // these queues
-        if (0 == queue.getCapacity(TaskType.MAP)) {
-          continue;
-        }
-
         //This call is for optimization if we are already over the
         //maximum-capacity we avoid traversing the queues.
         if (!queue.assignSlotsToQueue(type, 1)) {

Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1077614&r1=1077613&r2=1077614&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri Mar  4 04:36:32 2011
@@ -168,7 +168,7 @@ public class TestCapacityScheduler exten
     private int speculativeReduceTaskCounter = 0;
     public FakeJobInProgress(JobID jId, JobConf jobConf,
         FakeTaskTrackerManager taskTrackerManager, String user, 
-        JobTracker jt) {
+        JobTracker jt) throws IOException {
       super(jId, jobConf, jt);
       if (user == null) {
         user = "drwho";
@@ -327,7 +327,7 @@ public class TestCapacityScheduler exten
 
     public FakeFailingJobInProgress(JobID id, JobConf jobConf,
         FakeTaskTrackerManager taskTrackerManager, String user, 
-        JobTracker jt) {
+        JobTracker jt) throws IOException {
       super(id, jobConf, taskTrackerManager, user, jt);
     }
     
@@ -1364,9 +1364,10 @@ public class TestCapacityScheduler exten
 
 
     //high ram map from job 1 and normal reduce task from job 1
-    List<Task> tasks = checkMultipleAssignment(
-      "tt1", "attempt_test_0001_m_000001_0 on tt1",
-      "attempt_test_0001_r_000001_0 on tt1");
+    List<Task> tasks = checkAssignments("tt1", 
+        new String[] {
+        "attempt_test_0001_m_000001_0 on tt1",
+        "attempt_test_0001_r_000001_0 on tt1"});
 
     checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1, 2, 200.0f,1,0);
     checkOccupiedSlots("defaultXYZ", TaskType.REDUCE, 1, 1, 100.0f,0,2);
@@ -2958,8 +2959,8 @@ public class TestCapacityScheduler exten
     jConf.setNumMapTasks(2);
     jConf.setNumReduceTasks(0);
     jConf.setQueueName("default");
-    jConf.setUser("u1");
-    FakeJobInProgress job4= submitJobAndInit(JobStatus.PREP, jConf);
+    jConf.setUser("u2");
+    FakeJobInProgress job4 = submitJobAndInit(JobStatus.PREP, jConf);
 
     LOG.debug("Submit another regular memory(1GB vmem maps/reduces) job of "
         + "2 map/red tasks");
@@ -2969,7 +2970,7 @@ public class TestCapacityScheduler exten
     jConf.setNumMapTasks(2);
     jConf.setNumReduceTasks(2);
     jConf.setQueueName("default");
-    jConf.setUser("u2");
+    jConf.setUser("u3");
     FakeJobInProgress job5 = submitJobAndInit(JobStatus.PREP, jConf);
 
     // Job4, a high memory job cannot be accommodated on a any TT. But with each

Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077614&r1=1077613&r2=1077614&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar  4 04:36:32 2011
@@ -319,7 +319,8 @@ public class JobInProgress {
   /**
    * Create an almost empty JobInProgress, which can be used only for tests
    */
-  protected JobInProgress(JobID jobid, JobConf conf, JobTracker tracker) {
+  protected JobInProgress(JobID jobid, JobConf conf, JobTracker tracker) 
+  throws IOException {
     this.conf = conf;
     this.jobId = jobid;
     this.numMapTasks = conf.getNumMapTasks();
@@ -347,6 +348,9 @@ public class JobInProgress {
     this.mapFailuresPercent = conf.getMaxMapTaskFailuresPercent();
     this.reduceFailuresPercent = conf.getMaxReduceTaskFailuresPercent();
 
+    // Check task limits
+    checkTaskLimits();
+
     this.taskCompletionEvents = new ArrayList<TaskCompletionEvent>
       (numMapTasks + numReduceTasks + 10);
     try {
@@ -387,6 +391,7 @@ public class JobInProgress {
         public FileSystem run() throws IOException {
           return jobSubmitDir.getFileSystem(default_conf);
         }});
+      
       /** check for the size of jobconf **/
       Path submitJobFile = JobSubmissionFiles.getJobConfPath(jobSubmitDir);
       FileStatus fstatus = fs.getFileStatus(submitJobFile);
@@ -411,6 +416,7 @@ public class JobInProgress {
             jobId.toString(), desc);
         throw new IOException(desc);
       }
+      
       this.priority = conf.getJobPriority();
       this.status.setJobPriority(this.priority);
       this.profile = new JobProfile(user, jobId, 
@@ -458,6 +464,9 @@ public class JobInProgress {
       // register job's tokens for renewal
       DelegationTokenRenewal.registerDelegationTokensForRenewal(
           jobInfo.getJobID(), ts, jobtracker.getConf());
+      
+      // Check task limits
+      checkTaskLimits();
     } finally {
       //close all FileSystems that was created above for the current user
       //At this point, this constructor is called in the context of an RPC, and
@@ -467,6 +476,19 @@ public class JobInProgress {
     }
   }
     
+  private void checkTaskLimits() throws IOException {
+    // if the number of tasks is larger than a configured value
+    // then fail the job.
+    int maxTasks = jobtracker.getMaxTasksPerJob();
+    LOG.info(jobId + ": nMaps=" + numMapTasks + " nReduces=" + numReduceTasks + " max=" + maxTasks);
+    if (maxTasks > 0 && (numMapTasks + numReduceTasks) > maxTasks) {
+      throw new IOException(
+                "The number of tasks for this job " + 
+                (numMapTasks + numReduceTasks) +
+                " exceeds the configured limit " + maxTasks);
+    }
+  }
+  
   /**
    * Called when the job is complete
    */
@@ -649,18 +671,13 @@ public class JobInProgress {
     // read input splits and create a map per a split
     //
     TaskSplitMetaInfo[] splits = createSplits(jobId);
+    if (numMapTasks != splits.length) {
+      throw new IOException("Number of maps in JobConf doesn't match number of " +
+      		"recieved splits for job " + jobId + "! " +
+      		"numMapTasks=" + numMapTasks + ", #splits=" + splits.length);
+    }
     numMapTasks = splits.length;
 
-
-    // if the number of splits is larger than a configured value
-    // then fail the job.
-    int maxTasks = jobtracker.getMaxTasksPerJob();
-    if (maxTasks > 0 && numMapTasks + numReduceTasks > maxTasks) {
-      throw new IOException(
-                "The number of tasks for this job " + 
-                (numMapTasks + numReduceTasks) +
-                " exceeds the configured limit " + maxTasks);
-    }
     jobtracker.getInstrumentation().addWaitingMaps(getJobID(), numMapTasks);
     jobtracker.getInstrumentation().addWaitingReduces(getJobID(), numReduceTasks);
 

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestClusterStatus.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestClusterStatus.java?rev=1077614&r1=1077613&r2=1077614&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestClusterStatus.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestClusterStatus.java Fri Mar  4 04:36:32 2011
@@ -128,7 +128,7 @@ public class TestClusterStatus extends T
    */
   static class FakeJobInProgress extends JobInProgress {
     public FakeJobInProgress(JobID jId, JobConf jobConf,
-                JobTracker jt) {
+                JobTracker jt) throws IOException {
       super(jId, jobConf, jt);
     }
   }

Modified: hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobRetire.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobRetire.java?rev=1077614&r1=1077613&r2=1077614&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobRetire.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/test/org/apache/hadoop/mapred/TestJobRetire.java Fri Mar  4 04:36:32 2011
@@ -268,7 +268,8 @@ public class TestJobRetire extends TestC
   }
  
   // create a new job and add it to the jobtracker
-  private JobInProgress createAndAddJob(JobTracker jobtracker, JobConf conf) {
+  private JobInProgress createAndAddJob(JobTracker jobtracker, JobConf conf) 
+  throws IOException {
     // submit a job in a fake manner
     // get the new job-id
     JobID id = 
@@ -333,7 +334,7 @@ public class TestJobRetire extends TestC
   //   - remove the job from the jobtracker
   //   - check if the fake attempt is removed from the jobtracker
   private void testRemoveJobTasks(JobTracker jobtracker, JobConf conf, 
-                                  TaskType type) {
+                                  TaskType type) throws IOException {
     // create and submit a job
     JobInProgress jip = createAndAddJob(jobtracker, conf);
     // create and add a tip