You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:26:09 UTC
svn commit: r1077538 [2/2] - in
/hadoop/common/branches/branch-0.20-security-patches/src:
contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/
contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
mapred/org/apache/hadoop/mapred/
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1077538&r1=1077537&r2=1077538&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri Mar 4 04:26:09 2011
@@ -192,6 +192,12 @@ public class TestCapacityScheduler exten
}
@Override
+ public Task obtainNewLocalMapTask(final TaskTrackerStatus tts, int clusterSize,
+ int ignored) throws IOException {
+ return obtainNewMapTask(tts, clusterSize, ignored);
+ }
+
+ @Override
public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
int ignored) throws IOException {
boolean areAllMapsRunning = (mapTaskCtr == numMapTasks);
@@ -751,7 +757,6 @@ public class TestCapacityScheduler exten
numReduceTasksPerTracker);
clock = new FakeClock();
scheduler = new CapacityTaskScheduler(clock);
- scheduler.setAssignMultipleTasks(false);
scheduler.setTaskTrackerManager(taskTrackerManager);
conf = new JobConf();
@@ -889,7 +894,6 @@ public class TestCapacityScheduler exten
resConf.setFakeQueues(queues);
resConf.setMaxCapacity("default", 50.0f);
scheduler.setResourceManagerConf(resConf);
- scheduler.setAssignMultipleTasks(true);
scheduler.start();
//submit the Job
@@ -994,31 +998,32 @@ public class TestCapacityScheduler exten
// I. Check multiple assignments with running tasks within job
// ask for a task from first job
- Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- // ask for another task from the first job
- t = checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
-
+ checkAssignments("tt1",
+ new String[] {"attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0001_m_000002_0 on tt1"}
+ );
+
// complete tasks
taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0", j1);
// II. Check multiple assignments with running tasks across jobs
// ask for a task from first job
- t = checkAssignment("tt1", "attempt_test_0001_m_000003_0 on tt1");
-
- // ask for a task from the second job
- t = checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
-
- // complete tasks
- taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", j2);
+ checkAssignments("tt1",
+ new String[] {"attempt_test_0001_m_000003_0 on tt1",
+ "attempt_test_0002_m_000001_0 on tt1"}
+ );
+
+ // complete task from job1
taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000003_0", j1);
// III. Check multiple assignments with completed tasks across jobs
// ask for a task from the second job
- t = checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
- // complete task
+ // complete tasks
taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000002_0", j2);
+ taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", j2);
// IV. Check assignment with completed job
// finish first job
@@ -1026,7 +1031,7 @@ public class TestCapacityScheduler exten
// ask for another task from the second job
// if tasks can be assigned then the structures are properly updated
- t = checkAssignment("tt1", "attempt_test_0002_m_000003_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0002_m_000003_0 on tt1");
// complete task
taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000003_0", j2);
@@ -1038,7 +1043,6 @@ public class TestCapacityScheduler exten
* @throws Exception
*/
public void testMultiTaskAssignmentInSingleQueue() throws Exception {
- try {
setUp(1, 6, 2);
// set up some queues
String[] qs = {"default"};
@@ -1048,25 +1052,13 @@ public class TestCapacityScheduler exten
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
- scheduler.setAssignMultipleTasks(true);
//Submit the job with 6 maps and 2 reduces
FakeJobInProgress j1 = submitJobAndInit(
JobStatus.PREP, 6, 2, "default", "u1");
List<Task> tasks = scheduler.assignTasks(tracker("tt1"));
- assertEquals(tasks.size(), 2);
- for (Task task : tasks) {
- if (task.toString().contains("_m_")) {
- LOG.info(" map task assigned " + task.toString());
- assertEquals(task.toString(), "attempt_test_0001_m_000001_0 on tt1");
- } else if (task.toString().contains("_r_")) {
- LOG.info(" reduce task assigned " + task.toString());
- assertEquals(task.toString(), "attempt_test_0001_r_000001_0 on tt1");
- } else {
- fail(" should not have come here " + task.toString());
- }
- }
+ assertEquals(tasks.size(), 7);
for (Task task : tasks) {
if (task.toString().equals("attempt_test_0001_m_000001_0 on tt1")) {
@@ -1077,123 +1069,51 @@ public class TestCapacityScheduler exten
}
}
- tasks = scheduler.assignTasks(tracker("tt1"));
- assertEquals(tasks.size(), 2);
- for (Task task : tasks) {
- if (task.toString().contains("_m_")) {
- LOG.info(" map task assigned " + task.toString());
- assertEquals(task.toString(), "attempt_test_0001_m_000002_0 on tt1");
- } else if (task.toString().contains("_r_")) {
- LOG.info(" reduce task assigned " + task.toString());
- assertEquals(task.toString(), "attempt_test_0001_r_000002_0 on tt1");
- } else {
- fail(" should not have come here " + task.toString());
- }
- }
-
- //now both the reduce slots are being used , hence we should not
- // get only 1 map task in this assignTasks call.
+ // Only 1 reduce left
tasks = scheduler.assignTasks(tracker("tt1"));
assertEquals(tasks.size(), 1);
- for (Task task : tasks) {
- if (task.toString().contains("_m_")) {
- LOG.info(" map task assigned " + task.toString());
- assertEquals(task.toString(), "attempt_test_0001_m_000003_0 on tt1");
- } else if (task.toString().contains("_r_")) {
- LOG.info(" reduce task assigned " + task.toString());
- fail("should not give reduce task " + task.toString());
- } else {
- fail(" should not have come here " + task.toString());
- }
- }
- } finally {
- scheduler.setAssignMultipleTasks(false);
- }
}
public void testMultiTaskAssignmentInMultipleQueues() throws Exception {
- try {
- setUp(1, 6, 2);
+ setUp(1, 4, 2);
// set up some queues
- String[] qs = {"default","q1"};
+ String[] qs = {"q1","q2"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
queues.add(new FakeQueueInfo("q1", 50.0f, true, 25));
+ queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
- scheduler.setAssignMultipleTasks(true);
+ System.err.println("testMultiTaskAssignmentInMultipleQueues");
//Submit the job with 6 maps and 2 reduces
- submitJobAndInit(
- JobStatus.PREP, 6, 1, "default", "u1");
-
- FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP,2,1,"q1","u2");
-
- List<Task> tasks = scheduler.assignTasks(tracker("tt1"));
- assertEquals(tasks.size(), 2);
- for (Task task : tasks) {
- if (task.toString().contains("_m_")) {
- LOG.info(" map task assigned " + task.toString());
- assertEquals(task.toString(), "attempt_test_0001_m_000001_0 on tt1");
- } else if (task.toString().contains("_r_")) {
- LOG.info(" reduce task assigned " + task.toString());
- assertEquals(task.toString(), "attempt_test_0001_r_000001_0 on tt1");
- } else {
- fail(" should not have come here " + task.toString());
- }
- }
-
- // next assignment will be for job in second queue.
- tasks = scheduler.assignTasks(tracker("tt1"));
- assertEquals(tasks.size(), 2);
+ FakeJobInProgress j1 =
+ submitJobAndInit(JobStatus.PREP, 6, 1, "q1", "u1");
+ FakeJobInProgress j2 =
+ submitJobAndInit(JobStatus.PREP,2,1,"q2","u2");
+
+ List<Task> tasks = checkAssignments("tt1",
+ new String[] {"attempt_test_0002_m_000001_0 on tt1",
+ "attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0001_m_000002_0 on tt1",
+ "attempt_test_0002_m_000002_0 on tt1",
+ "attempt_test_0002_r_000001_0 on tt1",
+ });
+ //Now finish the tasks
for (Task task : tasks) {
- if (task.toString().contains("_m_")) {
- LOG.info(" map task assigned " + task.toString());
- assertEquals(task.toString(), "attempt_test_0002_m_000001_0 on tt1");
- } else if (task.toString().contains("_r_")) {
- LOG.info(" reduce task assigned " + task.toString());
- assertEquals(task.toString(), "attempt_test_0002_r_000001_0 on tt1");
- } else {
- fail(" should not have come here " + task.toString());
- }
- }
-
- //now both the reduce slots are being used , hence we sholdnot get only 1
- //map task in this assignTasks call.
- tasks = scheduler.assignTasks(tracker("tt1"));
- assertEquals(tasks.size(), 1);
- for (Task task : tasks) {
- if (task.toString().contains("_m_")) {
- LOG.info(" map task assigned " + task.toString());
- // we get from job 2 because the queues are equal in capacity usage
- // and sorting leaves order unchanged.
- assertEquals(task.toString(), "attempt_test_0002_m_000002_0 on tt1");
- } else if (task.toString().contains("_r_")) {
- LOG.info(" reduce task assigned " + task.toString());
- fail("should not give reduce task " + task.toString());
- } else {
- fail(" should not have come here " + task.toString());
- }
+ FakeJobInProgress j =
+ (task.getTaskID().getJobID().getId() == 1) ? j1 : j2;
+ taskTrackerManager.finishTask("tt1", task.getTaskID().toString(), j);
}
- tasks = scheduler.assignTasks(tracker("tt1"));
- assertEquals(tasks.size(), 1);
- for (Task task : tasks) {
- if (task.toString().contains("_m_")) {
- LOG.info(" map task assigned " + task.toString());
- assertEquals(task.toString(), "attempt_test_0001_m_000002_0 on tt1");
- } else if (task.toString().contains("_r_")) {
- LOG.info(" reduce task assigned " + task.toString());
- fail("should not give reduce task " + task.toString());
- } else {
- fail(" should not have come here " + task.toString());
- }
- }
- } finally {
- scheduler.setAssignMultipleTasks(false);
- }
+ checkAssignments("tt1",
+ new String[] {"attempt_test_0001_m_000003_0 on tt1",
+ "attempt_test_0001_m_000004_0 on tt1",
+ "attempt_test_0001_m_000005_0 on tt1",
+ "attempt_test_0001_m_000006_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1",
+ });
}
// basic tests, should be able to submit to queues
@@ -1213,12 +1133,17 @@ public class TestCapacityScheduler exten
JobInProgress j = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
// when we ask for a task, we should get one, from the job submitted
- Task t;
- t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ checkAssignments("tt1",
+ new String[] {"attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0001_m_000002_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1"});
// submit another job, to a different queue
j = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
// now when we get a task, it should be from the second job
- t = checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+ checkAssignments("tt2",
+ new String[] {"attempt_test_0002_m_000001_0 on tt2",
+ "attempt_test_0002_m_000002_0 on tt2",
+ "attempt_test_0002_r_000001_0 on tt2"});
}
public void testGetJobs() throws Exception {
@@ -1313,19 +1238,25 @@ public class TestCapacityScheduler exten
submitJobAndInit(JobStatus.PREP, 10, 0, "q2", "u1");
// job from q2 runs first because it has some non-zero capacity.
- checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+ checkAssignments("tt1",
+ new String[] {"attempt_test_0002_m_000001_0 on tt1",
+ "attempt_test_0002_m_000002_0 on tt1"});
verifyCapacity("0", "default");
verifyCapacity("3", "q2");
// add another tt to increase tt slots
taskTrackerManager.addTaskTracker("tt3");
- checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
+ checkAssignments("tt2",
+ new String[] {"attempt_test_0002_m_000003_0 on tt2",
+ "attempt_test_0002_m_000004_0 on tt2"});
verifyCapacity("0", "default");
verifyCapacity("5", "q2");
// add another tt to increase tt slots
taskTrackerManager.addTaskTracker("tt4");
- checkAssignment("tt3", "attempt_test_0002_m_000003_0 on tt3");
+ checkAssignments("tt3",
+ new String[] {"attempt_test_0002_m_000005_0 on tt3",
+ "attempt_test_0002_m_000006_0 on tt3"});
verifyCapacity("0", "default");
verifyCapacity("7", "q2");
@@ -1333,7 +1264,9 @@ public class TestCapacityScheduler exten
taskTrackerManager.addTaskTracker("tt5");
// now job from default should run, as it is furthest away
// in terms of runningMaps / capacity.
- checkAssignment("tt4", "attempt_test_0001_m_000001_0 on tt4");
+ checkAssignments("tt4",
+ new String[] {"attempt_test_0001_m_000001_0 on tt4",
+ "attempt_test_0002_m_000007_0 on tt4"});
verifyCapacity("1", "default");
verifyCapacity("9", "q2");
}
@@ -1342,7 +1275,7 @@ public class TestCapacityScheduler exten
String queue) throws IOException {
String schedInfo = taskTrackerManager.getQueueManager().
getSchedulerInfo(queue).toString();
- assertTrue(schedInfo.contains("Map tasks\nCapacity: "
+ assertTrue(schedInfo, schedInfo.contains("Map tasks\nCapacity: "
+ expectedCapacity + " slots"));
}
@@ -1361,15 +1294,18 @@ public class TestCapacityScheduler exten
// submit a job
submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
// for queue 'q2', the capacity for maps is 2. Since we're the only user,
- // we should get a task
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- // I should get another map task.
- checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+ // we should get 2 map tasks and 1 reduce task
+ checkAssignments("tt1",
+ new String[] {"attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0001_m_000002_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1"});
+
// Now we're at full capacity for maps. If I ask for another map task,
// I should get a map task from the default queue's capacity.
- checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
- // and another
- checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
+ checkAssignments("tt2",
+ new String[] {"attempt_test_0001_m_000003_0 on tt2",
+ "attempt_test_0001_m_000004_0 on tt2",
+ "attempt_test_0001_r_000002_0 on tt2"});
}
/**
@@ -1382,7 +1318,10 @@ public class TestCapacityScheduler exten
public void testHighMemoryBlockingWithMaxCapacity()
throws IOException {
- taskTrackerManager = new FakeTaskTrackerManager(2, 2, 2);
+ final int NUM_MAP_SLOTS = 2;
+ final int NUM_REDUCE_SLOTS = 2;
+ taskTrackerManager =
+ new FakeTaskTrackerManager(2, NUM_MAP_SLOTS, NUM_REDUCE_SLOTS);
taskTrackerManager.addQueues(new String[] { "defaultXYZ" });
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
@@ -1405,7 +1344,6 @@ public class TestCapacityScheduler exten
JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
- scheduler.setAssignMultipleTasks(true);
JobConf jConf = new JobConf(conf);
jConf.setMemoryForMapTask(2 * 1024);
@@ -1433,7 +1371,8 @@ public class TestCapacityScheduler exten
checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1, 2, 200.0f,1,0);
checkOccupiedSlots("defaultXYZ", TaskType.REDUCE, 1, 1, 100.0f,0,2);
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
+ checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L,
+ NUM_MAP_SLOTS-2, NUM_REDUCE_SLOTS-1);
//we have reached the maximum limit for map, so no more map tasks.
//we have used 1 reduce already and 1 more reduce slot is left for the
@@ -1457,7 +1396,8 @@ public class TestCapacityScheduler exten
checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1, 2, 200.0f,1,0);
checkOccupiedSlots("defaultXYZ", TaskType.REDUCE, 1, 2, 200.0f,0,2);
- checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 2 * 1024L);
+ checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 2 * 1024L,
+ NUM_MAP_SLOTS-2, NUM_REDUCE_SLOTS-2);
//complete the high ram job on tt1.
for (Task task : t2) {
@@ -1485,7 +1425,6 @@ public class TestCapacityScheduler exten
resConf.setFakeQueues(queues);
resConf.setMaxCapacity("default", 75.0f);
scheduler.setResourceManagerConf(resConf);
- scheduler.setAssignMultipleTasks(true);
scheduler.start();
// submit a job
@@ -1499,22 +1438,22 @@ public class TestCapacityScheduler exten
// 1 map and 1 reduce.
// after max capacity it is 1.5 each.
- //first job would be given 1 job each.
- List<Task> t1 = this.checkMultipleAssignment(
- "tt1", "attempt_test_0001_m_000001_0 on tt1",
- "attempt_test_0001_r_000001_0 on tt1");
-
- //for user u1 we have reached the limit. that is 1 job.
- //1 more map and reduce tasks.
- List<Task> t2 = this.checkMultipleAssignment(
- "tt1", "attempt_test_0002_m_000001_0 on tt1",
- "attempt_test_0002_r_000001_0 on tt1");
-
- t1 = this.checkMultipleAssignment(
- "tt2", "attempt_test_0001_m_000002_0 on tt2",
- "attempt_test_0001_r_000002_0 on tt2");
-
- t1 = this.checkMultipleAssignment("tt2", null,null);
+ //each job would be given 1 map task each.
+ checkAssignments("tt1",
+ new String[] {"attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0002_m_000001_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1"});
+
+ //about to hit max map capacity for default
+ //hit user limit for reduces
+ checkAssignments("tt2",
+ new String[] {"attempt_test_0001_m_000002_0 on tt2",
+ "attempt_test_0002_r_000001_0 on tt2"});
+
+ // only 1 reduce slot is remaining on tt2
+ // no more maps since no map slots are available
+ checkAssignments("tt2",
+ new String[] { "attempt_test_0001_r_000002_0 on tt2"});
}
@@ -1533,17 +1472,19 @@ public class TestCapacityScheduler exten
// submit a job
submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
// for queue 'q2', the capacity for maps is 2. Since we're the only user,
- // we should get a task
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ // we should get 2 maps and 1 reduce
+ checkAssignments("tt1",
+ new String[]{"attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0001_m_000002_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1"});
+
// Submit another job, from a different user
submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
- // Now if I ask for a map task, it should come from the second job
- checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
- // Now we're at full capacity for maps. If I ask for another map task,
- // I should get a map task from the default queue's capacity.
- checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
- // and another
- checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
+ // Now if I ask for tasks, they should come from the second job, using the default queue's capacity
+ checkAssignments("tt2",
+ new String[]{"attempt_test_0002_m_000001_0 on tt2",
+ "attempt_test_0002_m_000002_0 on tt2",
+ "attempt_test_0002_r_000001_0 on tt2"});
}
// test user limits when a 2nd job is submitted much after first job
@@ -1561,16 +1502,19 @@ public class TestCapacityScheduler exten
// submit a job
submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
// for queue 'q2', the capacity for maps is 2. Since we're the only user,
- // we should get a task
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- // since we're the only job, we get another map
- checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+ // we should get 2 map tasks & 1 reduce
+ checkAssignments("tt1",
+ new String[] {"attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0001_m_000002_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1"});
+
// Submit another job, from a different user
submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
// Now if I ask for a map task, it should come from the second job
- checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
- // and another
- checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
+ checkAssignments("tt2",
+ new String[] {"attempt_test_0002_m_000001_0 on tt2",
+ "attempt_test_0002_m_000002_0 on tt2",
+ "attempt_test_0002_r_000001_0 on tt2"});
}
// test user limits when a 2nd job is submitted much after first job
@@ -1590,25 +1534,38 @@ public class TestCapacityScheduler exten
FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
// for queue 'q2', the capacity for maps is 2. Since we're the only user,
// we should get a task
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- // since we're the only job, we get another map
- checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+ checkAssignments("tt1",
+ new String[] {"attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0001_m_000002_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1"});
+
// we get two more maps from 'default queue'
- checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
- checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
+ checkAssignments("tt2",
+ new String[] {"attempt_test_0001_m_000003_0 on tt2",
+ "attempt_test_0001_m_000004_0 on tt2",
+ "attempt_test_0001_r_000002_0 on tt2"});
+
// Submit another job, from a different user
FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
+
// one of the task finishes
taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
+
// Now if I ask for a map task, it should come from the second job
- checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
- // another task from job1 finishes, another new task to job2
+ // (the expected list below contains only the map task, no reduce)
+ checkAssignments("tt1",
+ new String[] {"attempt_test_0002_m_000001_0 on tt1"});
+
+ // another task from job1 finishes, another new map and reduce task to job2
taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0", j1);
- checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
+
+ // job2 should get the map slot & reduce
+ checkAssignments("tt1", new String[] {"attempt_test_0002_m_000002_0 on tt1"});
+
// now we have equal number of tasks from each job. Whichever job's
// task finishes, that job gets a new task
taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000003_0", j1);
- checkAssignment("tt2", "attempt_test_0001_m_000005_0 on tt2");
+ checkAssignments("tt2", new String[] {"attempt_test_0001_m_000005_0 on tt2"});
taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", j2);
checkAssignment("tt1", "attempt_test_0002_m_000003_0 on tt1");
}
@@ -1630,23 +1587,44 @@ public class TestCapacityScheduler exten
// u1 submits job
FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
- // it gets the first 5 slots
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
- checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
- checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
- checkAssignment("tt3", "attempt_test_0001_m_000005_0 on tt3");
+ // it gets the first 6 slots
+ checkAssignments("tt1",
+ new String[] {
+ "attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0001_m_000002_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1"
+ });
+ checkAssignments("tt2",
+ new String[] {
+ "attempt_test_0001_m_000003_0 on tt2",
+ "attempt_test_0001_m_000004_0 on tt2",
+ "attempt_test_0001_r_000002_0 on tt2"
+ });
+ checkAssignments("tt3",
+ new String[] {
+ "attempt_test_0001_m_000005_0 on tt3",
+ "attempt_test_0001_m_000006_0 on tt3",
+ "attempt_test_0001_r_000003_0 on tt3"
+ });
+
// u2 submits job with 4 slots
FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 4, 4, null, "u2");
// u2 should get next 4 slots
- checkAssignment("tt3", "attempt_test_0002_m_000001_0 on tt3");
- checkAssignment("tt4", "attempt_test_0002_m_000002_0 on tt4");
- checkAssignment("tt4", "attempt_test_0002_m_000003_0 on tt4");
- checkAssignment("tt5", "attempt_test_0002_m_000004_0 on tt5");
- // last slot should go to u1, since u2 has no more tasks
- checkAssignment("tt5", "attempt_test_0001_m_000006_0 on tt5");
+ checkAssignments("tt4",
+ new String[] {
+ "attempt_test_0002_m_000001_0 on tt4",
+ "attempt_test_0002_m_000002_0 on tt4",
+ "attempt_test_0002_r_000001_0 on tt4"
+ });
+ checkAssignments("tt5",
+ new String[] {
+ "attempt_test_0002_m_000003_0 on tt5",
+ "attempt_test_0002_m_000004_0 on tt5",
+ "attempt_test_0002_r_000002_0 on tt5"
+ });
+
// u1 finishes a task
- taskTrackerManager.finishTask("tt5", "attempt_test_0001_m_000006_0", j1);
+ taskTrackerManager.finishTask("tt3", "attempt_test_0001_m_000006_0", j1);
// u1 submits a few more jobs
// All the jobs are inited when submitted
// because of addition of Eager Job Initializer all jobs in this
@@ -1660,10 +1638,10 @@ public class TestCapacityScheduler exten
submitJobAndInit(JobStatus.PREP, 2, 2, null, "u3");
// next slot should go to u3, even though u2 has an earlier job, since
// user limits have changed and u1/u2 are over limits
- checkAssignment("tt5", "attempt_test_0007_m_000001_0 on tt5");
+ checkAssignment("tt3", "attempt_test_0007_m_000001_0 on tt3");
// some other task finishes and u3 gets it
- taskTrackerManager.finishTask("tt5", "attempt_test_0002_m_000004_0", j1);
- checkAssignment("tt5", "attempt_test_0007_m_000002_0 on tt5");
+ taskTrackerManager.finishTask("tt3", "attempt_test_0001_m_000005_0", j1);
+ checkAssignment("tt3", "attempt_test_0007_m_000002_0 on tt3");
// now, u2 finishes a task
taskTrackerManager.finishTask("tt4", "attempt_test_0002_m_000002_0", j1);
// next slot will go to u1, since u3 has nothing to run and u1's job is
@@ -1679,6 +1657,7 @@ public class TestCapacityScheduler exten
*/
public void testUserLimitsForHighMemoryJobs()
throws IOException {
+ System.err.println("testUserLimitsForHighMemoryJobs");
taskTrackerManager = new FakeTaskTrackerManager(1, 10, 10);
scheduler.setTaskTrackerManager(taskTrackerManager);
String[] qs = { "default" };
@@ -1720,30 +1699,25 @@ public class TestCapacityScheduler exten
jConf.setUser("u2");
FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
- // Verify that normal job takes 3 task assignments to hit user limits
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_r_000002_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_m_000003_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_r_000003_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_m_000004_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_r_000004_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_m_000005_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_r_000005_0 on tt1");
- // u1 has 5 map slots and 5 reduce slots. u2 has none. So u1's user limits
- // are hit. So u2 should get slots
-
- checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
- checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
- checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
- checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
+ // Verify that normal job takes 3 task assignments to hit user limits,
+ // and then j2 gets 4 slots
+ checkAssignments("tt1",
+ new String[] {
+ "attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0001_m_000002_0 on tt1",
+ "attempt_test_0001_m_000003_0 on tt1",
+ "attempt_test_0001_m_000004_0 on tt1",
+ "attempt_test_0001_m_000005_0 on tt1",
+ "attempt_test_0002_m_000001_0 on tt1",
+ "attempt_test_0002_m_000002_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1",
+ });
// u1 has 5 map slots and 5 reduce slots. u2 has 4 map slots and 4 reduce
// slots. Because of high memory tasks, giving u2 another task would
// overflow limits. So, no more tasks should be given to anyone.
- assertNull(scheduler.assignTasks(tracker("tt1")));
- assertNull(scheduler.assignTasks(tracker("tt1")));
+// assertEquals(0, scheduler.assignTasks(tracker("tt1")).size());
+// assertEquals(0, scheduler.assignTasks(tracker("tt1")).size());
}
/*
@@ -1781,6 +1755,7 @@ public class TestCapacityScheduler exten
*/
public void testSchedulingInformation() throws Exception {
+ System.err.println("testSchedulingInformation()");
String[] qs = {"default", "q2"};
taskTrackerManager = new FakeTaskTrackerManager(2, 1, 1);
scheduler.setTaskTrackerManager(taskTrackerManager);
@@ -1861,7 +1836,11 @@ public class TestCapacityScheduler exten
raiseStatusChangeEvents(scheduler.jobQueuesManager);
raiseStatusChangeEvents(scheduler.jobQueuesManager, "q2");
//assign one job
- Task t1 = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ checkAssignments("tt1",
+ new String[] {
+ "attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1"});
+
//Initalize extra job.
controlledInitializationPoller.selectJobsToInitialize();
@@ -1872,37 +1851,20 @@ public class TestCapacityScheduler exten
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
- assertEquals(infoStrings.length, 20);
- assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
- assertEquals(infoStrings[8], "Running tasks: 1");
- assertEquals(infoStrings[9], "Active users:");
- assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
- assertEquals(infoStrings[14], "Used capacity: 0 (0.0% of Capacity)");
- assertEquals(infoStrings[15], "Running tasks: 0");
- assertEquals(infoStrings[18], "Number of Waiting Jobs: 4");
-
- //assign a reduce task
- Task t2 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
- // make sure we update our stats
- scheduler.updateQSIInfoForTests();
- schedulingInfo =
- queueManager.getJobQueueInfo("default").getSchedulingInfo();
- infoStrings = schedulingInfo.split("\n");
- assertEquals(infoStrings.length, 22);
- assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
- assertEquals(infoStrings[8], "Running tasks: 1");
- assertEquals(infoStrings[9], "Active users:");
- assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
- assertEquals(infoStrings[14], "Used capacity: 1 (100.0% of Capacity)");
- assertEquals(infoStrings[15], "Running tasks: 1");
- assertEquals(infoStrings[16], "Active users:");
- assertEquals(infoStrings[17], "User 'u1': 1 (100.0% of used capacity)");
- assertEquals(infoStrings[20], "Number of Waiting Jobs: 4");
-
+ System.err.println(schedulingInfo);
+ assertEquals(schedulingInfo, 22, infoStrings.length);
+ assertEquals(infoStrings[7], infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
+ assertEquals(infoStrings[8], infoStrings[8], "Running tasks: 1");
+ assertEquals(infoStrings[9], infoStrings[9], "Active users:");
+ assertEquals(infoStrings[10], infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
+ assertEquals(infoStrings[14], infoStrings[14], "Used capacity: 1 (100.0% of Capacity)");
+ assertEquals(infoStrings[15], infoStrings[15], "Running tasks: 1");
+ assertEquals(infoStrings[18], infoStrings[20], "Number of Waiting Jobs: 4");
+
//Complete the job and check the running tasks count
FakeJobInProgress u1j1 = userJobs.get(0);
- taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), u1j1);
- taskTrackerManager.finishTask("tt1", t2.getTaskID().toString(), u1j1);
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", u1j1);
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000001_0", u1j1);
taskTrackerManager.finalizeJob(u1j1);
// make sure we update our stats
@@ -1911,11 +1873,11 @@ public class TestCapacityScheduler exten
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
assertEquals(infoStrings.length, 18);
- assertEquals(infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
- assertEquals(infoStrings[8], "Running tasks: 0");
- assertEquals(infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
- assertEquals(infoStrings[13], "Running tasks: 0");
- assertEquals(infoStrings[16], "Number of Waiting Jobs: 4");
+ assertEquals(infoStrings[7], infoStrings[7], "Used capacity: 0 (0.0% of Capacity)");
+ assertEquals(infoStrings[8], infoStrings[8], "Running tasks: 0");
+ assertEquals(infoStrings[12], infoStrings[12], "Used capacity: 0 (0.0% of Capacity)");
+ assertEquals(infoStrings[13], infoStrings[13], "Running tasks: 0");
+ assertEquals(infoStrings[16], infoStrings[16], "Number of Waiting Jobs: 4");
//Fail a job which is initialized but not scheduled and check the count.
FakeJobInProgress u1j2 = userJobs.get(1);
@@ -1968,7 +1930,10 @@ public class TestCapacityScheduler exten
//Now schedule a map should be job3 of the user as job1 succeeded job2
//failed and now job3 is running
- t1 = checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
+ checkAssignments("tt1",
+ new String[] {
+ "attempt_test_0003_m_000001_0 on tt1",
+ "attempt_test_0003_r_000001_0 on tt1"});
FakeJobInProgress u1j3 = userJobs.get(2);
assertTrue("User Job 3 not running ",
u1j3.getStatus().getRunState() == JobStatus.RUNNING);
@@ -1981,12 +1946,14 @@ public class TestCapacityScheduler exten
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
- assertEquals(infoStrings.length, 20);
- assertEquals(infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
- assertEquals(infoStrings[8], "Running tasks: 1");
- assertEquals(infoStrings[9], "Active users:");
- assertEquals(infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
- assertEquals(infoStrings[18], "Number of Waiting Jobs: 1");
+ assertEquals(infoStrings.length, 22);
+ assertEquals(infoStrings[7], infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
+ assertEquals(infoStrings[8], infoStrings[8], "Running tasks: 1");
+ assertEquals(infoStrings[9], infoStrings[9], "Active users:");
+ assertEquals(infoStrings[10], infoStrings[10], "User 'u1': 1 (100.0% of used capacity)");
+ assertEquals(infoStrings[14], infoStrings[14], "Used capacity: 1 (100.0% of Capacity)");
+ assertEquals(infoStrings[15], infoStrings[15], "Running tasks: 1");
+ assertEquals(infoStrings[18], infoStrings[20], "Number of Waiting Jobs: 1");
//Fail the executing job
taskTrackerManager.finalizeJob(u1j3, JobStatus.FAILED);
@@ -2036,8 +2003,9 @@ public class TestCapacityScheduler exten
// assert that all tasks are launched even though they transgress the
// scheduling limits.
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+ checkAssignments("tt1",
+ new String[] {"attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1"});
}
/**
@@ -2050,7 +2018,10 @@ public class TestCapacityScheduler exten
throws IOException {
// 2 map and 1 reduce slots
- taskTrackerManager = new FakeTaskTrackerManager(1, 2, 1);
+ final int NUM_MAP_SLOTS = 2;
+ final int NUM_REDUCE_SLOTS = 1;
+ taskTrackerManager =
+ new FakeTaskTrackerManager(1, NUM_MAP_SLOTS, NUM_REDUCE_SLOTS);
taskTrackerManager.addQueues(new String[] { "default" });
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
@@ -2101,18 +2072,15 @@ public class TestCapacityScheduler exten
FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
// first, a map from j1 will run
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- // Total 2 map slots should be accounted for.
+ checkAssignments("tt1",
+ new String[] {"attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0002_r_000001_0 on tt1"});
+ // Total 2 maps & 1 reduce slot should be accounted for.
checkOccupiedSlots("default", TaskType.MAP, 1, 2, 100.0f);
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
-
- // at this point, the scheduler tries to schedule another map from j1.
- // there isn't enough space. The second job's reduce should be scheduled.
- checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
- // Total 1 reduce slot should be accounted for.
checkOccupiedSlots("default", TaskType.REDUCE, 1, 1,
100.0f);
- checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
+ checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L,
+ NUM_MAP_SLOTS-2, NUM_REDUCE_SLOTS-1);
}
/**
@@ -2133,7 +2101,10 @@ public class TestCapacityScheduler exten
throws IOException {
LOG.debug("Starting the scheduler.");
- taskTrackerManager = new FakeTaskTrackerManager(3, 2, 2);
+ final int NUM_MAP_SLOTS = 2;
+ final int NUM_REDUCE_SLOTS = 2;
+ taskTrackerManager =
+ new FakeTaskTrackerManager(3, NUM_MAP_SLOTS, NUM_REDUCE_SLOTS);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
@@ -2167,25 +2138,22 @@ public class TestCapacityScheduler exten
FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
// Fill a tt with this job's tasks.
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- // Total 1 map slot should be accounted for.
- checkOccupiedSlots("default", TaskType.MAP, 1, 1, 16.7f);
- assertEquals(
- CapacityTaskScheduler.getJobQueueSchedInfo(1, 1, 0, 0, 0, 0),
- (String) job1.getSchedulingInfo());
- checkMemReservedForTasksOnTT("tt1", 1 * 1024L, 0L);
-
- // same for reduces.
- checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
- checkOccupiedSlots("default", TaskType.REDUCE, 1, 1, 16.7f);
+ checkAssignments("tt1",
+ new String[] {
+ "attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0001_m_000002_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1"});
+ // Total 2 map slots and 1 reduce slot should be accounted for.
+ checkOccupiedSlots("default", TaskType.MAP, 1, 2, 33.33f);
assertEquals(
- CapacityTaskScheduler.getJobQueueSchedInfo(1, 1, 0, 1, 1, 0),
+ CapacityTaskScheduler.getJobQueueSchedInfo(2, 2, 0, 1, 1, 0),
(String) job1.getSchedulingInfo());
- checkMemReservedForTasksOnTT("tt1", 1 * 1024L, 1 * 1024L);
+ checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L,
+ NUM_MAP_SLOTS-2, NUM_REDUCE_SLOTS-1);
// fill another TT with the rest of the tasks of the job
- checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
- checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
+ checkAssignments("tt2",
+ new String[] {"attempt_test_0001_r_000002_0 on tt2"});
LOG.debug("Submit one high memory(2GB maps/reduces) job of "
+ "2 map, 2 reduce tasks.");
@@ -2200,19 +2168,19 @@ public class TestCapacityScheduler exten
// Have another TT run one task of each type of the high RAM
// job. This will fill up the TT.
- checkAssignment("tt3", "attempt_test_0002_m_000001_0 on tt3");
+ checkAssignments("tt3",
+ new String[] {
+ "attempt_test_0002_m_000001_0 on tt3",
+ "attempt_test_0002_r_000001_0 on tt3"
+ });
+
checkOccupiedSlots("default", TaskType.MAP, 1, 4, 66.7f);
assertEquals(
- CapacityTaskScheduler.getJobQueueSchedInfo(1, 2, 0, 0, 0, 0),
- (String) job2.getSchedulingInfo());
- checkMemReservedForTasksOnTT("tt3", 2 * 1024L, 0L);
-
- checkAssignment("tt3", "attempt_test_0002_r_000001_0 on tt3");
- checkOccupiedSlots("default", TaskType.REDUCE, 1, 4, 66.7f);
- assertEquals(
CapacityTaskScheduler.getJobQueueSchedInfo(1, 2, 0, 1, 2, 0),
(String) job2.getSchedulingInfo());
- checkMemReservedForTasksOnTT("tt3", 2 * 1024L, 2 * 1024L);
+ checkMemReservedForTasksOnTT("tt3", 2 * 1024L, 2 * 1024L,
+ NUM_MAP_SLOTS-2, NUM_REDUCE_SLOTS-2);
+
LOG.debug("Submit one normal memory(1GB maps/reduces) job of "
+ "1 map, 1 reduce tasks.");
@@ -2227,23 +2195,30 @@ public class TestCapacityScheduler exten
// Send a TT with insufficient space for task assignment,
// This will cause a reservation for the high RAM job.
- assertNull(scheduler.assignTasks(tracker("tt1")));
+ checkAssignments("tt2",
+ new String[] {
+ "attempt_test_0002_m_000002_0 on tt2"
+ });
// reserved tasktrackers contribute to occupied slots for maps and reduces
checkOccupiedSlots("default", TaskType.MAP, 1, 6, 100.0f);
checkOccupiedSlots("default", TaskType.REDUCE, 1, 6, 100.0f);
- checkMemReservedForTasksOnTT("tt1", 1 * 1024L, 1 * 1024L);
+ checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 1 * 1024L,
+ 0, NUM_REDUCE_SLOTS-1);
LOG.info(job2.getSchedulingInfo());
assertEquals(
- CapacityTaskScheduler.getJobQueueSchedInfo(1, 2, 2, 1, 2, 2),
+ CapacityTaskScheduler.getJobQueueSchedInfo(2, 4, 0, 1, 2, 2),
(String) job2.getSchedulingInfo());
assertEquals(
CapacityTaskScheduler.getJobQueueSchedInfo(0, 0, 0, 0, 0, 0),
(String) job3.getSchedulingInfo());
// Reservations are already done for job2. So job3 should go ahead.
- checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
- checkAssignment("tt2", "attempt_test_0003_r_000001_0 on tt2");
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", job1);
+ checkAssignments("tt1",
+ new String[] {
+ "attempt_test_0003_m_000001_0 on tt1",
+ "attempt_test_0003_r_000001_0 on tt1"});
}
/**
@@ -2256,7 +2231,10 @@ public class TestCapacityScheduler exten
public void testMemoryMatchingWithRetiredJobs() throws IOException {
// create a cluster with a single node.
LOG.debug("Starting cluster with 1 tasktracker, 2 map and 2 reduce slots");
- taskTrackerManager = new FakeTaskTrackerManager(1, 2, 2);
+ final int NUM_MAP_SLOTS = 2;
+ final int NUM_REDUCE_SLOTS = 2;
+ taskTrackerManager =
+ new FakeTaskTrackerManager(1, NUM_MAP_SLOTS, NUM_REDUCE_SLOTS);
// create scheduler
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
@@ -2280,35 +2258,33 @@ public class TestCapacityScheduler exten
scheduler.start();
// submit a normal job
- LOG.debug("Submitting a normal job with 2 maps and 2 reduces");
+ LOG.debug("Submitting a normal job with 1 maps and 1 reduces");
JobConf jConf = new JobConf();
- jConf.setNumMapTasks(2);
- jConf.setNumReduceTasks(2);
+ jConf.setNumMapTasks(1);
+ jConf.setNumReduceTasks(1);
jConf.setMemoryForMapTask(512);
jConf.setMemoryForReduceTask(512);
jConf.setQueueName("default");
jConf.setUser("u1");
+ jConf.setSpeculativeExecution(false);
FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
- // 1st cycle - 1 map gets assigned.
- Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- // Total 1 map slot should be accounted for.
+ // 1st cycle - 1 map & 1 reduce gets assigned.
+ checkAssignments("tt1",
+ new String[] {
+ "attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1"});
+ // Total 1 map & 1 reduce slot should be accounted for.
checkOccupiedSlots("default", TaskType.MAP, 1, 1, 50.0f);
- checkMemReservedForTasksOnTT("tt1", 512L, 0L);
-
- // 1st cycle of reduces - 1 reduce gets assigned.
- Task t1 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
- // Total 1 reduce slot should be accounted for.
- checkOccupiedSlots("default", TaskType.REDUCE, 1, 1,
- 50.0f);
- checkMemReservedForTasksOnTT("tt1", 512L, 512L);
+ checkOccupiedSlots("default", TaskType.REDUCE, 1, 1, 50.0f);
+ checkMemReservedForTasksOnTT("tt1", 512L, 512L,
+ NUM_MAP_SLOTS-1, NUM_REDUCE_SLOTS-1);
// kill this job !
taskTrackerManager.killJob(job1.getJobID());
// No more map/reduce slots should be accounted for.
checkOccupiedSlots("default", TaskType.MAP, 0, 0, 0.0f);
- checkOccupiedSlots("default", TaskType.REDUCE, 0, 0,
- 0.0f);
+ checkOccupiedSlots("default", TaskType.REDUCE, 0, 0, 0.0f);
// retire the job
taskTrackerManager.removeJob(job1.getJobID());
@@ -2326,32 +2302,32 @@ public class TestCapacityScheduler exten
// since with HADOOP-5964, we don't rely on a job conf to get
// the memory occupied, scheduling should be able to work correctly.
- t1 = checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+ checkAssignments("tt1",
+ new String[] {
+ "attempt_test_0002_m_000001_0 on tt1",
+ "attempt_test_0002_r_000001_0 on tt1"
+ });
checkOccupiedSlots("default", TaskType.MAP, 1, 1, 50);
- checkMemReservedForTasksOnTT("tt1", 1024L, 512L);
-
- // assign a reduce now.
- t1 = checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
checkOccupiedSlots("default", TaskType.REDUCE, 1, 1, 50);
- checkMemReservedForTasksOnTT("tt1", 1024L, 1024L);
+ checkMemReservedForTasksOnTT("tt1", 1024L, 1024L,
+ NUM_MAP_SLOTS-2, NUM_REDUCE_SLOTS-2);
// now, no more can be assigned because all the slots are blocked.
- assertNull(scheduler.assignTasks(tracker("tt1")));
+ assertEquals(0, scheduler.assignTasks(tracker("tt1")).size());
// finish the tasks on the tracker.
- taskTrackerManager.finishTask("tt1", t.getTaskID().toString(), job1);
- taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), job1);
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", job1);
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_r_000001_0", job1);
- // now a new task can be assigned.
- t = checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
+ // now new tasks can be assigned.
+ checkAssignments("tt1",
+ new String[] {
+ "attempt_test_0002_m_000002_0 on tt1",
+ "attempt_test_0002_r_000002_0 on tt1"});
checkOccupiedSlots("default", TaskType.MAP, 1, 2, 100.0f);
- // memory used will change because of the finished task above.
- checkMemReservedForTasksOnTT("tt1", 1024L, 512L);
-
- // reduce can be assigned.
- t = checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
checkOccupiedSlots("default", TaskType.REDUCE, 1, 2, 100.0f);
- checkMemReservedForTasksOnTT("tt1", 1024L, 1024L);
+ checkMemReservedForTasksOnTT("tt1", 1024L, 1024L,
+ NUM_MAP_SLOTS-2, NUM_REDUCE_SLOTS-2);
}
/*
@@ -2433,14 +2409,20 @@ public class TestCapacityScheduler exten
raiseStatusChangeEvents(mgr);
// get some tasks assigned.
- Task t1 = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- Task t2 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
- Task t3 = checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
- Task t4 = checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
- taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), u1Jobs.get(0));
- taskTrackerManager.finishTask("tt1", t2.getTaskID().toString(), u1Jobs.get(0));
- taskTrackerManager.finishTask("tt2", t3.getTaskID().toString(), u1Jobs.get(1));
- taskTrackerManager.finishTask("tt2", t4.getTaskID().toString(), u1Jobs.get(1));
+ checkAssignments("tt1",
+ new String[] {
+ "attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1"
+ });
+ checkAssignments("tt2",
+ new String[] {
+ "attempt_test_0002_m_000001_0 on tt2",
+ "attempt_test_0002_r_000001_0 on tt2"
+ });
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", u1Jobs.get(0));
+ taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", u1Jobs.get(0));
+ taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000001_0", u1Jobs.get(1));
+ taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000001_0", u1Jobs.get(1));
// as some jobs have running tasks, the poller will now
// pick up new jobs to initialize.
@@ -2461,10 +2443,13 @@ public class TestCapacityScheduler exten
initializedJobs.contains(u1Jobs.get(1).getJobID()));
// finish one more job
- t1 = checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
- t2 = checkAssignment("tt1", "attempt_test_0003_r_000001_0 on tt1");
- taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), u1Jobs.get(2));
- taskTrackerManager.finishTask("tt1", t2.getTaskID().toString(), u1Jobs.get(2));
+ checkAssignments("tt1",
+ new String[] {
+ "attempt_test_0003_m_000001_0 on tt1",
+ "attempt_test_0003_r_000001_0 on tt1"
+ });
+ taskTrackerManager.finishTask("tt1", "attempt_test_0003_m_000001_0", u1Jobs.get(2));
+ taskTrackerManager.finishTask("tt1", "attempt_test_0003_r_000001_0", u1Jobs.get(2));
// no new jobs should be picked up, because max user limit
// is still 3.
@@ -2473,10 +2458,11 @@ public class TestCapacityScheduler exten
assertEquals(initializedJobs.size(), 5);
// run 1 more jobs..
- t1 = checkAssignment("tt1", "attempt_test_0004_m_000001_0 on tt1");
- t1 = checkAssignment("tt1", "attempt_test_0004_r_000001_0 on tt1");
- taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), u1Jobs.get(3));
- taskTrackerManager.finishTask("tt1", t2.getTaskID().toString(), u1Jobs.get(3));
+ checkAssignments("tt1",
+ new String[] {
+ "attempt_test_0004_m_000001_0 on tt1",
+ "attempt_test_0004_r_000001_0 on tt1"
+ });
// Now initialised jobs should contain user 4's job, as
// user 1's jobs are all done and the number of users is
@@ -2578,8 +2564,9 @@ public class TestCapacityScheduler exten
submitJob(JobStatus.PREP, 1, 1, "q1", "u1");
controlledInitializationPoller.selectJobsToInitialize();
raiseStatusChangeEvents(scheduler.jobQueuesManager, "q1");
- Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- t = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+ checkAssignments("tt1",
+ new String[] {"attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1"});
}
public void testFailedJobInitalizations() throws Exception {
@@ -2637,8 +2624,10 @@ public class TestCapacityScheduler exten
mgr.getRunningJobQueue("default").contains(job));
// assign a task
- Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- t = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
+ checkAssignments("tt1",
+ new String[] {
+ "attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1"});
controlledInitializationPoller.selectJobsToInitialize();
@@ -2724,14 +2713,18 @@ public class TestCapacityScheduler exten
controlledInitializationPoller.selectJobsToInitialize();
raiseStatusChangeEvents(mgr);
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ checkAssignments("tt1",
+ new String[] {
+ "attempt_test_0001_m_000001_0 on tt1" ,
+ "attempt_test_0001_r_000001_0 on tt1"});
assertTrue("Pending maps of job1 greater than zero",
(fjob1.pendingMaps() == 0));
- checkAssignment("tt2", "attempt_test_0001_m_000001_1 on tt2");
- checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
- assertTrue("Pending reduces of job2 greater than zero",
+ assertTrue("Pending reduces of job1 greater than zero",
(fjob1.pendingReduces() == 0));
- checkAssignment("tt2", "attempt_test_0001_r_000001_1 on tt2");
+ checkAssignments("tt2",
+ new String[] {
+ "attempt_test_0001_m_000001_1 on tt2",
+ "attempt_test_0001_r_000001_1 on tt2"});
taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", fjob1);
taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000001_1", fjob1);
@@ -2739,8 +2732,10 @@ public class TestCapacityScheduler exten
taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000001_1", fjob1);
taskTrackerManager.finalizeJob(fjob1);
- checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
- checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+ checkAssignments("tt1",
+ new String[] {
+ "attempt_test_0002_m_000001_0 on tt1",
+ "attempt_test_0002_r_000001_0 on tt1"});
taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", fjob2);
taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000001_0", fjob2);
taskTrackerManager.finalizeJob(fjob2);
@@ -2775,28 +2770,55 @@ public class TestCapacityScheduler exten
scheduler.start();
LOG.debug("Submit a regular memory(1GB vmem maps/reduces) job of "
- + "3 map/red tasks");
+ + "1 map & 0 red tasks");
JobConf jConf = new JobConf(conf);
jConf = new JobConf(conf);
jConf.setMemoryForMapTask(1 * 1024);
jConf.setMemoryForReduceTask(1 * 1024);
- jConf.setNumMapTasks(3);
- jConf.setNumReduceTasks(3);
+ jConf.setNumMapTasks(1);
+ jConf.setNumReduceTasks(0);
jConf.setQueueName("default");
jConf.setUser("u1");
FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
-
- // assign one map task of job1 on all the TTs
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
- checkAssignment("tt3", "attempt_test_0001_m_000003_0 on tt3");
scheduler.updateQSIInfoForTests();
-
LOG.info(job1.getSchedulingInfo());
assertEquals(
- CapacityTaskScheduler.getJobQueueSchedInfo(3, 3, 0, 0, 0, 0),
+ CapacityTaskScheduler.getJobQueueSchedInfo(1, 1, 0, 0, 0, 0),
(String) job1.getSchedulingInfo());
+ jConf = new JobConf(conf);
+ jConf = new JobConf(conf);
+ jConf.setMemoryForMapTask(1 * 1024);
+ jConf.setMemoryForReduceTask(1 * 1024);
+ jConf.setNumMapTasks(1);
+ jConf.setNumReduceTasks(0);
+ jConf.setQueueName("default");
+ jConf.setUser("u1");
+ FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
+ checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+ scheduler.updateQSIInfoForTests();
+ LOG.info(job2.getSchedulingInfo());
+ assertEquals(
+ CapacityTaskScheduler.getJobQueueSchedInfo(1, 1, 0, 0, 0, 0),
+ (String) job2.getSchedulingInfo());
+
+ jConf = new JobConf(conf);
+ jConf = new JobConf(conf);
+ jConf.setMemoryForMapTask(1 * 1024);
+ jConf.setMemoryForReduceTask(1 * 1024);
+ jConf.setNumMapTasks(1);
+ jConf.setNumReduceTasks(0);
+ jConf.setQueueName("default");
+ jConf.setUser("u1");
+ FakeJobInProgress job3 = submitJobAndInit(JobStatus.PREP, jConf);
+ checkAssignment("tt3", "attempt_test_0003_m_000001_0 on tt3");
+ scheduler.updateQSIInfoForTests();
+ LOG.info(job3.getSchedulingInfo());
+ assertEquals(
+ CapacityTaskScheduler.getJobQueueSchedInfo(1, 1, 0, 0, 0, 0),
+ (String) job3.getSchedulingInfo());
+
LOG.debug("Submit one high memory(2GB maps, 0MB reduces) job of "
+ "2 map tasks");
jConf.setMemoryForMapTask(2 * 1024);
@@ -2805,7 +2827,7 @@ public class TestCapacityScheduler exten
jConf.setNumReduceTasks(0);
jConf.setQueueName("default");
jConf.setUser("u1");
- FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
+ FakeJobInProgress job4= submitJobAndInit(JobStatus.PREP, jConf);
LOG.debug("Submit another regular memory(1GB vmem maps/reduces) job of "
+ "2 map/red tasks");
@@ -2816,50 +2838,56 @@ public class TestCapacityScheduler exten
jConf.setNumReduceTasks(2);
jConf.setQueueName("default");
jConf.setUser("u1");
- FakeJobInProgress job3 = submitJobAndInit(JobStatus.PREP, jConf);
+ FakeJobInProgress job5 = submitJobAndInit(JobStatus.PREP, jConf);
- // Job2, a high memory job cannot be accommodated on a any TT. But with each
+ // Job4, a high memory job, cannot be accommodated on any TT. But with each
// trip to the scheduler, each of the TT should be reserved by job2.
- assertNull(scheduler.assignTasks(tracker("tt1")));
+ assertEquals(0, scheduler.assignTasks(tracker("tt1")).size());
scheduler.updateQSIInfoForTests();
- LOG.info(job2.getSchedulingInfo());
+ LOG.info(job4.getSchedulingInfo());
assertEquals(
CapacityTaskScheduler.getJobQueueSchedInfo(0, 0, 2, 0, 0, 0),
- (String) job2.getSchedulingInfo());
+ (String) job4.getSchedulingInfo());
- assertNull(scheduler.assignTasks(tracker("tt2")));
+ assertEquals(0, scheduler.assignTasks(tracker("tt2")).size());
scheduler.updateQSIInfoForTests();
- LOG.info(job2.getSchedulingInfo());
+ LOG.info(job4.getSchedulingInfo());
assertEquals(
CapacityTaskScheduler.getJobQueueSchedInfo(0, 0, 4, 0, 0, 0),
- (String) job2.getSchedulingInfo());
+ (String) job4.getSchedulingInfo());
- // Job2 has only 2 pending tasks. So no more reservations. Job3 should get
+ // Job4 has only 2 pending tasks. So no more reservations. Job5 should get
// slots on tt3. tt1 and tt2 should not be assigned any slots with the
// reservation stats intact.
- assertNull(scheduler.assignTasks(tracker("tt1")));
+ assertEquals(0, scheduler.assignTasks(tracker("tt1")).size());
scheduler.updateQSIInfoForTests();
- LOG.info(job2.getSchedulingInfo());
+ LOG.info(job4.getSchedulingInfo());
assertEquals(
CapacityTaskScheduler.getJobQueueSchedInfo(0, 0, 4, 0, 0, 0),
- (String) job2.getSchedulingInfo());
+ (String) job4.getSchedulingInfo());
- assertNull(scheduler.assignTasks(tracker("tt2")));
+ assertEquals(0, scheduler.assignTasks(tracker("tt2")).size());
scheduler.updateQSIInfoForTests();
- LOG.info(job2.getSchedulingInfo());
+ LOG.info(job4.getSchedulingInfo());
assertEquals(
CapacityTaskScheduler.getJobQueueSchedInfo(0, 0, 4, 0, 0, 0),
- (String) job2.getSchedulingInfo());
+ (String) job4.getSchedulingInfo());
- checkAssignment("tt3", "attempt_test_0003_m_000001_0 on tt3");
+ checkAssignments("tt3",
+ new String[] {
+ "attempt_test_0005_m_000001_0 on tt3"});
scheduler.updateQSIInfoForTests();
- LOG.info(job2.getSchedulingInfo());
+ LOG.info(job5.getSchedulingInfo());
+ assertEquals(
+ CapacityTaskScheduler.getJobQueueSchedInfo(1, 1, 0, 0, 0, 0),
+ (String) job5.getSchedulingInfo());
+
assertEquals(
CapacityTaskScheduler.getJobQueueSchedInfo(0, 0, 4, 0, 0, 0),
- (String) job2.getSchedulingInfo());
+ (String) job4.getSchedulingInfo());
// No more tasks there in job3 also
- assertNull(scheduler.assignTasks(tracker("tt3")));
+ assertEquals(0, scheduler.assignTasks(tracker("tt3")).size());
}
/**
@@ -2915,67 +2943,102 @@ public class TestCapacityScheduler exten
FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
// Map 1 of high memory job
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ // Map 1 of normal job
+ // Map 2 of normal job
+ // Map 3 of normal job, since comparator won't change on equals
+ // Reduce 1 of high memory job
+ checkAssignments("tt1",
+ new String[] {
+ "attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0002_m_000001_0 on tt1",
+ "attempt_test_0002_m_000002_0 on tt1",
+ "attempt_test_0002_m_000003_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1"
+ });
checkQueuesOrder(qs, scheduler
.getOrderedQueues(TaskType.MAP));
-
- // Reduce 1 of high memory job
- checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
checkQueuesOrder(qs, scheduler
.getOrderedQueues(TaskType.REDUCE));
+ scheduler.updateQSIInfoForTests();
+ assertEquals(
+ CapacityTaskScheduler.getJobQueueSchedInfo(1, 2, 2, 1, 2, 0),
+ (String) job1.getSchedulingInfo());
+ assertEquals(
+ CapacityTaskScheduler.getJobQueueSchedInfo(3, 3, 0, 0, 0, 0),
+ (String) job2.getSchedulingInfo());
- // Map 1 of normal job
- checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
- checkQueuesOrder(reversedQs, scheduler
- .getOrderedQueues(TaskType.MAP));
// Reduce 1 of normal job
checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
checkQueuesOrder(reversedQs, scheduler
.getOrderedQueues(TaskType.REDUCE));
-
- // Map 2 of normal job
- checkAssignment("tt1", "attempt_test_0002_m_000002_0 on tt1");
- checkQueuesOrder(reversedQs, scheduler
- .getOrderedQueues(TaskType.MAP));
+ scheduler.updateQSIInfoForTests();
+ assertEquals(
+ CapacityTaskScheduler.getJobQueueSchedInfo(1, 2, 2, 1, 2, 0),
+ (String) job1.getSchedulingInfo());
+ assertEquals(
+ CapacityTaskScheduler.getJobQueueSchedInfo(3, 3, 0, 1, 1, 0),
+ (String) job2.getSchedulingInfo());
// Reduce 2 of normal job
checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
checkQueuesOrder(reversedQs, scheduler
.getOrderedQueues(TaskType.REDUCE));
-
- // Now both the queues are equally served. But the comparator doesn't change
- // the order if queues are equally served.
-
- // Map 3 of normal job
- checkAssignment("tt2", "attempt_test_0002_m_000003_0 on tt2");
- checkQueuesOrder(reversedQs, scheduler
- .getOrderedQueues(TaskType.MAP));
+ scheduler.updateQSIInfoForTests();
+ assertEquals(
+ CapacityTaskScheduler.getJobQueueSchedInfo(1, 2, 2, 1, 2, 0),
+ (String) job1.getSchedulingInfo());
+ assertEquals(
+ CapacityTaskScheduler.getJobQueueSchedInfo(3, 3, 0, 2, 2, 0),
+ (String) job2.getSchedulingInfo());
// Reduce 3 of normal job
- checkAssignment("tt2", "attempt_test_0002_r_000003_0 on tt2");
+ checkAssignment("tt1", "attempt_test_0002_r_000003_0 on tt1");
checkQueuesOrder(reversedQs, scheduler
.getOrderedQueues(TaskType.REDUCE));
+ scheduler.updateQSIInfoForTests();
+ assertEquals(
+ CapacityTaskScheduler.getJobQueueSchedInfo(1, 2, 2, 1, 2, 0),
+ (String) job1.getSchedulingInfo());
+ assertEquals(
+ CapacityTaskScheduler.getJobQueueSchedInfo(3, 3, 0, 3, 3, 0),
+ (String) job2.getSchedulingInfo());
// Map 2 of high memory job
- checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
- checkQueuesOrder(qs, scheduler
- .getOrderedQueues(TaskType.MAP));
-
- // Reduce 2 of high memory job
- checkAssignment("tt2", "attempt_test_0001_r_000002_0 on tt2");
- checkQueuesOrder(qs, scheduler
- .getOrderedQueues(TaskType.REDUCE));
-
// Map 4 of normal job
- checkAssignment("tt2", "attempt_test_0002_m_000004_0 on tt2");
+ // Map 4 of normal job
+ // Reduce 2 of high memory job
+ checkAssignments("tt2",
+ new String[] {
+ "attempt_test_0002_m_000004_0 on tt2",
+ "attempt_test_0002_m_000005_0 on tt2",
+ "attempt_test_0001_m_000002_0 on tt2",
+ "attempt_test_0002_m_000006_0 on tt2",
+ "attempt_test_0001_r_000002_0 on tt2"
+ });
checkQueuesOrder(reversedQs, scheduler
.getOrderedQueues(TaskType.MAP));
+ checkQueuesOrder(qs, scheduler
+ .getOrderedQueues(TaskType.REDUCE));
+ scheduler.updateQSIInfoForTests();
+ assertEquals(
+ CapacityTaskScheduler.getJobQueueSchedInfo(2, 4, 4, 2, 4, 0),
+ (String) job1.getSchedulingInfo());
+ assertEquals(
+ CapacityTaskScheduler.getJobQueueSchedInfo(6, 6, 0, 3, 3, 0),
+ (String) job2.getSchedulingInfo());
// Reduce 4 of normal job
checkAssignment("tt2", "attempt_test_0002_r_000004_0 on tt2");
checkQueuesOrder(reversedQs, scheduler
.getOrderedQueues(TaskType.REDUCE));
+ scheduler.updateQSIInfoForTests();
+ assertEquals(
+ CapacityTaskScheduler.getJobQueueSchedInfo(2, 4, 4, 2, 4, 0),
+ (String) job1.getSchedulingInfo());
+ assertEquals(
+ CapacityTaskScheduler.getJobQueueSchedInfo(6, 6, 0, 4, 4, 0),
+ (String) job2.getSchedulingInfo());
}
private void checkFailedInitializedJobMovement() throws IOException {
@@ -3073,6 +3136,30 @@ public class TestCapacityScheduler exten
return tasks.get(0);
}
+ /**
+  * Renders the tasks handed out in one scheduling call as a single
+  * comma-separated string, for use in assertion messages.
+  *
+  * @param tasks the assigned tasks; may be null or empty
+  * @return {@code "<null>"} for a null list, {@code "<empty>"} for an empty
+  *         list, otherwise each task's toString() joined by ", "
+  */
+ protected String getAssignedTasks(List<Task> tasks) {
+   if (tasks == null) {
+     // checkAssignments builds its assertNotNull message from this method,
+     // so a null list must be describable rather than throw here.
+     return "<null>";
+   }
+   if (tasks.isEmpty()) {
+     return "<empty>";
+   }
+   // Purely local buffer: StringBuilder avoids StringBuffer's locking.
+   StringBuilder s = new StringBuilder(tasks.get(0).toString());
+   for (int i = 1; i < tasks.size(); ++i) {
+     s.append(", ").append(tasks.get(i).toString());
+   }
+   return s.toString();
+ }
+
+ /**
+  * Asks the scheduler to assign tasks to the named tracker and verifies that
+  * exactly the expected tasks were handed out, compared position by position.
+  *
+  * @param taskTrackerName name of the tracker to assign tasks to
+  * @param expectedTasks expected toString() of each assigned task, in order
+  * @return the tasks the scheduler assigned
+  * @throws IOException propagated from the underlying scheduler call
+  */
+ protected List<Task> checkAssignments(String taskTrackerName,
+     String[] expectedTasks) throws IOException {
+   List<Task> tasks = scheduler.assignTasks(tracker(taskTrackerName));
+   // Check for null before describing the list: building the description
+   // first would raise a NullPointerException instead of a test failure.
+   assertNotNull("No task list returned for " + taskTrackerName, tasks);
+   // Describe the assignment once and reuse it in every failure message.
+   String assigned = getAssignedTasks(tasks);
+   assertEquals(assigned, expectedTasks.length, tasks.size());
+   for (int i = 0; i < expectedTasks.length; ++i) {
+     assertEquals(assigned + " -> " + expectedTasks[i],
+         expectedTasks[i], tasks.get(i).toString());
+   }
+   return tasks;
+ }
+
/**
* Get the amount of memory that is reserved for tasks on the taskTracker and
* verify that it matches what is expected.
@@ -3082,15 +3169,16 @@ public class TestCapacityScheduler exten
* @param expectedMemForReducesOnTT
*/
private void checkMemReservedForTasksOnTT(String taskTracker,
- Long expectedMemForMapsOnTT, Long expectedMemForReducesOnTT) {
+ Long expectedMemForMapsOnTT, Long expectedMemForReducesOnTT,
+ int numAvailableMapSlots, int numAvailableReduceSlots) {
Long observedMemForMapsOnTT =
scheduler.memoryMatcher.getMemReservedForTasks(
tracker(taskTracker).getStatus(),
- TaskType.MAP);
+ TaskType.MAP, numAvailableMapSlots);
Long observedMemForReducesOnTT =
scheduler.memoryMatcher.getMemReservedForTasks(
tracker(taskTracker).getStatus(),
- TaskType.REDUCE);
+ TaskType.REDUCE, numAvailableReduceSlots);
if (expectedMemForMapsOnTT == null) {
assertEquals(observedMemForMapsOnTT, null);
} else {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077538&r1=1077537&r2=1077538&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar 4 04:26:09 2011
@@ -32,6 +32,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.Vector;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -168,6 +169,19 @@ public class JobInProgress {
private final int anyCacheLevel;
/**
+ * Number of scheduling opportunities (heartbeats) given to this Job
+ */
+ private volatile long numSchedulingOpportunities;
+
+ static String LOCALITY_WAIT_FACTOR = "mapreduce.job.locality.wait.factor";
+ static final float DEFAULT_LOCALITY_WAIT_FACTOR = 1.0f;
+
+ /**
+ * Fraction of the cluster (not a percentage; default 1.0) the job is willing to wait on for better locality
+ */
+ private float localityWaitFactor = 1.0f;
+
+ /**
* A special value indicating that
* {@link #findNewMapTask(TaskTrackerStatus, int, int, int, double)} should
* schedule any only off-switch and speculative map tasks for this job.
@@ -469,6 +483,7 @@ public class JobInProgress {
Map<Node, List<TaskInProgress>> cache =
new IdentityHashMap<Node, List<TaskInProgress>>(maxLevel);
+ Set<String> uniqueHosts = new TreeSet<String>();
for (int i = 0; i < splits.length; i++) {
String[] splitLocations = splits[i].getLocations();
if (splitLocations == null || splitLocations.length == 0) {
@@ -478,6 +493,7 @@ public class JobInProgress {
for(String host: splitLocations) {
Node node = jobtracker.resolveAndAddToTopology(host);
+ uniqueHosts.add(host);
LOG.info("tip:" + maps[i].getTIPId() + " has split on node:" + node);
for (int j = 0; j < maxLevel; j++) {
List<TaskInProgress> hostMaps = cache.get(node);
@@ -498,6 +514,16 @@ public class JobInProgress {
}
}
}
+
+ // Calibrate the localityWaitFactor - Do not override user intent!
+ if (localityWaitFactor == DEFAULT_LOCALITY_WAIT_FACTOR) {
+ float jobNodes = uniqueHosts.size();
+ float clusterNodes = jobtracker.getNumberOfUniqueHosts();
+
+ localityWaitFactor = Math.min(jobNodes/clusterNodes, localityWaitFactor);
+ LOG.info(jobId + " LOCALITY_WAIT_FACTOR=" + localityWaitFactor);
+ }
+
return cache;
}
@@ -640,6 +666,10 @@ public class JobInProgress {
}
LOG.info("Input size for job " + jobId + " = " + inputLength
+ ". Number of splits = " + splits.length);
+
+ // Set localityWaitFactor before creating cache
+ localityWaitFactor =
+ conf.getFloat(LOCALITY_WAIT_FACTOR, DEFAULT_LOCALITY_WAIT_FACTOR);
if (numMapTasks > 0) {
nonRunningMapCache = createCache(splits, maxLevel);
}
@@ -1194,6 +1224,8 @@ public class JobInProgress {
) throws IOException {
if (status.getRunState() != JobStatus.RUNNING) {
LOG.info("Cannot create task split for " + profile.getJobID());
+ try { throw new IOException("state = " + status.getRunState()); }
+ catch (IOException ioe) {ioe.printStackTrace();}
return null;
}
@@ -1206,6 +1238,7 @@ public class JobInProgress {
Task result = maps[target].getTaskToRun(tts.getTrackerName());
if (result != null) {
addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
+ resetSchedulingOpportunities();
}
return result;
@@ -1255,6 +1288,8 @@ public class JobInProgress {
throws IOException {
if (!tasksInited.get()) {
LOG.info("Cannot create task split for " + profile.getJobID());
+ try { throw new IOException("state = " + status.getRunState()); }
+ catch (IOException ioe) {ioe.printStackTrace();}
return null;
}
@@ -1267,6 +1302,7 @@ public class JobInProgress {
Task result = maps[target].getTaskToRun(tts.getTrackerName());
if (result != null) {
addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
+ resetSchedulingOpportunities();
}
return result;
@@ -1278,6 +1314,8 @@ public class JobInProgress {
throws IOException {
if (!tasksInited.get()) {
LOG.info("Cannot create task split for " + profile.getJobID());
+ try { throw new IOException("state = " + status.getRunState()); }
+ catch (IOException ioe) {ioe.printStackTrace();}
return null;
}
@@ -1290,11 +1328,47 @@ public class JobInProgress {
Task result = maps[target].getTaskToRun(tts.getTrackerName());
if (result != null) {
addRunningTaskToTIP(maps[target], result.getTaskID(), tts, true);
+ // DO NOT reset for off-switch!
}
return result;
}
+ public void schedulingOpportunity() { // records one more scheduling opportunity for this job
+ ++numSchedulingOpportunities; // NOTE(review): ++ on a volatile long is not atomic — confirm callers serialize access
+ }
+
+ public void resetSchedulingOpportunities() { // clears the scheduling-opportunity counter
+ numSchedulingOpportunities = 0;
+ }
+
+ public long getNumSchedulingOpportunities() { // current value of the scheduling-opportunity counter
+ return numSchedulingOpportunities;
+ }
+
+ private static final long OVERRIDE = 1000000; // sentinel count written by overrideSchedulingOpportunities()
+ public void overrideSchedulingOpportunities() {
+ numSchedulingOpportunities = OVERRIDE; // NOTE(review): presumably large enough that scheduleOffSwitch() always passes — confirm
+ }
+
+ /**
+ * Check if we can schedule an off-switch task for this job.
+ *
+ * The job has 'waited' long enough once its missed scheduling opportunities
+ * exceed min(desiredMaps() - finishedMaps(), numTaskTrackers) * localityWaitFactor.
+ *
+ * @param numTaskTrackers caps the required-slot count used in the comparison
+ * @return <code>true</code> if we can schedule off-switch,
+ * <code>false</code> otherwise
+ */
+ public boolean scheduleOffSwitch(int numTaskTrackers) {
+ long missedTaskTrackers = getNumSchedulingOpportunities();
+ long requiredSlots =
+ Math.min((desiredMaps() - finishedMaps()), numTaskTrackers);
+
+ return (requiredSlots * localityWaitFactor) < missedTaskTrackers; // strict '<': equality still waits
+ }
+
/**
* Return a CleanupTask, if appropriate, to run on the given tasktracker
*