You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by om...@apache.org on 2009/10/27 16:44:06 UTC
svn commit: r830230 [3/9] - in /hadoop/mapreduce/branches/HDFS-641: ./
.eclipse.templates/ conf/ ivy/ lib/ src/c++/ src/contrib/
src/contrib/capacity-scheduler/
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/
src/contrib/capacity-sche...
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Tue Oct 27 15:43:58 2009
@@ -84,101 +84,54 @@
}
/**
- * Test the max map limit.
- *
+ * Test max capacity
* @throws IOException
*/
- public void testMaxMapCap() throws IOException {
+ public void testMaxCapacity() throws IOException {
this.setUp(4, 1, 1);
taskTrackerManager.addQueues(new String[]{"default"});
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, false, 1));
+ queues.add(new FakeQueueInfo("default", 25.0f, false, 1));
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
-
- scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext().getMapTSC().setMaxTaskLimit(2);
- scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext().getReduceTSC().setMaxTaskLimit(-1);
-
+ scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext()
+ .setMaxCapacityPercent(50.0f);
//submit the Job
- FakeJobInProgress fjob1 =
- taskTrackerManager.submitJob(JobStatus.PREP, 3, 1, "default", "user");
+ FakeJobInProgress fjob1 = taskTrackerManager.submitJob(
+ JobStatus.PREP, 4, 4, "default", "user");
taskTrackerManager.initJob(fjob1);
+ HashMap<String, String> expectedStrings = new HashMap<String, String>();
- List<Task> task1 = scheduler.assignTasks(tracker("tt1"));
- List<Task> task2 = scheduler.assignTasks(tracker("tt2"));
-
- //Once the 2 tasks are running the third assigment should be reduce.
- checkAssignment(
- taskTrackerManager, scheduler, "tt3",
- "attempt_test_0001_r_000001_0 on tt3");
- //This should fail.
- List<Task> task4 = scheduler.assignTasks(tracker("tt4"));
- assertNull(task4);
- //Now complete the task 1.
- // complete the job
- taskTrackerManager.finishTask(
- task1.get(0).getTaskID().toString(),
- fjob1);
- //We have completed the tt1 task which was a map task so we expect one map
- //task to be picked up
- checkAssignment(
- taskTrackerManager, scheduler, "tt4",
- "attempt_test_0001_m_000003_0 on tt4");
- }
-
- /**
- * Test max reduce limit
- *
- * @throws IOException
- */
- public void testMaxReduceCap() throws IOException {
- this.setUp(4, 1, 1);
- taskTrackerManager.addQueues(new String[]{"default"});
- ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, false, 1));
+ expectedStrings.put(MAP, "attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings.put(REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+ List<Task> task1 = checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1", expectedStrings);
- taskTrackerManager.setFakeQueues(queues);
- scheduler.start();
- scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext().getMapTSC().setMaxTaskLimit(-1);
- scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext().getReduceTSC().setMaxTaskLimit(2);
-
-
- //submit the Job
- FakeJobInProgress fjob1 =
- taskTrackerManager.submitJob(JobStatus.PREP, 1, 3, "default", "user");
-
- taskTrackerManager.initJob(fjob1);
+ expectedStrings.put(MAP, "attempt_test_0001_m_000002_0 on tt2");
+ expectedStrings.put(REDUCE, "attempt_test_0001_r_000002_0 on tt2");
+ List<Task> task2 = checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt2", expectedStrings);
- List<Task> task1 = scheduler.assignTasks(tracker("tt1"));
- List<Task> task2 = scheduler.assignTasks(tracker("tt2"));
+ //we have already reached the limit
+ //this call would return null
List<Task> task3 = scheduler.assignTasks(tracker("tt3"));
+ assertNull(task3);
- //This should fail. 1 map, 2 reduces , we have reached the limit.
- List<Task> task4 = scheduler.assignTasks(tracker("tt4"));
- assertNull(task4);
//Now complete the task 1 i.e map task.
- // complete the job
- taskTrackerManager.finishTask(
- task1.get(0).getTaskID().toString(),
- fjob1);
-
- //This should still fail as only map task is done
- task4 = scheduler.assignTasks(tracker("tt4"));
- assertNull(task4);
-
- //Complete the reduce task
- taskTrackerManager.finishTask(
- task2.get(0).getTaskID().toString(), fjob1);
-
- //One reduce is done hence assign the new reduce.
- checkAssignment(
- taskTrackerManager, scheduler, "tt4",
- "attempt_test_0001_r_000003_0 on tt4");
+ for (Task task : task1) {
+ taskTrackerManager.finishTask(
+ task.getTaskID().toString(), fjob1);
+ }
+
+ expectedStrings.put(MAP, "attempt_test_0001_m_000003_0 on tt1");
+ expectedStrings.put(REDUCE, "attempt_test_0001_r_000003_0 on tt1");
+ task2 = checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1", expectedStrings);
}
// test job run-state change
@@ -403,19 +356,27 @@
// submit a job with no queue specified. It should be accepted
// and given to the default queue.
- JobInProgress j = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
+ JobInProgress j = taskTrackerManager.submitJobAndInit(JobStatus.PREP,
+ 10, 10, null, "u1");
+ // when we ask for tasks, we should get them for the job submitted
+ Map<String, String> expectedTaskStrings = new HashMap<String, String>();
+ expectedTaskStrings.put(CapacityTestUtils.MAP,
+ "attempt_test_0001_m_000001_0 on tt1");
+ expectedTaskStrings.put(CapacityTestUtils.REDUCE,
+ "attempt_test_0001_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ "tt1", expectedTaskStrings);
- // when we ask for a task, we should get one, from the job submitted
- Task t;
- t = checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
// submit another job, to a different queue
j = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
- // now when we get a task, it should be from the second job
- t = checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0002_m_000001_0 on tt2");
+ // now when we get tasks, it should be from the second job
+ expectedTaskStrings.clear();
+ expectedTaskStrings.put(CapacityTestUtils.MAP,
+ "attempt_test_0002_m_000001_0 on tt2");
+ expectedTaskStrings.put(CapacityTestUtils.REDUCE,
+ "attempt_test_0002_r_000001_0 on tt2");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ "tt2", expectedTaskStrings);
}
public void testGetJobs() throws Exception {
@@ -472,6 +433,28 @@
assertEquals(18.75f, jqm.getJobQueue("qAZ4").qsc.getCapacityPercent());
}
+ public void testCapacityAllocFailureWithLowerMaxCapacity() throws Exception {
+ String[] qs = {"default", "qAZ1"};
+ taskTrackerManager.addQueues(qs);
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 25.0f, true, 25));
+ FakeQueueInfo qi = new FakeQueueInfo("qAZ1", -1.0f, true, 25);
+ qi.maxCapacity = 40.0f;
+ queues.add(qi);
+ taskTrackerManager.setFakeQueues(queues);
+ try {
+ scheduler.start();
+ fail("scheduler start should fail ");
+ }catch(IOException ise) {
+ Throwable e = ise.getCause();
+ assertTrue(e instanceof IllegalStateException);
+ assertEquals(
+ e.getMessage(),
+ " Capacity share (" + 75.0f + ")for unconfigured queue " + "qAZ1" +
+ " is greater than its maximum-capacity percentage " + 40.0f);
+ }
+ }
+
// Tests how capacity is computed and assignment of tasks done
// on the basis of the capacity.
public void testCapacityBasedAllocation() throws Exception {
@@ -544,19 +527,30 @@
// submit a job
taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
// for queue 'q2', the capacity for maps is 2. Since we're the only user,
- // we should get a task
- checkAssignment(
+ // we should get a task
+ Map<String,String> expectedStrings = new HashMap<String,String>();
+ expectedStrings.put(MAP,"attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings.put(REDUCE,"attempt_test_0001_r_000001_0 on tt1");
+
+ checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
- // I should get another map task.
+ expectedStrings);
+
+ // I should get another map task.
+ //No redduces as there is 1 slot only for reduce on TT
checkAssignment(
taskTrackerManager, scheduler, "tt1",
"attempt_test_0001_m_000002_0 on tt1");
+
// Now we're at full capacity for maps. If I ask for another map task,
- // I should get a map task from the default queue's capacity.
- checkAssignment(
+ // I should get a map task from the default queue's capacity.
+ //same with reduce
+ expectedStrings.put(MAP,"attempt_test_0001_m_000003_0 on tt2");
+ expectedStrings.put(REDUCE,"attempt_test_0001_r_000002_0 on tt2");
+ checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt2",
- "attempt_test_0001_m_000003_0 on tt2");
+ expectedStrings);
+
// and another
checkAssignment(
taskTrackerManager, scheduler, "tt2",
@@ -564,21 +558,16 @@
}
/**
- * Creates a queue with max task limit of 2
- * submit 1 job in the queue which is high ram(2 slots) . As 2 slots are
- * given to high ram job and are reserved , no other tasks are accepted .
- *
+ * test the high memory blocking with max capacity.
* @throws IOException
*/
- public void testHighMemoryBlockingWithMaxLimit()
+ public void testHighMemoryBlockingWithMaxCapacity()
throws IOException {
-
- // 2 map and 1 reduce slots
- taskTrackerManager = new FakeTaskTrackerManager(2, 2, 1);
+ taskTrackerManager = new FakeTaskTrackerManager(2, 2, 2);
taskTrackerManager.addQueues(new String[]{"defaultXYZM"});
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("defaultXYZM", 100.0f, true, 25));
+ queues.add(new FakeQueueInfo("defaultXYZM", 25.0f, true, 50));
scheduler.setTaskTrackerManager(taskTrackerManager);
@@ -586,87 +575,101 @@
// Normal job in the cluster would be 1GB maps/reduces
scheduler.getConf().setLong(JTConfig.JT_MAX_MAPMEMORY_MB, 2 * 1024);
scheduler.getConf().setLong(MRConfig.MAPMEMORY_MB, 1 * 1024);
- scheduler.getConf().setLong(JTConfig.JT_MAX_REDUCEMEMORY_MB, 1 * 1024);
+ scheduler.getConf().setLong(JTConfig.JT_MAX_REDUCEMEMORY_MB, 2 * 1024);
scheduler.getConf().setLong(MRConfig.REDUCEMEMORY_MB, 1 * 1024);
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext()
- .getMapTSC().setMaxTaskLimit(2);
+ .setMaxCapacityPercent(50);
-
- // The situation : Submit 2 jobs with high memory map task
- //Set the max limit for queue to 2 ,
- // try submitting more map tasks to the queue , it should not happen
-
- LOG.debug(
- "Submit one high memory(2GB maps, 0MB reduces) job of "
- + "2 map tasks");
JobConf jConf = new JobConf(conf);
jConf.setMemoryForMapTask(2 * 1024);
- jConf.setMemoryForReduceTask(0);
+ jConf.setMemoryForReduceTask(1 * 1024);
jConf.setNumMapTasks(2);
- jConf.setNumReduceTasks(0);
+ jConf.setNumReduceTasks(1);
jConf.setQueueName("defaultXYZM");
jConf.setUser("u1");
- FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+ FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, jConf);
- LOG.debug(
- "Submit another regular memory(1GB vmem maps/reduces) job of "
- + "2 map/red tasks");
jConf = new JobConf(conf);
jConf.setMemoryForMapTask(1 * 1024);
- jConf.setMemoryForReduceTask(1 * 1024);
- jConf.setNumMapTasks(2);
+ jConf.setMemoryForReduceTask(2 * 1024);
+ jConf.setNumMapTasks(1);
jConf.setNumReduceTasks(2);
jConf.setQueueName("defaultXYZM");
jConf.setUser("u1");
- FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+ FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, jConf);
- // first, a map from j1 will run this is a high memory job so it would
- // occupy the 2 slots
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
+ //high ram map from job 1 and normal reduce task from job 1
+ HashMap<String,String> expectedStrings = new HashMap<String,String>();
+ expectedStrings.put(MAP,"attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings.put(REDUCE,"attempt_test_0001_r_000001_0 on tt1");
- checkOccupiedSlots("defaultXYZM", TaskType.MAP, 1, 2, 100.0f, 1, 1);
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+ List<Task> tasks = checkMultipleTaskAssignment(taskTrackerManager,scheduler,
+ "tt1", expectedStrings);
- // at this point, the scheduler tries to schedule another map from j1.
- // there isn't enough space. The second job's reduce should be scheduled.
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_r_000001_0 on tt1");
-
- checkOccupiedSlots("defaultXYZM", TaskType.MAP, 1, 2, 100.0f, 1, 1);
+ checkOccupiedSlots("defaultXYZM", TaskType.MAP, 1, 2, 200.0f,1,0);
+ checkOccupiedSlots("defaultXYZM", TaskType.REDUCE, 1, 1, 100.0f,0,2);
checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
- //at this point , the scheduler tries to schedule another map from j2 for
- //another task tracker.
- // This should not happen as all the map slots are taken
- //by the first task itself.hence reduce task from the second job is given
+ //we have reached the maximum limit for map, so no more map tasks.
+ //we have used 1 reduce already and 1 more reduce slot is left for the
+ //before we reach maxcapacity for reduces.
+ // But current 1 slot + 2 slots for high ram reduce would
+ //mean we are crossing the maxium capacity.hence nothing would be assigned
+ //in this call
+ assertNull(scheduler.assignTasks(tracker("tt2")));
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0002_r_000002_0 on tt2");
+ //complete the high ram job on tt1.
+ for (Task task : tasks) {
+ taskTrackerManager.finishTask(
+ task.getTaskID().toString(),
+ job1);
+ }
+
+ expectedStrings.put(MAP,"attempt_test_0001_m_000002_0 on tt2");
+ expectedStrings.put(REDUCE,"attempt_test_0002_r_000001_0 on tt2");
+
+ tasks = checkMultipleTaskAssignment(taskTrackerManager,scheduler,
+ "tt2", expectedStrings);
+
+ checkOccupiedSlots("defaultXYZM", TaskType.MAP, 1, 2, 200.0f,1,0);
+ checkOccupiedSlots("defaultXYZM", TaskType.REDUCE, 1, 2, 200.0f,0,2);
+ checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 2 * 1024L);
+
+ //complete the high ram job on tt1.
+ for (Task task : tasks) {
+ taskTrackerManager.finishTask(
+ task.getTaskID().toString(),
+ job2);
+ }
+
+
+ expectedStrings.put(MAP,"attempt_test_0002_m_000001_0 on tt2");
+ expectedStrings.put(REDUCE,"attempt_test_0002_r_000002_0 on tt2");
+
+ tasks = checkMultipleTaskAssignment(taskTrackerManager,scheduler,
+ "tt2", expectedStrings);
}
/**
* test if user limits automatically adjust to max map or reduce limit
*/
- public void testUserLimitsWithMaxLimits() throws Exception {
- setUp(4, 4, 4);
+ public void testUserLimitsWithMaxCapacity() throws Exception {
+ setUp(2, 2, 2);
// set up some queues
String[] qs = {"default"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, true, 50));
+ queues.add(new FakeQueueInfo("default", 50.0f, true, 50));
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
- scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext().getMapTSC().setMaxTaskLimit(2);
- scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext().getReduceTSC().setMaxTaskLimit(2);
-
+ scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext()
+ .setMaxCapacityPercent(75);
// submit a job
FakeJobInProgress fjob1 =
@@ -674,50 +677,44 @@
FakeJobInProgress fjob2 =
taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u2");
- // for queue 'default', the capacity for maps is 2.
- // But the max map limit is 2
- // hence user should be getting not more than 1 as it is the 50%.
- Task t1 = checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
+ // for queue 'default', maxCapacity for map and reduce is 3.
+ // initial user limit for 50% assuming there are 2 users/queue is.
+ // 1 map and 1 reduce.
+ // after max capacity it is 1.5 each.
- //Now we should get the task from the other job. As the
- //first user has reached his max map limit.
+ //first job would be given 1 job each.
+ HashMap<String,String> expectedStrings = new HashMap<String,String>();
+ expectedStrings.put(MAP,"attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings.put(REDUCE,"attempt_test_0001_r_000001_0 on tt1");
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0002_m_000001_0 on tt2");
+ List<Task> tasks = checkMultipleTaskAssignment(taskTrackerManager,scheduler,
+ "tt1", expectedStrings);
- //Now we are done with map limit , now if we ask for task we should
- // get reduce from 1st job
- checkAssignment(
- taskTrackerManager, scheduler, "tt3",
- "attempt_test_0001_r_000001_0 on tt3");
- // Now we're at full capacity for maps. 1 done with reduces for job 1 so
- // now we should get 1 reduces for job 2
- Task t4 = checkAssignment(
- taskTrackerManager, scheduler, "tt4",
- "attempt_test_0002_r_000001_0 on tt4");
- taskTrackerManager.finishTask(
- t1.getTaskID().toString(),
- fjob1);
+ //for user u1 we have reached the limit. that is 1 job.
+ //1 more map and reduce tasks.
+ expectedStrings.put(MAP,"attempt_test_0002_m_000001_0 on tt1");
+ expectedStrings.put(REDUCE,"attempt_test_0002_r_000001_0 on tt1");
- //tt1 completed the task so we have 1 map slot for u1
- // we are assigning the 2nd map task from fjob1
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000002_0 on tt1");
+ tasks = checkMultipleTaskAssignment(taskTrackerManager,scheduler,
+ "tt1", expectedStrings);
- taskTrackerManager.finishTask(
- t4.getTaskID().toString(),
- fjob2);
- //tt4 completed the task , so we have 1 reduce slot for u2
- //we are assigning the 2nd reduce from fjob2
- checkAssignment(
- taskTrackerManager, scheduler, "tt4",
- "attempt_test_0002_r_000002_0 on tt4");
+ expectedStrings.put(MAP,"attempt_test_0001_m_000002_0 on tt2");
+ expectedStrings.put(REDUCE,"attempt_test_0001_r_000002_0 on tt2");
+
+ tasks = checkMultipleTaskAssignment(taskTrackerManager,scheduler,
+ "tt2", expectedStrings);
+
+ assertNull(scheduler.assignTasks(tracker("tt2")));
+ }
+ // Utility method to construct a map of expected strings
+ // with exactly one map task and one reduce task.
+ private void populateExpectedStrings(Map<String, String> expectedTaskStrings,
+ String mapTask, String reduceTask) {
+ expectedTaskStrings.clear();
+ expectedTaskStrings.put(CapacityTestUtils.MAP, mapTask);
+ expectedTaskStrings.put(CapacityTestUtils.REDUCE, reduceTask);
}
@@ -736,26 +733,31 @@
// submit a job
taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
- // for queue 'q2', the capacity for maps is 2. Since we're the only user,
- // we should get a task
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
+ // for queue 'q2', the capacity is 2 for maps and 1 for reduce.
+ // Since we're the only user, we should get tasks
+ Map<String, String> expectedTaskStrings = new HashMap<String, String>();
+ populateExpectedStrings(expectedTaskStrings,
+ "attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ "tt1", expectedTaskStrings);
+
// Submit another job, from a different user
taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
- // Now if I ask for a map task, it should come from the second job
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_m_000001_0 on tt1");
- // Now we're at full capacity for maps. If I ask for another map task,
- // I should get a map task from the default queue's capacity.
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0001_m_000002_0 on tt2");
+ // Now if I ask for a task, it should come from the second job
+ checkAssignment(taskTrackerManager, scheduler,
+ "tt1", "attempt_test_0002_m_000001_0 on tt1");
+
+ // Now we're at full capacity. If I ask for another task,
+ // I should get tasks from the default queue's capacity.
+ populateExpectedStrings(expectedTaskStrings,
+ "attempt_test_0001_m_000002_0 on tt2",
+ "attempt_test_0002_r_000001_0 on tt2");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ "tt2", expectedTaskStrings);
// and another
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0002_m_000002_0 on tt2");
+ checkAssignment(taskTrackerManager, scheduler,
+ "tt2", "attempt_test_0002_m_000002_0 on tt2");
}
// test user limits when a 2nd job is submitted much after first job
@@ -773,21 +775,28 @@
// submit a job
taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
- // for queue 'q2', the capacity for maps is 2. Since we're the only user,
- // we should get a task
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
+ // for queue 'q2', the capacity for maps is 2 and reduce is 1.
+ // Since we're the only user, we should get tasks
+ Map<String, String> expectedTaskStrings = new HashMap<String, String>();
+ populateExpectedStrings(expectedTaskStrings,
+ "attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ "tt1", expectedTaskStrings);
+
// since we're the only job, we get another map
checkAssignment(
taskTrackerManager, scheduler, "tt1",
"attempt_test_0001_m_000002_0 on tt1");
+
// Submit another job, from a different user
taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
- // Now if I ask for a map task, it should come from the second job
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0002_m_000001_0 on tt2");
+ // Now if I ask for a task, it should come from the second job
+ populateExpectedStrings(expectedTaskStrings,
+ "attempt_test_0002_m_000001_0 on tt2",
+ "attempt_test_0002_r_000001_0 on tt2");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ "tt2", expectedTaskStrings);
// and another
checkAssignment(
taskTrackerManager, scheduler, "tt2",
@@ -810,41 +819,56 @@
// submit a job
FakeJobInProgress j1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
- // for queue 'q2', the capacity for maps is 2. Since we're the only user,
- // we should get a task
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
+ // for queue 'q2', the capacity for maps is 2 and reduces is 1.
+ // Since we're the only user, we should get a task
+ Map<String, String> expectedTaskStrings = new HashMap<String, String>();
+ populateExpectedStrings(expectedTaskStrings,
+ "attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ "tt1", expectedTaskStrings);
// since we're the only job, we get another map
checkAssignment(
taskTrackerManager, scheduler, "tt1",
"attempt_test_0001_m_000002_0 on tt1");
- // we get two more maps from 'default queue'
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0001_m_000003_0 on tt2");
+ // we get more tasks from 'default queue'
+ populateExpectedStrings(expectedTaskStrings,
+ "attempt_test_0001_m_000003_0 on tt2",
+ "attempt_test_0001_r_000002_0 on tt2");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ "tt2", expectedTaskStrings);
checkAssignment(
taskTrackerManager, scheduler, "tt2",
"attempt_test_0001_m_000004_0 on tt2");
+
// Submit another job, from a different user
FakeJobInProgress j2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
- // one of the task finishes
+ // one of the task finishes of each type
taskTrackerManager.finishTask("attempt_test_0001_m_000001_0", j1);
- // Now if I ask for a map task, it should come from the second job
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_m_000001_0 on tt1");
+ taskTrackerManager.finishTask("attempt_test_0001_r_000001_0", j1);
+
+ // Now if I ask for a task, it should come from the second job
+ populateExpectedStrings(expectedTaskStrings,
+ "attempt_test_0002_m_000001_0 on tt1",
+ "attempt_test_0002_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ "tt1", expectedTaskStrings);
+
// another task from job1 finishes, another new task to job2
taskTrackerManager.finishTask("attempt_test_0001_m_000002_0", j1);
checkAssignment(
taskTrackerManager, scheduler, "tt1",
"attempt_test_0002_m_000002_0 on tt1");
+
// now we have equal number of tasks from each job. Whichever job's
// task finishes, that job gets a new task
taskTrackerManager.finishTask("attempt_test_0001_m_000003_0", j1);
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0001_m_000005_0 on tt2");
+ taskTrackerManager.finishTask("attempt_test_0001_r_000002_0", j1);
+ populateExpectedStrings(expectedTaskStrings,
+ "attempt_test_0001_m_000005_0 on tt2",
+ "attempt_test_0001_r_000003_0 on tt2");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ "tt2", expectedTaskStrings);
taskTrackerManager.finishTask("attempt_test_0002_m_000001_0", j2);
checkAssignment(
taskTrackerManager, scheduler, "tt1",
@@ -853,7 +877,7 @@
// test user limits with many users, more slots
public void testUserLimits4() throws Exception {
- // set up one queue, with 10 slots
+ // set up one queue, with 10 map slots and 5 reduce slots
String[] qs = {"default"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
@@ -870,42 +894,31 @@
// u1 submits job
FakeJobInProgress j1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
// it gets the first 5 slots
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000002_0 on tt1");
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0001_m_000003_0 on tt2");
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0001_m_000004_0 on tt2");
- checkAssignment(
- taskTrackerManager, scheduler, "tt3",
- "attempt_test_0001_m_000005_0 on tt3");
+ Map<String, String> expectedTaskStrings = new HashMap<String, String>();
+ for (int i=0; i<5; i++) {
+ String ttName = "tt"+(i+1);
+ populateExpectedStrings(expectedTaskStrings,
+ "attempt_test_0001_m_00000"+(i+1)+"_0 on " + ttName,
+ "attempt_test_0001_r_00000"+(i+1)+"_0 on " + ttName);
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ ttName, expectedTaskStrings);
+ }
+
// u2 submits job with 4 slots
FakeJobInProgress j2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 4, 4, null, "u2");
// u2 should get next 4 slots
- checkAssignment(
- taskTrackerManager, scheduler, "tt3",
- "attempt_test_0002_m_000001_0 on tt3");
- checkAssignment(
- taskTrackerManager, scheduler, "tt4",
- "attempt_test_0002_m_000002_0 on tt4");
- checkAssignment(
- taskTrackerManager, scheduler, "tt4",
- "attempt_test_0002_m_000003_0 on tt4");
- checkAssignment(
- taskTrackerManager, scheduler, "tt5",
- "attempt_test_0002_m_000004_0 on tt5");
+ for (int i=0; i<4; i++) {
+ String ttName = "tt"+(i+1);
+ checkAssignment(taskTrackerManager, scheduler, ttName,
+ "attempt_test_0002_m_00000"+(i+1)+"_0 on " + ttName);
+ }
// last slot should go to u1, since u2 has no more tasks
checkAssignment(
taskTrackerManager, scheduler, "tt5",
"attempt_test_0001_m_000006_0 on tt5");
- // u1 finishes a task
+ // u1 finishes tasks
taskTrackerManager.finishTask("attempt_test_0001_m_000006_0", j1);
+ taskTrackerManager.finishTask("attempt_test_0001_r_000005_0", j1);
// u1 submits a few more jobs
// All the jobs are inited when submitted
// because of addition of Eager Job Initializer all jobs in this
@@ -917,23 +930,26 @@
taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, null, "u2");
// now u3 submits a job
taskTrackerManager.submitJobAndInit(JobStatus.PREP, 2, 2, null, "u3");
- // next slot should go to u3, even though u2 has an earlier job, since
+ // next map slot should go to u3, even though u2 has an earlier job, since
// user limits have changed and u1/u2 are over limits
- checkAssignment(
- taskTrackerManager, scheduler, "tt5",
- "attempt_test_0007_m_000001_0 on tt5");
+ // reduce slot will go to job 2, as it is still under limit.
+ populateExpectedStrings(expectedTaskStrings,
+ "attempt_test_0007_m_000001_0 on tt5",
+ "attempt_test_0002_r_000001_0 on tt5");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ "tt5", expectedTaskStrings);
// some other task finishes and u3 gets it
taskTrackerManager.finishTask("attempt_test_0002_m_000004_0", j1);
checkAssignment(
- taskTrackerManager, scheduler, "tt5",
- "attempt_test_0007_m_000002_0 on tt5");
+ taskTrackerManager, scheduler, "tt4",
+ "attempt_test_0007_m_000002_0 on tt4");
// now, u2 finishes a task
taskTrackerManager.finishTask("attempt_test_0002_m_000002_0", j1);
// next slot will go to u1, since u3 has nothing to run and u1's job is
// first in the queue
checkAssignment(
- taskTrackerManager, scheduler, "tt4",
- "attempt_test_0001_m_000007_0 on tt4");
+ taskTrackerManager, scheduler, "tt2",
+ "attempt_test_0001_m_000007_0 on tt2");
}
/**
@@ -955,13 +971,13 @@
// enabled memory-based scheduling
// Normal job in the cluster would be 1GB maps/reduces
scheduler.getConf().setLong(
- JTConfig.JT_MAX_MAPMEMORY_MB, 2 * 1024);
+ JTConfig.JT_MAX_MAPMEMORY_MB, 2 * 1024);
scheduler.getConf().setLong(
- MRConfig.MAPMEMORY_MB, 1 * 1024);
+ MRConfig.MAPMEMORY_MB, 1 * 1024);
scheduler.getConf().setLong(
- JTConfig.JT_MAX_REDUCEMEMORY_MB, 2 * 1024);
+ JTConfig.JT_MAX_REDUCEMEMORY_MB, 2 * 1024);
scheduler.getConf().setLong(
- MRConfig.REDUCEMEMORY_MB, 1 * 1024);
+ MRConfig.REDUCEMEMORY_MB, 1 * 1024);
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
@@ -973,7 +989,8 @@
jConf.setNumReduceTasks(6);
jConf.setUser("u1");
jConf.setQueueName("default");
- FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+ FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, jConf);
LOG.debug(
"Submit one high memory(2GB maps, 2GB reduces) job of "
@@ -985,60 +1002,41 @@
jConf.setNumReduceTasks(6);
jConf.setQueueName("default");
jConf.setUser("u2");
- FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+ FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, jConf);
- // Verify that normal job takes 3 task assignments to hit user limits
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_r_000001_0 on tt1");
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000002_0 on tt1");
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_r_000002_0 on tt1");
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000003_0 on tt1");
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_r_000003_0 on tt1");
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000004_0 on tt1");
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_r_000004_0 on tt1");
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000005_0 on tt1");
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_r_000005_0 on tt1");
+ // Verify that normal job takes 5 task assignments to hit user limits
+ Map<String, String> expectedStrings = new HashMap<String, String>();
+ for (int i = 0; i < 5; i++) {
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP,
+ "attempt_test_0001_m_00000" + (i + 1) + "_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE,
+ "attempt_test_0001_r_00000" + (i + 1) + "_0 on tt1");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
+ }
// u1 has 5 map slots and 5 reduce slots. u2 has none. So u1's user limits
// are hit. So u2 should get slots
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_m_000001_0 on tt1");
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_r_000001_0 on tt1");
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_m_000002_0 on tt1");
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_r_000002_0 on tt1");
-
- // u1 has 5 map slots and 5 reduce slots. u2 has 4 map slots and 4 reduce
+ for (int i = 0; i < 2; i++) {
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP,
+ "attempt_test_0002_m_00000" + (i + 1) + "_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE,
+ "attempt_test_0002_r_00000" + (i + 1) + "_0 on tt1");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
+ } // u1 has 5 map slots and 5 reduce slots. u2 has 4 map slots and 4 reduce
// slots. Because of high memory tasks, giving u2 another task would
// overflow limits. So, no more tasks should be given to anyone.
assertNull(scheduler.assignTasks(tracker("tt1")));
- assertNull(scheduler.assignTasks(tracker("tt1")));
}
/*
@@ -1051,9 +1049,7 @@
* - Then run initializationPoller()
* - Check once again the waiting queue, it should be 5 jobs again.
* - Then raise status change events.
- * - Assign one task to a task tracker. (Map)
- * - Check waiting job count, it should be 4 now and used map (%) = 100
- * - Assign another one task (Reduce)
+ * - Assign tasks to a task tracker.
* - Check waiting job count, it should be 4 now and used map (%) = 100
* and used reduce (%) = 100
* - finish the job and then check the used percentage it should go
@@ -1066,9 +1062,9 @@
* - Check the count, the waiting job count should be 2.
* - Now raise status change events to move the initialized jobs which
* should be two in count to running queue.
- * - Then schedule a map of the job in running queue.
+ * - Then schedule a map and reduce of the job in running queue.
* - Run the poller because the poller is responsible for waiting
- * jobs count. Check the count, it should be using 100% map and one
+ * jobs count. Check the count, it should be using 100% map, reduce and one
* waiting job
* - fail the running job.
* - Check the count, it should be now one waiting job and zero running
@@ -1161,9 +1157,12 @@
raiseStatusChangeEvents(scheduler.jobQueuesManager);
raiseStatusChangeEvents(scheduler.jobQueuesManager, "q2");
//assign one job
- Task t1 = checkAssignment(
+ Map<String, String> strs = new HashMap<String, String>();
+ strs.put(CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
+ strs.put(CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+ List<Task> t1 = checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
+ strs);
//Initalize extra job.
controlledInitializationPoller.selectJobsToInitialize();
@@ -1174,24 +1173,22 @@
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
- assertEquals(infoStrings.length, 20);
+
+ assertEquals(infoStrings.length, 22);
assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
assertEquals(infoStrings[8], "Running tasks: 1");
assertEquals(infoStrings[9], "Active users:");
assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
- assertEquals(infoStrings[14], "Used capacity: 0 (0.0% of Capacity)");
- assertEquals(infoStrings[15], "Running tasks: 0");
- assertEquals(infoStrings[18], "Number of Waiting Jobs: 4");
+ assertEquals(infoStrings[14], "Used capacity: 1 (100.0% of Capacity)");
+ assertEquals(infoStrings[15], "Running tasks: 1");
+ assertEquals(infoStrings[20], "Number of Waiting Jobs: 4");
- //assign a reduce task
- Task t2 = checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_r_000001_0 on tt1");
// make sure we update our stats
scheduler.updateContextInfoForTests();
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
+
assertEquals(infoStrings.length, 22);
assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
assertEquals(infoStrings[8], "Running tasks: 1");
@@ -1205,8 +1202,9 @@
//Complete the job and check the running tasks count
FakeJobInProgress u1j1 = userJobs.get(0);
- taskTrackerManager.finishTask(t1.getTaskID().toString(), u1j1);
- taskTrackerManager.finishTask(t2.getTaskID().toString(), u1j1);
+ for (Task task : t1) {
+ taskTrackerManager.finishTask(task.getTaskID().toString(), u1j1);
+ }
taskTrackerManager.finalizeJob(u1j1);
// make sure we update our stats
@@ -1214,6 +1212,7 @@
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
+
assertEquals(infoStrings.length, 18);
assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
assertEquals(infoStrings[8], "Running tasks: 0");
@@ -1274,9 +1273,12 @@
//Now schedule a map should be job3 of the user as job1 succeeded job2
//failed and now job3 is running
- t1 = checkAssignment(
+ strs.clear();
+ strs.put(CapacityTestUtils.MAP, "attempt_test_0003_m_000001_0 on tt1");
+ strs.put(CapacityTestUtils.REDUCE, "attempt_test_0003_r_000001_0 on tt1");
+ t1 = checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0003_m_000001_0 on tt1");
+ strs);
FakeJobInProgress u1j3 = userJobs.get(2);
assertTrue(
"User Job 3 not running ",
@@ -1290,12 +1292,16 @@
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
- assertEquals(infoStrings.length, 20);
+ assertEquals(infoStrings.length, 22);
assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
assertEquals(infoStrings[8], "Running tasks: 1");
assertEquals(infoStrings[9], "Active users:");
assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
- assertEquals(infoStrings[18], "Number of Waiting Jobs: 1");
+ assertEquals(infoStrings[14], "Used capacity: 1 (100.0% of Capacity)");
+ assertEquals(infoStrings[15], "Running tasks: 1");
+ assertEquals(infoStrings[16], "Active users:");
+ assertEquals(infoStrings[17], "User 'u1': 1 (100.0% of used capacity)");
+ assertEquals(infoStrings[20], "Number of Waiting Jobs: 1");
//Fail the executing job
taskTrackerManager.finalizeJob(u1j3, JobStatus.FAILED);
@@ -1347,13 +1353,14 @@
// assert that all tasks are launched even though they transgress the
// scheduling limits.
-
- checkAssignment(
+ Map<String, String> expectedStrings = new HashMap<String, String>();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_r_000001_0 on tt1");
+ expectedStrings);
}
/**
@@ -1366,7 +1373,7 @@
throws IOException {
// 2 map and 1 reduce slots
- taskTrackerManager = new FakeTaskTrackerManager(1, 2, 1);
+ taskTrackerManager = new FakeTaskTrackerManager(1, 2, 2);
taskTrackerManager.addQueues(new String[]{"default"});
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
@@ -1399,7 +1406,8 @@
jConf.setNumReduceTasks(0);
jConf.setQueueName("default");
jConf.setUser("u1");
- FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+ FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, jConf);
LOG.debug(
"Submit another regular memory(1GB vmem maps/reduces) job of "
@@ -1411,170 +1419,188 @@
jConf.setNumReduceTasks(2);
jConf.setQueueName("default");
jConf.setUser("u1");
- FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+ FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, jConf);
- // first, a map from j1 will run
- checkAssignment(
+ // first, a map from j1 and a reduce from other job j2
+ Map<String,String> strs = new HashMap<String,String>();
+ strs.put(MAP,"attempt_test_0001_m_000001_0 on tt1");
+ strs.put(REDUCE,"attempt_test_0002_r_000001_0 on tt1");
+
+ checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
+ strs);
// Total 2 map slots should be accounted for.
checkOccupiedSlots("default", TaskType.MAP, 1, 2, 100.0f);
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 1, 50.0f);
+ checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
- // at this point, the scheduler tries to schedule another map from j1.
- // there isn't enough space. The second job's reduce should be scheduled.
+ //TT has 2 slots for reduces hence this call should get a reduce task
+ //from other job
checkAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_r_000001_0 on tt1");
- // Total 1 reduce slot should be accounted for.
- checkOccupiedSlots(
- "default", TaskType.REDUCE, 1, 1,
- 100.0f);
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
+ "attempt_test_0002_r_000002_0 on tt1");
+ checkOccupiedSlots("default", TaskType.MAP, 1, 2, 100.0f);
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 2, 100.0f);
+ checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
+
+ //now as all the slots are occupied hence no more tasks would be
+ //assigned.
+ assertNull(scheduler.assignTasks(tracker("tt1")));
}
- /**
- * Test blocking of cluster for lack of memory.
+ /**
+ * Tests that scheduler schedules normal jobs once high RAM jobs
+ * have been reserved to the limit.
+ *
+ * The test causes the scheduler to schedule a normal job on two
+ * trackers, and one task of the high RAM job on a third. Then it
+ * asserts that one of the first two trackers gets a reservation
+ * for the remaining task of the high RAM job. After this, it
+ * asserts that a normal job submitted later is allowed to run
+ * on a free slot, as all tasks of the high RAM job are either
+ * scheduled or reserved.
*
* @throws IOException
*/
public void testClusterBlockingForLackOfMemory()
- throws IOException {
-
- LOG.debug("Starting the scheduler.");
- taskTrackerManager = new FakeTaskTrackerManager(2, 2, 2);
-
- ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
- taskTrackerManager.addQueues(new String[]{"default"});
-
-
- scheduler.setTaskTrackerManager(taskTrackerManager);
- // enabled memory-based scheduling
- // Normal jobs 1GB maps/reduces. 2GB limit on maps/reduces
- scheduler.getConf().setLong(JTConfig.JT_MAX_MAPMEMORY_MB, 2 * 1024);
- scheduler.getConf().setLong(MRConfig.MAPMEMORY_MB, 1 * 1024);
- scheduler.getConf().setLong(JTConfig.JT_MAX_REDUCEMEMORY_MB, 2 * 1024);
- scheduler.getConf().setLong(MRConfig.REDUCEMEMORY_MB, 1 * 1024);
- taskTrackerManager.setFakeQueues(queues);
- scheduler.start();
-
- LOG.debug(
- "Submit one normal memory(1GB maps/reduces) job of "
- + "1 map, 1 reduce tasks.");
- JobConf jConf = new JobConf(conf);
- jConf.setMemoryForMapTask(1 * 1024);
- jConf.setMemoryForReduceTask(1 * 1024);
- jConf.setNumMapTasks(1);
- jConf.setNumReduceTasks(1);
- jConf.setQueueName("default");
- jConf.setUser("u1");
- FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
-
- // Fill the second tt with this job.
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0001_m_000001_0 on tt2");
- // Total 1 map slot should be accounted for.
- checkOccupiedSlots("default", TaskType.MAP, 1, 1, 25.0f);
- assertEquals(
- String.format(
- TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
- 1, 1, 0, 0, 0, 0),
- (String) job1.getSchedulingInfo());
- checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 0L);
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0001_r_000001_0 on tt2");
- // Total 1 map slot should be accounted for.
- checkOccupiedSlots(
- "default", TaskType.REDUCE, 1, 1,
- 25.0f);
- assertEquals(
- String.format(
- TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
- 1, 1, 0, 1, 1, 0),
- (String) job1.getSchedulingInfo());
- checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
-
- LOG.debug(
- "Submit one high memory(2GB maps/reduces) job of "
- + "2 map, 2 reduce tasks.");
- jConf = new JobConf(conf);
- jConf.setMemoryForMapTask(2 * 1024);
- jConf.setMemoryForReduceTask(2 * 1024);
- jConf.setNumMapTasks(2);
- jConf.setNumReduceTasks(2);
- jConf.setQueueName("default");
- jConf.setUser("u1");
- FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+ throws IOException {
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_m_000001_0 on tt1");
- // Total 3 map slots should be accounted for.
- checkOccupiedSlots("default", TaskType.MAP, 1, 3, 75.0f);
- assertEquals(
- String.format(
- TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
- 1, 2, 0, 0, 0, 0),
- (String) job2.getSchedulingInfo());
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+ LOG.debug("Starting the scheduler.");
+ taskTrackerManager = new FakeTaskTrackerManager(3, 2, 2);
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_r_000001_0 on tt1");
- // Total 3 reduce slots should be accounted for.
- checkOccupiedSlots(
- "default", TaskType.REDUCE, 1, 3,
- 75.0f);
- assertEquals(
- String.format(
- TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
- 1, 2, 0, 1, 2, 0),
- (String) job2.getSchedulingInfo());
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
-
- LOG.debug(
- "Submit one normal memory(1GB maps/reduces) job of "
- + "1 map, 0 reduce tasks.");
- jConf = new JobConf(conf);
- jConf.setMemoryForMapTask(1 * 1024);
- jConf.setMemoryForReduceTask(1 * 1024);
- jConf.setNumMapTasks(1);
- jConf.setNumReduceTasks(1);
- jConf.setQueueName("default");
- jConf.setUser("u1");
- FakeJobInProgress job3 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
-
- // Job2 cannot fit on tt1. So tt1 is reserved for a map slot of job2
- assertNull(scheduler.assignTasks(tracker("tt1")));
- assertNull(scheduler.assignTasks(tracker("tt1")));
-
- // reserved tasktrackers contribute to occupied slots for maps.
- checkOccupiedSlots("default", TaskType.MAP, 1, 5, 125.0f);
- // occupied slots for reduces remain unchanged as tt1 is not reserved for
- // reduces.
- checkOccupiedSlots("default", TaskType.REDUCE, 1, 3, 75.0f);
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 2 * 1024L);
- checkMemReservedForTasksOnTT("tt2", 1 * 1024L, 1 * 1024L);
- LOG.info(job2.getSchedulingInfo());
- assertEquals(
- String.format(
- TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
- 1, 2, 2, 1, 2, 0),
- (String) job2.getSchedulingInfo());
- assertEquals(
- String.format(
- TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
- 0, 0, 0, 0, 0, 0),
- (String) job3.getSchedulingInfo());
-
- // One reservation is already done for job2. So job3 should go ahead.
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0003_m_000001_0 on tt2");
- }
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
+ taskTrackerManager.addQueues(new String[]{"default"});
+ scheduler.setTaskTrackerManager(taskTrackerManager);
+ // enabled memory-based scheduling
+ // Normal jobs 1GB maps/reduces. 2GB limit on maps/reduces
+ scheduler.getConf().setLong(JTConfig.JT_MAX_MAPMEMORY_MB, 2 * 1024);
+ scheduler.getConf().setLong(MRConfig.MAPMEMORY_MB, 1 * 1024);
+ scheduler.getConf().setLong(JTConfig.JT_MAX_REDUCEMEMORY_MB, 2 * 1024);
+ scheduler.getConf().setLong(MRConfig.REDUCEMEMORY_MB, 1 * 1024);
+ taskTrackerManager.setFakeQueues(queues);
+ scheduler.start();
+
+ LOG.debug(
+ "Submit one normal memory(1GB maps/reduces) job of "
+ + "2 map, 2 reduce tasks.");
+ JobConf jConf = new JobConf(conf);
+ jConf.setMemoryForMapTask(1 * 1024);
+ jConf.setMemoryForReduceTask(1 * 1024);
+ jConf.setNumMapTasks(2);
+ jConf.setNumReduceTasks(2);
+ jConf.setQueueName("default");
+ jConf.setUser("u1");
+ FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, jConf);
+
+ // Fill a tt with this job's tasks.
+ Map<String, String> expectedStrings = new HashMap<String, String>();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
+ // Total 1 map slot should be accounted for.
+ checkOccupiedSlots("default", TaskType.MAP, 1, 1, 16.7f);
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 1, 16.7f);
+ assertEquals(
+ String.format(
+ TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
+ 1, 1, 0, 1, 1, 0),
+ (String) job1.getSchedulingInfo());
+ checkMemReservedForTasksOnTT("tt1", 1 * 1024L, 1 * 1024L);
+
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0001_m_000002_0 on tt2");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0001_r_000002_0 on tt2");
+
+ // fill another TT with the rest of the tasks of the job
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ expectedStrings);
+
+ LOG.debug(
+ "Submit one high memory(2GB maps/reduces) job of "
+ + "2 map, 2 reduce tasks.");
+ jConf = new JobConf(conf);
+ jConf.setMemoryForMapTask(2 * 1024);
+ jConf.setMemoryForReduceTask(2 * 1024);
+ jConf.setNumMapTasks(2);
+ jConf.setNumReduceTasks(2);
+ jConf.setQueueName("default");
+ jConf.setUser("u1");
+ FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, jConf);
+
+ // Have another TT run one task of each type of the high RAM
+ // job. This will fill up the TT.
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0002_m_000001_0 on tt3");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0002_r_000001_0 on tt3");
+
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler,
+ "tt3", expectedStrings);
+ checkOccupiedSlots("default", TaskType.MAP, 1, 4, 66.7f);
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 4, 66.7f);
+ assertEquals(
+ String.format(
+ TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
+ 1, 2, 0, 1, 2, 0),
+ (String) job2.getSchedulingInfo());
+ checkMemReservedForTasksOnTT("tt3", 2 * 1024L, 2 * 1024L);
+
+ LOG.debug(
+ "Submit one normal memory(1GB maps/reduces) job of "
+ + "1 map, 1 reduce tasks.");
+ jConf = new JobConf(conf);
+ jConf.setMemoryForMapTask(1 * 1024);
+ jConf.setMemoryForReduceTask(1 * 1024);
+ jConf.setNumMapTasks(1);
+ jConf.setNumReduceTasks(1);
+ jConf.setQueueName("default");
+ jConf.setUser("u1");
+ FakeJobInProgress job3 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, jConf);
+
+ // Send a TT with insufficient space for task assignment,
+ // This will cause a reservation for the high RAM job.
+ assertNull(scheduler.assignTasks(tracker("tt1")));
+
+ // reserved tasktrackers contribute to occupied slots for maps and reduces
+ checkOccupiedSlots("default", TaskType.MAP, 1, 6, 100.0f);
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 6, 100.0f);
+ checkMemReservedForTasksOnTT("tt1", 1 * 1024L, 1 * 1024L);
+ LOG.info(job2.getSchedulingInfo());
+ assertEquals(
+ String.format(
+ TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
+ 1, 2, 2, 1, 2, 2),
+ (String) job2.getSchedulingInfo());
+ assertEquals(
+ String.format(
+ TaskSchedulingContext.JOB_SCHEDULING_INFO_FORMAT_STRING,
+ 0, 0, 0, 0, 0, 0),
+ (String) job3.getSchedulingInfo());
+
+ // Reservations are already done for job2. So job3 should go ahead.
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0003_m_000001_0 on tt2");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0003_r_000001_0 on tt2");
+
+ checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ expectedStrings);
+ }
/**
* Testcase to verify fix for a NPE (HADOOP-5641), when memory based
@@ -1613,24 +1639,21 @@
jConf.setMemoryForReduceTask(512);
jConf.setQueueName("default");
jConf.setUser("u1");
- FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+ FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, jConf);
- // 1st cycle - 1 map gets assigned.
- Task t = checkAssignment(
+ // 1st cycle - 1 map and reduce gets assigned.
+ Map<String, String> expectedStrings = new HashMap<String, String>();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+ List<Task> t = checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
- // Total 1 map slot should be accounted for.
+ expectedStrings);
+ // Total 1 map slot and 1 reduce slot should be accounted for.
checkOccupiedSlots("default", TaskType.MAP, 1, 1, 50.0f);
- checkMemReservedForTasksOnTT("tt1", 512L, 0L);
-
- // 1st cycle of reduces - 1 reduce gets assigned.
- Task t1 = checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_r_000001_0 on tt1");
- // Total 1 reduce slot should be accounted for.
- checkOccupiedSlots(
- "default", TaskType.REDUCE, 1, 1,
- 50.0f);
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 1, 50.0f);
checkMemReservedForTasksOnTT("tt1", 512L, 512L);
// kill this job !
@@ -1653,20 +1676,21 @@
jConf.setMemoryForReduceTask(512);
jConf.setQueueName("default");
jConf.setUser("u1");
- FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+ FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, jConf);
// since with HADOOP-5964, we don't rely on a job conf to get
// the memory occupied, scheduling should be able to work correctly.
- t1 = checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_m_000001_0 on tt1");
- checkOccupiedSlots("default", TaskType.MAP, 1, 1, 50);
- checkMemReservedForTasksOnTT("tt1", 1024L, 512L);
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0002_m_000001_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0002_r_000001_0 on tt1");
- // assign a reduce now.
- t1 = checkAssignment(
+ List<Task> t1 = checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_r_000001_0 on tt1");
+ expectedStrings);
+ checkOccupiedSlots("default", TaskType.MAP, 1, 1, 50);
checkOccupiedSlots("default", TaskType.REDUCE, 1, 1, 50);
checkMemReservedForTasksOnTT("tt1", 1024L, 1024L);
@@ -1674,22 +1698,22 @@
assertNull(scheduler.assignTasks(tracker("tt1")));
// finish the tasks on the tracker.
- taskTrackerManager.finishTask(t.getTaskID().toString(), job1);
- taskTrackerManager.finishTask(t1.getTaskID().toString(), job1);
+ for (Task task : t) {
+ taskTrackerManager.finishTask(task.getTaskID().toString(), job1);
+ }
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0002_m_000002_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0002_r_000002_0 on tt1");
// now a new task can be assigned.
- t = checkAssignment(
+ t = checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_m_000002_0 on tt1");
+ expectedStrings);
checkOccupiedSlots("default", TaskType.MAP, 1, 2, 100.0f);
- // memory used will change because of the finished task above.
- checkMemReservedForTasksOnTT("tt1", 1024L, 512L);
-
- // reduce can be assigned.
- t = checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_r_000002_0 on tt1");
checkOccupiedSlots("default", TaskType.REDUCE, 1, 2, 100.0f);
+ // memory used will change because of the finished task above.
checkMemReservedForTasksOnTT("tt1", 1024L, 1024L);
}
@@ -1721,9 +1745,10 @@
JobInitializationPoller initPoller = scheduler.getInitializationPoller();
// submit 4 jobs each for 3 users.
- HashMap<String, ArrayList<FakeJobInProgress>> userJobs = taskTrackerManager.submitJobs(
- 3,
- 4, "default");
+ HashMap<String, ArrayList<FakeJobInProgress>> userJobs =
+ taskTrackerManager.submitJobs(
+ 3,
+ 4, "default");
// get the jobs submitted.
ArrayList<FakeJobInProgress> u1Jobs = userJobs.get("u1");
@@ -1782,31 +1807,37 @@
raiseStatusChangeEvents(mgr);
// get some tasks assigned.
- Task t1 = checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
- Task t2 = checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_r_000001_0 on tt1");
- Task t3 = checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0002_m_000001_0 on tt2");
- Task t4 = checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0002_r_000001_0 on tt2");
- taskTrackerManager.finishTask(
- t1.getTaskID().toString(), u1Jobs.get(
- 0));
- taskTrackerManager.finishTask(
- t2.getTaskID().toString(), u1Jobs.get(
- 0));
- taskTrackerManager.finishTask(
- t3.getTaskID().toString(), u1Jobs.get(
- 1));
- taskTrackerManager.finishTask(
- t4.getTaskID().toString(), u1Jobs.get(
- 1));
+ Map<String, String> expectedStrings = new HashMap<String, String>();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+
+ List<Task> t1 = checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1",
+ expectedStrings);
+
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0002_m_000001_0 on tt2");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0002_r_000001_0 on tt2");
+
+ List<Task> t2 = checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt2",
+ expectedStrings);
+
+ for (Task task : t1) {
+ taskTrackerManager.finishTask(
+ task.getTaskID().toString(), u1Jobs.get(
+ 0));
+ }
+ for (Task task : t2) {
+ taskTrackerManager.finishTask(
+ task.getTaskID().toString(), u1Jobs.get(
+ 0));
+ }
// as some jobs have running tasks, the poller will now
// pick up new jobs to initialize.
controlledInitializationPoller.selectJobsToInitialize();
@@ -1827,19 +1858,21 @@
"Initialized jobs contains the user1 job 2",
initializedJobs.contains(u1Jobs.get(1).getJobID()));
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0003_m_000001_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0003_r_000001_0 on tt1");
+
// finish one more job
- t1 = checkAssignment(
+ t1 = checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0003_m_000001_0 on tt1");
- t2 = checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0003_r_000001_0 on tt1");
- taskTrackerManager.finishTask(
- t1.getTaskID().toString(), u1Jobs.get(
- 2));
- taskTrackerManager.finishTask(
- t2.getTaskID().toString(), u1Jobs.get(
- 2));
+ expectedStrings);
+ for (Task task : t1) {
+ taskTrackerManager.finishTask(
+ task.getTaskID().toString(), u1Jobs.get(
+ 2));
+ }
// no new jobs should be picked up, because max user limit
// is still 3.
@@ -1847,19 +1880,21 @@
assertEquals(initializedJobs.size(), 5);
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0004_m_000001_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0004_r_000001_0 on tt1");
+
// run 1 more jobs..
- t1 = checkAssignment(
+ t1 = checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0004_m_000001_0 on tt1");
- t1 = checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0004_r_000001_0 on tt1");
- taskTrackerManager.finishTask(
- t1.getTaskID().toString(), u1Jobs.get(
- 3));
- taskTrackerManager.finishTask(
- t2.getTaskID().toString(), u1Jobs.get(
- 3));
+ expectedStrings);
+ for (Task task : t1) {
+ taskTrackerManager.finishTask(
+ task.getTaskID().toString(), u1Jobs.get(
+ 3));
+ }
// Now initialised jobs should contain user 4's job, as
// user 1's jobs are all done and the number of users is
@@ -1968,12 +2003,13 @@
taskTrackerManager.submitJob(JobStatus.PREP, 1, 1, "q1", "u1");
controlledInitializationPoller.selectJobsToInitialize();
raiseStatusChangeEvents(scheduler.jobQueuesManager, "q1");
- Task t = checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
- t = checkAssignment(
+ Map<String,String> strs = new HashMap<String,String>();
+ strs.put(CapacityTestUtils.MAP,"attempt_test_0001_m_000001_0 on tt1");
+ strs.put(CapacityTestUtils.REDUCE,"attempt_test_0001_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_r_000001_0 on tt1");
+ strs);
+
}
public void testFailedJobInitalizations() throws Exception {
@@ -1992,7 +2028,7 @@
FakeJobInProgress job =
new FakeFailingJobInProgress(
new JobID("test", ++jobCounter),
- new JobConf(), taskTrackerManager, "u1");
+ new JobConf(), taskTrackerManager, "u1", UtilsForTests.getJobTracker());
job.getStatus().setRunState(JobStatus.PREP);
taskTrackerManager.submitJob(job);
//check if job is present in waiting list.
@@ -2049,37 +2085,41 @@
conf.setReduceSpeculativeExecution(true);
//Submit a job which would have one speculative map and one speculative
//reduce.
- FakeJobInProgress fjob1 = taskTrackerManager.submitJob(JobStatus.PREP, conf);
+ FakeJobInProgress fjob1 = taskTrackerManager.submitJob(
+ JobStatus.PREP, conf);
conf = new JobConf();
conf.setNumMapTasks(1);
conf.setNumReduceTasks(1);
//Submit a job which has no speculative map or reduce.
- FakeJobInProgress fjob2 = taskTrackerManager.submitJob(JobStatus.PREP, conf);
+ FakeJobInProgress fjob2 = taskTrackerManager.submitJob(
+ JobStatus.PREP, conf);
//Ask the poller to initalize all the submitted job and raise status
//change event.
controlledInitializationPoller.selectJobsToInitialize();
raiseStatusChangeEvents(mgr);
-
- checkAssignment(
+ Map<String, String> strs = new HashMap<String, String>();
+ strs.put(CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
+ strs.put(CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
+ strs);
assertTrue(
"Pending maps of job1 greater than zero",
(fjob1.pendingMaps() == 0));
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0001_m_000001_1 on tt2");
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_r_000001_0 on tt1");
+
assertTrue(
- "Pending reduces of job2 greater than zero",
+ "Pending reduces of job1 greater than zero",
(fjob1.pendingReduces() == 0));
- checkAssignment(
+
+ Map<String, String> str = new HashMap<String, String>();
+ str.put(CapacityTestUtils.MAP, "attempt_test_0001_m_000001_1 on tt2");
+ str.put(CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_1 on tt2");
+
+ checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt2",
- "attempt_test_0001_r_000001_1 on tt2");
+ str);
taskTrackerManager.finishTask("attempt_test_0001_m_000001_0", fjob1);
taskTrackerManager.finishTask("attempt_test_0001_m_000001_1", fjob1);
@@ -2087,12 +2127,13 @@
taskTrackerManager.finishTask("attempt_test_0001_r_000001_1", fjob1);
taskTrackerManager.finalizeJob(fjob1);
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_m_000001_0 on tt1");
- checkAssignment(
+ str.clear();
+ str.put(CapacityTestUtils.MAP, "attempt_test_0002_m_000001_0 on tt1");
+ str.put(CapacityTestUtils.REDUCE, "attempt_test_0002_r_000001_0 on tt1");
+
+ checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_r_000001_0 on tt1");
+ str);
taskTrackerManager.finishTask("attempt_test_0002_m_000001_0", fjob2);
taskTrackerManager.finishTask("attempt_test_0002_r_000001_0", fjob2);
taskTrackerManager.finalizeJob(fjob2);
@@ -2267,7 +2308,8 @@
jConf.setNumReduceTasks(6);
jConf.setQueueName("default");
jConf.setUser("u1");
- FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+ FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, jConf);
// Submit a normal job to the other queue.
jConf = new JobConf(conf);
@@ -2277,108 +2319,178 @@
jConf.setNumReduceTasks(6);
jConf.setUser("u1");
jConf.setQueueName("q1");
- FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(JobStatus.PREP, jConf);
+ FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, jConf);
- // Map 1 of high memory job
- checkAssignment(
+ // Map and reduce of high memory job should be assigned
+ HashMap<String, String> expectedStrings = new HashMap<String, String>();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+
+ checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings);
+
checkQueuesOrder(
qs, scheduler
.getOrderedQueues(TaskType.MAP));
- // Reduce 1 of high memory job
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_r_000001_0 on tt1");
checkQueuesOrder(
qs, scheduler
.getOrderedQueues(TaskType.REDUCE));
- // Map 1 of normal job
- checkAssignment(
+ // 1st map and reduce of normal job should be assigned
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0002_m_000001_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0002_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_m_000001_0 on tt1");
+ expectedStrings);
+
checkQueuesOrder(
reversedQs, scheduler
.getOrderedQueues(TaskType.MAP));
-
- // Reduce 1 of normal job
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_r_000001_0 on tt1");
checkQueuesOrder(
reversedQs, scheduler
.getOrderedQueues(TaskType.REDUCE));
- // Map 2 of normal job
- checkAssignment(
+ // 2nd map and reduce of normal job should be assigned
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0002_m_000002_0 on tt1");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0002_r_000002_0 on tt1");
+
+ checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_m_000002_0 on tt1");
+ expectedStrings);
checkQueuesOrder(
reversedQs, scheduler
.getOrderedQueues(TaskType.MAP));
-
- // Reduce 2 of normal job
- checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0002_r_000002_0 on tt1");
checkQueuesOrder(
reversedQs, scheduler
.getOrderedQueues(TaskType.REDUCE));
// Now both the queues are equally served. But the comparator doesn't change
// the order if queues are equally served.
+ // Hence, 3rd map and reduce of normal job should be assigned
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0002_m_000003_0 on tt2");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0002_r_000003_0 on tt2");
- // Map 3 of normal job
- checkAssignment(
+ checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt2",
- "attempt_test_0002_m_000003_0 on tt2");
+ expectedStrings);
+
checkQueuesOrder(
reversedQs, scheduler
.getOrderedQueues(TaskType.MAP));
- // Reduce 3 of normal job
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0002_r_000003_0 on tt2");
checkQueuesOrder(
reversedQs, scheduler
.getOrderedQueues(TaskType.REDUCE));
- // Map 2 of high memory job
- checkAssignment(
+ // 2nd map and reduce of high memory job should be assigned
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0001_m_000002_0 on tt2");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0001_r_000002_0 on tt2");
+
+ checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt2",
- "attempt_test_0001_m_000002_0 on tt2");
+ expectedStrings);
checkQueuesOrder(
qs, scheduler
.getOrderedQueues(TaskType.MAP));
- // Reduce 2 of high memory job
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0001_r_000002_0 on tt2");
checkQueuesOrder(
qs, scheduler
.getOrderedQueues(TaskType.REDUCE));
- // Map 4 of normal job
- checkAssignment(
+ // 4th map and reduce of normal job should be assigned.
+ expectedStrings.clear();
+ expectedStrings.put(
+ CapacityTestUtils.MAP, "attempt_test_0002_m_000004_0 on tt2");
+ expectedStrings.put(
+ CapacityTestUtils.REDUCE, "attempt_test_0002_r_000004_0 on tt2");
+ checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt2",
- "attempt_test_0002_m_000004_0 on tt2");
+ expectedStrings);
checkQueuesOrder(
reversedQs, scheduler
.getOrderedQueues(TaskType.MAP));
- // Reduce 4 of normal job
- checkAssignment(
- taskTrackerManager, scheduler, "tt2",
- "attempt_test_0002_r_000004_0 on tt2");
checkQueuesOrder(
reversedQs, scheduler
.getOrderedQueues(TaskType.REDUCE));
}
+ /**
+ * Tests that a single assignTasks call on tt1 hands out one map and one
+ * reduce even when the runnable reduces belong to different jobs in
+ * different queues.
+ * Creates a cluster with 6 map slots and 2 reduce slots (only tt1 is used).
+ * Submits 2 jobs, one per queue:
+ *   job1 (queue "default", user u1) with 6 maps and 1 reduce
+ *   job2 (queue "q1", user u2) with 2 maps and 1 reduce
+ *
+ * Checks that:
+ *   - the first assignment gives tt1 one map and one reduce from job1;
+ *   - the second assignment gives tt1 one map and one reduce from job2
+ *     (i.e. from the other job in the other queue);
+ *   - the last two assignments give only a map each: both reduce slots are
+ *     occupied and each job's single reduce has already been assigned.
+ *
+ * @throws Exception
+ */
+ public void testMultiTaskAssignmentInMultipleQueues() throws Exception {
+ setUp(1, 6, 2);
+ // set up two queues, "default" and "q1"
+ String[] qs = {"default", "q1"};
+ taskTrackerManager.addQueues(qs);
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
+ queues.add(new FakeQueueInfo("q1", 50.0f, true, 25));
+ taskTrackerManager.setFakeQueues(queues);
+ scheduler.start();
+
+ // job1: 6 maps and 1 reduce in queue "default" (job id test_0001)
+ taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, 6, 1, "default", "u1");
+ // job2: 2 maps and 1 reduce in queue "q1" (test_0002). NOTE(review): j2 is never read below.
+ FakeJobInProgress j2 = taskTrackerManager.submitJobAndInit(
+ JobStatus.PREP, 2, 1, "q1", "u2");
+ // First assignment: one map and one reduce, both from job1.
+ Map<String, String> str = new HashMap<String, String>();
+ str.put(MAP, "attempt_test_0001_m_000001_0 on tt1");
+ str.put(REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler, "tt1", str);
+
+ // Second assignment: one map and one reduce from job2 in the second queue.
+ str.clear();
+ str.put(MAP, "attempt_test_0002_m_000001_0 on tt1");
+ str.put(REDUCE, "attempt_test_0002_r_000001_0 on tt1");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler, "tt1", str);
+
+ // Both reduce slots are now in use (and no reduce is pending), so this
+ // assignTasks call should return only 1 map task.
+ str.clear();
+ str.put(MAP, "attempt_test_0002_m_000002_0 on tt1");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler, "tt1", str);
+ // Likewise only a map is expected next, this time from job1.
+ str.clear();
+ str.put(MAP, "attempt_test_0001_m_000002_0 on tt1");
+ checkMultipleTaskAssignment(taskTrackerManager, scheduler, "tt1", str);
+ }
+
+
private void checkRunningJobMovementAndCompletion() throws IOException {
JobQueuesManager mgr = scheduler.jobQueuesManager;
@@ -2402,12 +2514,13 @@
mgr.getJobQueue("default").getRunningJobs().contains(job));
// assign a task
- Task t = checkAssignment(
- taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_m_000001_0 on tt1");
- t = checkAssignment(
+ Map<String,String> strs = new HashMap<String,String>();
+ strs.put(MAP,"attempt_test_0001_m_000001_0 on tt1");
+ strs.put(REDUCE,"attempt_test_0001_r_000001_0 on tt1");
+
+ checkMultipleTaskAssignment(
taskTrackerManager, scheduler, "tt1",
- "attempt_test_0001_r_000001_0 on tt1");
+ strs);
controlledInitializationPoller.selectJobsToInitialize();
@@ -2551,14 +2664,14 @@
tracker(taskTracker).getStatus(),
TaskType.REDUCE);
if (expectedMemForMapsOnTT == null) {
- assertTrue(observedMemForMapsOnTT == null);
+ assertEquals(observedMemForMapsOnTT,null);
} else {
- assertTrue(observedMemForMapsOnTT.equals(expectedMemForMapsOnTT));
+ assertEquals(observedMemForMapsOnTT,expectedMemForMapsOnTT);
}
if (expectedMemForReducesOnTT == null) {
- assertTrue(observedMemForReducesOnTT == null);
+ assertEquals(observedMemForReducesOnTT,null);
} else {
- assertTrue(observedMemForReducesOnTT.equals(expectedMemForReducesOnTT));
+ assertEquals(observedMemForReducesOnTT,expectedMemForReducesOnTT);
}
}
@@ -2578,16 +2691,12 @@
* @param incrReduceIndex
*/
private void checkOccupiedSlots(
- String queue,
- TaskType type, int numActiveUsers,
- int expectedOccupiedSlots, float expectedOccupiedSlotsPercent,
- int incrMapIndex
- , int incrReduceIndex
- ) {
+ String queue, TaskType type, int numActiveUsers, int expectedOccupiedSlots,
+ float expectedOccupiedSlotsPercent, int incrMapIndex, int incrReduceIndex) {
scheduler.updateContextInfoForTests();
QueueManager queueManager = scheduler.taskTrackerManager.getQueueManager();
- String schedulingInfo =
- queueManager.getJobQueueInfo(queue).getSchedulingInfo();
+ String schedulingInfo = queueManager.getJobQueueInfo(queue)
+ .getSchedulingInfo();
String[] infoStrings = schedulingInfo.split("\n");
int index = -1;
if (type.equals(TaskType.MAP)) {
@@ -2599,9 +2708,8 @@
LOG.info(infoStrings[index]);
assertEquals(
String.format(
- "Used capacity: %d (%.1f%% of Capacity)",
- expectedOccupiedSlots, expectedOccupiedSlotsPercent),
- infoStrings[index]);
+ "Used capacity: %d (%.1f%% of Capacity)", expectedOccupiedSlots,
+ expectedOccupiedSlotsPercent), infoStrings[index]);
}
/**
Modified: hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java?rev=830230&r1=830229&r2=830230&view=diff
==============================================================================
--- hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java (original)
+++ hadoop/mapreduce/branches/HDFS-641/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java Tue Oct 27 15:43:58 2009
@@ -149,9 +149,6 @@
prp.setProperty("maximum-capacity","20.5");
prp.setProperty("supports-priority","false");
prp.setProperty("minimum-user-limit-percent","23");
- prp.setProperty(CapacitySchedulerConf.MAX_MAP_CAP_PROPERTY,"43");
- prp.setProperty(CapacitySchedulerConf.MAX_REDUCE_CAP_PROPERTY,"43");
-
CapacitySchedulerConf conf = new CapacitySchedulerConf();
conf.setProperties("default",prp);
@@ -160,10 +157,6 @@
assertTrue(conf.getMaxCapacity("default") == 20.5f);
assertTrue(conf.isPrioritySupported("default") == false);
assertTrue(conf.getMinimumUserLimitPercent("default")==23);
- assertTrue(conf.getMaxMapCap("default") == 43);
- assertTrue(conf.getMaxReduceCap("default") == 43);
-
-
//check for inproper stuff
prp.setProperty("capacity","h");