You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2011/08/08 22:08:23 UTC
svn commit: r1155075 - in /hadoop/common/branches/branch-0.20-security: ./
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
Author: acmurthy
Date: Mon Aug 8 20:08:22 2011
New Revision: 1155075
URL: http://svn.apache.org/viewvc?rev=1155075&view=rev
Log:
MAPREDUCE-2729. Ensure jobs with reduces which can't be launched due to slow-start do not count for user-limits. Contributed by Sherry Chen.
Modified:
hadoop/common/branches/branch-0.20-security/CHANGES.txt
hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java
hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1155075&r1=1155074&r2=1155075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Mon Aug 8 20:08:22 2011
@@ -9,6 +9,9 @@ Release 0.20.205.0 - unreleased
BUG FIXES
+ MAPREDUCE-2729. Ensure jobs with reduces which can't be launched due to
+ slow-start do not count for user-limits. (Sherry Chen via acmurthy)
+
MAPREDUCE-2651. Fix race condition in Linux task controller for
job log directory creation. (Bharath Mundlapudi via llu)
Modified: hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java?rev=1155075&r1=1155074&r2=1155075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerQueue.java Mon Aug 8 20:08:22 2011
@@ -197,13 +197,13 @@ class CapacitySchedulerQueue {
}
}
- void updateSlotsUsage(String user, int pendingTasks, int numRunningTasks, int numSlotsOccupied) {
+ void updateSlotsUsage(String user, boolean pendingTasks, int numRunningTasks, int numSlotsOccupied) {
this.numRunningTasks += numRunningTasks;
this.numSlotsOccupied += numSlotsOccupied;
Integer i = this.numSlotsOccupiedByUser.get(user);
int slots = numSlotsOccupied + ((i == null) ? 0 : i.intValue());
this.numSlotsOccupiedByUser.put(user, slots);
- if (pendingTasks > 0) {
+ if (pendingTasks) {
users.add(user);
}
}
@@ -581,12 +581,25 @@ class CapacitySchedulerQueue {
*/
void update(TaskType type, JobInProgress job, String user,
int numRunningTasks, int numSlotsOccupied) {
+ // pendingTasks keeps track of whether a user's job has tasks that
+ // still need to be scheduled. The number of users with pending tasks is
+ // used in the limit calculations.
+ boolean pendingTasks = false;
if (type == TaskType.MAP) {
- mapSlots.updateSlotsUsage(user, job.pendingMaps(),
+ // A job has map tasks to be scheduled when job.pendingMaps() > 0
+ if (job.pendingMaps() > 0) {
+ pendingTasks = true;
+ }
+ mapSlots.updateSlotsUsage(user, pendingTasks,
numRunningTasks, numSlotsOccupied);
} else if (type == TaskType.REDUCE) {
- reduceSlots.updateSlotsUsage(user, job.pendingReduces(),
- numRunningTasks, numSlotsOccupied);
+ // A job has reduce tasks to be scheduled when job.pendingReduces() > 0 &&
+ // the minimum number of maps have been completed
+ if (job.scheduleReduces() && (job.pendingReduces() > 0)) {
+ pendingTasks = true;
+ }
+ reduceSlots.updateSlotsUsage(user, pendingTasks,
+ numRunningTasks, numSlotsOccupied);
}
}
Modified: hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1155075&r1=1155074&r2=1155075&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Mon Aug 8 20:08:22 2011
@@ -252,6 +252,9 @@ public class TestCapacityScheduler exten
@Override
public Task obtainNewReduceTask(final TaskTrackerStatus tts,
int clusterSize, int ignored) throws IOException {
+ if (!scheduleReduces()) {
+ return null;
+ }
boolean areAllReducesRunning = (redTaskCtr == numReduceTasks);
if (areAllReducesRunning){
if(!getJobConf().getReduceSpeculativeExecution() ||
@@ -3424,6 +3427,160 @@ public class TestCapacityScheduler exten
assertEquals(scheduler.getLimitMaxMemForReduceSlot(),3);
}
+ // test that 1st user gets reduce slots when 2nd user hasn't finished
+ // enough map tasks yet
+ public void testUserLimit() throws Exception {
+ // set up some queues
+ String[] qs = {"default", "q2"};
+ taskTrackerManager.addQueues(qs);
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 40.0f, true, 50));
+ queues.add(new FakeQueueInfo("q2", 60.0f, false, 50));
+ resConf.setFakeQueues(queues);
+ scheduler.setResourceManagerConf(resConf);
+ scheduler.start();
+
+ // add some more TTs
+ taskTrackerManager.addTaskTracker("tt3");
+ taskTrackerManager.addTaskTracker("tt4");
+ taskTrackerManager.addTaskTracker("tt5");
+
+ // submit a job
+ FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u1");
+ // for queue 'default', the capacity for maps is 4. Since we're the only user,
+ // we should get 2 map tasks & 1 reduce
+ checkAssignments("tt1",
+ new String[] {"attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0001_m_000002_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1"});
+
+ // Submit another job, from a different user
+ FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 6, 1, "default", "u2");
+ // FakeJobInProgress overrides initTasks() to init tasks, which does
+ // not initialize completedMapsForReduceSlowstart, and there is no proper
+ // API to set completedMapsForReduceSlowstart either.
+ // We manually set completedMapsForReduceSlowstart here to accommodate this
+ // test, and to avoid changing mapreduce code for testing purposes only
+ j2.completedMapsForReduceSlowstart = 3;
+
+ // Now if I ask for a map task, it should come from the second job
+ // reduce task will be from j1 since j2 hasn't completed enough map tasks
+ checkAssignments("tt2",
+ new String[] {"attempt_test_0002_m_000001_0 on tt2",
+ "attempt_test_0002_m_000002_0 on tt2",
+ "attempt_test_0001_r_000002_0 on tt2"});
+
+ // Now if I ask for map tasks again, 1 from j2, 1 from j1
+ // no reduce task since j1 used up queue capacity & j2 hasn't completed
+ // enough map tasks yet
+ checkAssignments("tt3",
+ new String[] {"attempt_test_0001_m_000003_0 on tt3",
+ "attempt_test_0002_m_000003_0 on tt3"});
+
+ // Now if I ask for map tasks again, 1 from j2, 1 from j1,
+ // no reduce task still
+ checkAssignments("tt4",
+ new String[] {"attempt_test_0001_m_000004_0 on tt4",
+ "attempt_test_0002_m_000004_0 on tt4"});
+
+ // complete 3 tasks from j2
+ taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000001_0", j2);
+ taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000002_0", j2);
+ taskTrackerManager.finishTask("tt3", "attempt_test_0002_m_000003_0", j2);
+
+ // Now if I ask for map tasks again, 2 from j2,
+ // for reduce tasks, 1 from j2
+ checkAssignments("tt5",
+ new String[] {"attempt_test_0002_m_000005_0 on tt5",
+ "attempt_test_0002_m_000006_0 on tt5",
+ "attempt_test_0002_r_000001_0 on tt5"});
+
+ // complete 2 tasks from j1
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000001_0", j1);
+
+ // Now if I ask for tasks again, 1 map & 1 reduce from j1
+ checkAssignments("tt1",
+ new String[] {"attempt_test_0001_m_000005_0 on tt1",
+ "attempt_test_0001_r_000003_0 on tt1"});
+ }
+
+ // test that 1st user gets reduce slots when 2nd user hasn't finished
+ // enough map tasks yet, without exceeding MaxCapacity
+ public void testUserLimitWithMaxCapacity() throws Exception {
+ // set up some queues
+ String[] qs = {"default", "q2"};
+ taskTrackerManager.addQueues(qs);
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 40.0f, true, 50));
+ queues.add(new FakeQueueInfo("q2", 60.0f, false, 50));
+ resConf.setFakeQueues(queues);
+ resConf.setMaxCapacity("default", 60.0f);
+ scheduler.setResourceManagerConf(resConf);
+ scheduler.start();
+
+ // add some more TTs
+ taskTrackerManager.addTaskTracker("tt3");
+ taskTrackerManager.addTaskTracker("tt4");
+ taskTrackerManager.addTaskTracker("tt5");
+
+ // submit a job
+ FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u1");
+ // for queue 'default', the capacity for maps is 4. Since we're the only user,
+ // we should get 2 map tasks & 1 reduce
+ checkAssignments("tt1",
+ new String[] {"attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0001_m_000002_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1"});
+
+ // Submit another job, from a different user
+ FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 6, 1, "default", "u2");
+ // FakeJobInProgress overrides initTasks() to init tasks, which does
+ // not initialize completedMapsForReduceSlowstart, and there is no proper
+ // API to set completedMapsForReduceSlowstart either.
+ // We manually set completedMapsForReduceSlowstart here to accommodate this
+ // test, and to avoid changing mapreduce code for testing purposes only
+ j2.completedMapsForReduceSlowstart = 3;
+
+ // Now if I ask for a map task, it should come from the second job
+ // reduce task will be from j1 since j2 hasn't completed enough map tasks
+ checkAssignments("tt2",
+ new String[] {"attempt_test_0002_m_000001_0 on tt2",
+ "attempt_test_0002_m_000002_0 on tt2",
+ "attempt_test_0001_r_000002_0 on tt2"});
+
+ // Now if I ask for map tasks again, 1 from j2, 1 from j1
+ // no reduce task since j1 used up queue capacity & j2 hasn't completed
+ // enough map tasks yet
+ checkAssignments("tt3",
+ new String[] {"attempt_test_0001_m_000003_0 on tt3",
+ "attempt_test_0002_m_000003_0 on tt3"});
+
+ // Now if I ask for map tasks again, no tasks since it reached maxcapacity
+ checkAssignments("tt4", new String[] {});
+
+ // complete 3 tasks from j2
+ taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000001_0", j2);
+ taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000002_0", j2);
+ taskTrackerManager.finishTask("tt3", "attempt_test_0002_m_000003_0", j2);
+
+ // Now if I ask for map tasks again, 2 from j2,
+ // for reduce tasks, 1 from j2
+ checkAssignments("tt5",
+ new String[] {"attempt_test_0002_m_000004_0 on tt5",
+ "attempt_test_0002_m_000005_0 on tt5",
+ "attempt_test_0002_r_000001_0 on tt5"});
+
+ // complete 2 tasks from j1
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000001_0", j1);
+
+ // Now if I ask for tasks again, 1 map & 1 reduce from j1
+ checkAssignments("tt1",
+ new String[] {"attempt_test_0001_m_000004_0 on tt1",
+ "attempt_test_0001_r_000003_0 on tt1"});
+ }
+
/**
* Checks for multiple assignment.
*