Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2011/08/08 22:08:23 UTC

svn commit: r1155075 - in /hadoop/common/branches/branch-0.20-security: ./ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/

Author: acmurthy
Date: Mon Aug  8 20:08:22 2011
New Revision: 1155075

URL: http://svn.apache.org/viewvc?rev=1155075&view=rev
Log:
MAPREDUCE-2729. Ensure jobs with reduces which can't be launched due to slow-start do not count for user-limits. Contributed by Sherry Chen.
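
For context on the user-limit side of this change: the capacity scheduler splits a
queue's slots among the users that currently have pending work, so a user whose
reduces are still blocked by slow-start shrinks every other user's reduce share
without being able to use its own. The sketch below is an illustration only; the
ceiling-division split and the class/method names are assumptions, not the exact
limit formula in CapacitySchedulerQueue.

    // Hypothetical illustration: how the count of "active" users feeds a
    // per-user limit. Before this fix, a user blocked by slow-start still
    // counted for reduces, halving the other user's share in a 2-user queue.
    public class UserLimitSketch {
      static int perUserLimit(int queueCapacity, int usersWithPendingTasks) {
        // ceiling division so no slots are lost to rounding
        return (queueCapacity + usersWithPendingTasks - 1) / usersWithPendingTasks;
      }

      public static void main(String[] args) {
        int reduceCapacity = 4; // e.g. the queue's reduce capacity
        System.out.println(perUserLimit(reduceCapacity, 2)); // before: u1 limited to 2
        System.out.println(perUserLimit(reduceCapacity, 1)); // after:  u1 may use all 4
      }
    }

This is the scenario the new testUserLimit case below drives: u1 keeps getting
reduce slots while u2's reduces are still held back by slow-start.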

Modified:
    hadoop/common/branches/branch-0.20-security/CHANGES.txt
    hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java
    hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1155075&r1=1155074&r2=1155075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Mon Aug  8 20:08:22 2011
@@ -9,6 +9,9 @@ Release 0.20.205.0 - unreleased
 
   BUG FIXES
 
+    MAPREDUCE-2729. Ensure jobs with reduces which can't be launched due to
+    slow-start do not count for user-limits. (Sherry Chen via acmurthy) 
+
     MAPREDUCE-2651. Fix race condition in Linux task controller for
     job log directory creation. (Bharath Mundlapudi via llu)
 

Modified: hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java?rev=1155075&r1=1155074&r2=1155075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java Mon Aug  8 20:08:22 2011
@@ -197,13 +197,13 @@ class CapacitySchedulerQueue {
       }
     }
     
-    void updateSlotsUsage(String user, int pendingTasks, int numRunningTasks, int numSlotsOccupied) {
+    void updateSlotsUsage(String user, boolean pendingTasks, int numRunningTasks, int numSlotsOccupied) {
       this.numRunningTasks += numRunningTasks;
       this.numSlotsOccupied += numSlotsOccupied;
       Integer i = this.numSlotsOccupiedByUser.get(user);
       int slots = numSlotsOccupied + ((i == null) ? 0 : i.intValue());
       this.numSlotsOccupiedByUser.put(user, slots);
-      if (pendingTasks > 0) {
+      if (pendingTasks) {
         users.add(user);
       }
     }
@@ -581,12 +581,25 @@ class CapacitySchedulerQueue {
    */
   void update(TaskType type, JobInProgress job, String user, 
       int numRunningTasks, int numSlotsOccupied) {
+    // pendingTasks keeps track of whether a user's job has tasks that
+    // still need to be scheduled. The number of users with pending tasks is
+    // used in the limit calculations.
+    boolean pendingTasks = false;
     if (type == TaskType.MAP) {
-      mapSlots.updateSlotsUsage(user, job.pendingMaps(), 
+      // A job has map tasks to be scheduled when job.pendingMaps() > 0
+      if (job.pendingMaps() > 0) {
+        pendingTasks = true;
+      }
+      mapSlots.updateSlotsUsage(user, pendingTasks,
           numRunningTasks, numSlotsOccupied);
     } else if (type == TaskType.REDUCE) {
-      reduceSlots.updateSlotsUsage(user, job.pendingReduces(), 
-          numRunningTasks, numSlotsOccupied);
+      // A job has reduce tasks to be scheduled when job.pendingReduces() > 0 &&
+      // the minimum number of maps have been completed
+      if (job.scheduleReduces() && (job.pendingReduces() > 0)) {
+        pendingTasks = true;
+      }
+      reduceSlots.updateSlotsUsage(user, pendingTasks,
+        numRunningTasks, numSlotsOccupied);
     }
   }
   

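The reduce-side condition added above leans on JobInProgress.scheduleReduces(),
which is not part of this diff. The sketch below shows the slow-start gate it
stands for; the field names mirror JobInProgress, but treat the exact formula as
an assumption.

    // Minimal sketch (assumption) of the slow-start gate behind scheduleReduces():
    // reduces become schedulable only once enough maps have completed.
    public class SlowStartGateSketch {
      private final int completedMapsForReduceSlowstart;
      private int finishedMapTasks;

      SlowStartGateSketch(int numMapTasks, float slowstartFraction) {
        // slowstartFraction plays the role of mapred.reduce.slowstart.completed.maps
        this.completedMapsForReduceSlowstart =
            (int) Math.ceil(slowstartFraction * numMapTasks);
      }

      void mapCompleted() { finishedMapTasks++; }

      boolean scheduleReduces() {
        return finishedMapTasks >= completedMapsForReduceSlowstart;
      }
    }

This is also why the tests below set j2.completedMapsForReduceSlowstart = 3 by
hand: FakeJobInProgress never computes the threshold, so the tests pin it to keep
j2's reduces blocked until three of its maps finish.
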
Modified: hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1155075&r1=1155074&r2=1155075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Mon Aug  8 20:08:22 2011
@@ -252,6 +252,9 @@ public class TestCapacityScheduler exten
     @Override
     public Task obtainNewReduceTask(final TaskTrackerStatus tts,
         int clusterSize, int ignored) throws IOException {
+      if (!scheduleReduces()) {
+        return null;
+      }
       boolean areAllReducesRunning = (redTaskCtr == numReduceTasks);
       if (areAllReducesRunning){
         if(!getJobConf().getReduceSpeculativeExecution() || 
@@ -3424,6 +3427,160 @@ public class TestCapacityScheduler exten
     assertEquals(scheduler.getLimitMaxMemForReduceSlot(),3);
   }
 
+  // test that 1st user gets reduce slots when 2nd user hasn't finished
+  // enough map tasks yet
+  public void testUserLimit() throws Exception {
+    // set up some queues
+    String[] qs = {"default", "q2"};
+    taskTrackerManager.addQueues(qs);
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 40.0f, true, 50));
+    queues.add(new FakeQueueInfo("q2", 60.0f, false, 50));
+    resConf.setFakeQueues(queues);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    // add some more TTs 
+    taskTrackerManager.addTaskTracker("tt3");
+    taskTrackerManager.addTaskTracker("tt4");
+    taskTrackerManager.addTaskTracker("tt5");
+
+    // submit a job
+    FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u1");
+    // for queue 'default', the capacity for maps is 4. Since we're the only user,
+    // we should get 2 map tasks & 1 reduce
+    checkAssignments("tt1",
+        new String[] {"attempt_test_0001_m_000001_0 on tt1",
+        "attempt_test_0001_m_000002_0 on tt1",
+        "attempt_test_0001_r_000001_0 on tt1"});
+
+    // Submit another job, from a different user
+    FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 6, 1, "default", "u2");
+    // FakeJobInProgress overrides initTasks() to init tasks, which does
+    // not initialize completedMapsForReduceSlowstart, and there is no proper
+    // API to set completedMapsForReduceSlowstart either.
+    // We manually set completedMapsForReduceSlowstart here to accommodate this
+    // test and to avoid changing mapreduce code for testing purposes only.
+    j2.completedMapsForReduceSlowstart = 3;
+
+    // Now if I ask for a map task, it should come from the second job
+    // the reduce task will be from j1 since j2 hasn't completed enough map tasks
+    checkAssignments("tt2",
+        new String[] {"attempt_test_0002_m_000001_0 on tt2",
+        "attempt_test_0002_m_000002_0 on tt2",
+        "attempt_test_0001_r_000002_0 on tt2"});
+
+    // Now if I ask for map tasks again, 1 from j2, 1 from j1;
+    // no reduce task since j1 used up queue capacity & j2 hasn't completed
+    // enough map tasks yet
+    checkAssignments("tt3",
+        new String[] {"attempt_test_0001_m_000003_0 on tt3",
+        "attempt_test_0002_m_000003_0 on tt3"});
+
+    // Now if I ask for map tasks again, 1 from j2, 1 from j1,
+    // no reduce task still
+    checkAssignments("tt4",
+        new String[] {"attempt_test_0001_m_000004_0 on tt4",
+        "attempt_test_0002_m_000004_0 on tt4"});
+
+    // complete 3 tasks from j2
+    taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000001_0", j2);
+    taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000002_0", j2);
+    taskTrackerManager.finishTask("tt3", "attempt_test_0002_m_000003_0", j2);
+
+    // Now if I ask for map tasks again, 2 from j2,
+    // for reduce tasks, 1 from j2
+    checkAssignments("tt5",
+        new String[] {"attempt_test_0002_m_000005_0 on tt5",
+        "attempt_test_0002_m_000006_0 on tt5",
+        "attempt_test_0002_r_000001_0 on tt5"});
+
+    // complete 2 tasks from j1
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000001_0", j1);
+   
+    // Now if I ask for tasks again, 1 map & 1 reduce from j1
+    checkAssignments("tt1",
+        new String[] {"attempt_test_0001_m_000005_0 on tt1",
+        "attempt_test_0001_r_000003_0 on tt1"});
+  }
+
+  // test that 1st user gets reduce slots when 2nd user hasn't finished
+  // enough map tasks yet, without exceeding MaxCapacity
+  public void testUserLimitWithMaxCapacity() throws Exception {
+    // set up some queues
+    String[] qs = {"default", "q2"};
+    taskTrackerManager.addQueues(qs);
+    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+    queues.add(new FakeQueueInfo("default", 40.0f, true, 50));
+    queues.add(new FakeQueueInfo("q2", 60.0f, false, 50));
+    resConf.setFakeQueues(queues);
+    resConf.setMaxCapacity("default", 60.0f);
+    scheduler.setResourceManagerConf(resConf);
+    scheduler.start();
+
+    // add some more TTs 
+    taskTrackerManager.addTaskTracker("tt3");
+    taskTrackerManager.addTaskTracker("tt4");
+    taskTrackerManager.addTaskTracker("tt5");
+
+    // submit a job
+    FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u1");
+    // for queue 'default', the capacity for maps is 4. Since we're the only user,
+    // we should get 2 map tasks & 1 reduce
+    checkAssignments("tt1",
+        new String[] {"attempt_test_0001_m_000001_0 on tt1",
+        "attempt_test_0001_m_000002_0 on tt1",
+        "attempt_test_0001_r_000001_0 on tt1"});
+
+    // Submit another job, from a different user
+    FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 6, 1, "default", "u2");
+    // FakeJobInProgress overrides initTasks() to init tasks, which does
+    // not initialize completedMapsForReduceSlowstart, and there is no proper
+    // API to set completedMapsForReduceSlowstart either.
+    // We manually set completedMapsForReduceSlowstart here to accommodate this
+    // test and to avoid changing mapreduce code for testing purposes only.
+    j2.completedMapsForReduceSlowstart = 3;
+
+    // Now if I ask for a map task, it should come from the second job
+    // the reduce task will be from j1 since j2 hasn't completed enough map tasks
+    checkAssignments("tt2",
+        new String[] {"attempt_test_0002_m_000001_0 on tt2",
+        "attempt_test_0002_m_000002_0 on tt2",
+        "attempt_test_0001_r_000002_0 on tt2"});
+
+    // Now if I ask for map tasks again, 1 from j2, 1 from j1;
+    // no reduce task since j1 used up queue capacity & j2 hasn't completed
+    // enough map tasks yet
+    checkAssignments("tt3",
+        new String[] {"attempt_test_0001_m_000003_0 on tt3",
+        "attempt_test_0002_m_000003_0 on tt3"});
+
+    // Now if I ask for map tasks again, no tasks since the queue has reached its max capacity
+    checkAssignments("tt4", new String[] {});
+
+    // complete 3 tasks from j2
+    taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000001_0", j2);
+    taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000002_0", j2);
+    taskTrackerManager.finishTask("tt3", "attempt_test_0002_m_000003_0", j2);
+
+    // Now if I ask for map tasks again, 2 from j2,
+    // for reduce tasks, 1 from j2
+    checkAssignments("tt5",
+        new String[] {"attempt_test_0002_m_000004_0 on tt5",
+        "attempt_test_0002_m_000005_0 on tt5",
+        "attempt_test_0002_r_000001_0 on tt5"});
+
+    // complete 2 tasks from j1
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
+    taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000001_0", j1);
+
+    // Now if I ask for tasks again, 1 map & 1 reduce from j1
+    checkAssignments("tt1",
+        new String[] {"attempt_test_0001_m_000004_0 on tt1",
+        "attempt_test_0001_r_000003_0 on tt1"});
+  }
+
   /**
    * Checks for multiple assignment.
    *