You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yh...@apache.org on 2009/05/05 09:34:23 UTC
svn commit: r771609 [2/2] - in /hadoop/core/branches/branch-0.20: ./ conf/
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
Modified: hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=771609&r1=771608&r2=771609&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Tue May 5 07:34:23 2009
@@ -493,15 +493,12 @@
static class FakeQueueInfo {
String queueName;
float gc;
- int reclaimTimeLimit;
boolean supportsPrio;
int ulMin;
- public FakeQueueInfo(String queueName, float gc,
- int reclaimTimeLimit, boolean supportsPrio, int ulMin) {
+ public FakeQueueInfo(String queueName, float gc, boolean supportsPrio, int ulMin) {
this.queueName = queueName;
this.gc = gc;
- this.reclaimTimeLimit = reclaimTimeLimit;
this.supportsPrio = supportsPrio;
this.ulMin = ulMin;
}
@@ -514,7 +511,6 @@
new LinkedHashMap<String, FakeQueueInfo>();
String firstQueue;
- private long reclaimCapacityInterval = 1000;
void setFakeQueues(List<FakeQueueInfo> queues) {
for (FakeQueueInfo q: queues) {
@@ -531,17 +527,13 @@
return firstQueue;
}*/
- public float getGuaranteedCapacity(String queue) {
+ public float getCapacity(String queue) {
if(queueMap.get(queue).gc == -1) {
- return super.getGuaranteedCapacity(queue);
+ return super.getCapacity(queue);
}
return queueMap.get(queue).gc;
}
- public int getReclaimTimeLimit(String queue) {
- return queueMap.get(queue).reclaimTimeLimit;
- }
-
public int getMinimumUserLimitPercent(String queue) {
return queueMap.get(queue).ulMin;
}
@@ -559,16 +551,6 @@
public int getMaxWorkerThreads() {
return 1;
}
-
- @Override
- public long getReclaimCapacityInterval() {
- return reclaimCapacityInterval ;
- }
-
- @Override
- public void setReclaimCapacityInterval(long value) {
- this.reclaimCapacityInterval = value;
- }
}
protected class FakeClock extends CapacityTaskScheduler.Clock {
@@ -677,7 +659,7 @@
// start the scheduler
taskTrackerManager.addQueues(new String[] {"default"});
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, 1, true, 1));
+ queues.add(new FakeQueueInfo("default", 100.0f, true, 1));
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
@@ -814,7 +796,7 @@
taskTrackerManager.addQueues(new String[] {"default"});
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 50.0f, 10, true, 25));
+ queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
@@ -869,8 +851,8 @@
String[] qs = {"default", "q2"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
- queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
+ queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
+ queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
@@ -893,7 +875,7 @@
String[] qs = { "default" };
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, 300, true, 100));
+ queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
@@ -916,29 +898,29 @@
subJobsList.get("u1").containsAll(jobs));
}
- //Basic test to test GC allocation across the queues which have no
- //GC configured.
+ //Basic test to test capacity allocation across the queues which have no
+ //capacity configured.
- public void testGCAllocationToQueues() throws Exception {
+ public void testCapacityAllocationToQueues() throws Exception {
String[] qs = {"default","q1","q2","q3","q4"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default",25.0f,5000,true,25));
- queues.add(new FakeQueueInfo("q1",-1.0f,5000,true,25));
- queues.add(new FakeQueueInfo("q2",-1.0f,5000,true,25));
- queues.add(new FakeQueueInfo("q3",-1.0f,5000,true,25));
- queues.add(new FakeQueueInfo("q4",-1.0f,5000,true,25));
+ queues.add(new FakeQueueInfo("default",25.0f,true,25));
+ queues.add(new FakeQueueInfo("q1",-1.0f,true,25));
+ queues.add(new FakeQueueInfo("q2",-1.0f,true,25));
+ queues.add(new FakeQueueInfo("q3",-1.0f,true,25));
+ queues.add(new FakeQueueInfo("q4",-1.0f,true,25));
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
- assertEquals(18.75f, resConf.getGuaranteedCapacity("q1"));
- assertEquals(18.75f, resConf.getGuaranteedCapacity("q2"));
- assertEquals(18.75f, resConf.getGuaranteedCapacity("q3"));
- assertEquals(18.75f, resConf.getGuaranteedCapacity("q4"));
+ assertEquals(18.75f, resConf.getCapacity("q1"));
+ assertEquals(18.75f, resConf.getCapacity("q2"));
+ assertEquals(18.75f, resConf.getCapacity("q3"));
+ assertEquals(18.75f, resConf.getCapacity("q4"));
}
- // Tests how GC is computed and assignment of tasks done
- // on the basis of the GC.
+ // Tests how capacity is computed and assignment of tasks done
+ // on the basis of the capacity.
public void testCapacityBasedAllocation() throws Exception {
// set up some queues
String[] qs = {"default", "q2"};
@@ -946,8 +928,8 @@
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
// set the gc % as 10%, so that gc will be zero initially as
// the cluster capacity increase slowly.
- queues.add(new FakeQueueInfo("default", 10.0f, 5000, true, 25));
- queues.add(new FakeQueueInfo("q2", 90.0f, 5000, true, 25));
+ queues.add(new FakeQueueInfo("default", 10.0f, true, 25));
+ queues.add(new FakeQueueInfo("q2", 90.0f, true, 25));
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
@@ -960,35 +942,35 @@
// job from q2 runs first because it has some non-zero capacity.
checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
- verifyGuaranteedCapacity("0", "default");
- verifyGuaranteedCapacity("3", "q2");
+ verifyCapacity("0", "default");
+ verifyCapacity("3", "q2");
// add another tt to increase tt slots
taskTrackerManager.addTaskTracker("tt3");
checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
- verifyGuaranteedCapacity("0", "default");
- verifyGuaranteedCapacity("5", "q2");
+ verifyCapacity("0", "default");
+ verifyCapacity("5", "q2");
// add another tt to increase tt slots
taskTrackerManager.addTaskTracker("tt4");
checkAssignment("tt3", "attempt_test_0002_m_000003_0 on tt3");
- verifyGuaranteedCapacity("0", "default");
- verifyGuaranteedCapacity("7", "q2");
+ verifyCapacity("0", "default");
+ verifyCapacity("7", "q2");
// add another tt to increase tt slots
taskTrackerManager.addTaskTracker("tt5");
// now job from default should run, as it is furthest away
// in terms of runningMaps / gc.
checkAssignment("tt4", "attempt_test_0001_m_000001_0 on tt4");
- verifyGuaranteedCapacity("1", "default");
- verifyGuaranteedCapacity("9", "q2");
+ verifyCapacity("1", "default");
+ verifyCapacity("9", "q2");
}
- private void verifyGuaranteedCapacity(String expectedCapacity,
+ private void verifyCapacity(String expectedCapacity,
String queue) throws IOException {
String schedInfo = taskTrackerManager.getQueueManager().
- getSchedulerInfo(queue).toString();
- assertTrue(schedInfo.contains("Map tasks\nGuaranteed Capacity: "
+ getSchedulerInfo(queue).toString();
+ assertTrue(schedInfo.contains("Map tasks\nCapacity: "
+ expectedCapacity));
}
@@ -998,15 +980,15 @@
String[] qs = {"default", "q2"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
- queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
+ queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
+ queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
// submit a job
submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
- // for queue 'q2', the GC for maps is 2. Since we're the only user,
+ // for queue 'q2', the capacity for maps is 2. Since we're the only user,
// we should get a task
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
// I should get another map task.
@@ -1024,15 +1006,15 @@
String[] qs = {"default", "q2"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
- queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
+ queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
+ queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
// submit a job
submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
- // for queue 'q2', the GC for maps is 2. Since we're the only user,
+ // for queue 'q2', the capacity for maps is 2. Since we're the only user,
// we should get a task
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
// Submit another job, from a different user
@@ -1052,15 +1034,15 @@
String[] qs = {"default", "q2"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
- queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
+ queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
+ queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
// submit a job
submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
- // for queue 'q2', the GC for maps is 2. Since we're the only user,
+ // for queue 'q2', the capacity for maps is 2. Since we're the only user,
// we should get a task
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
// since we're the only job, we get another map
@@ -1080,15 +1062,15 @@
String[] qs = {"default", "q2"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
- queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
+ queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
+ queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
// submit a job
FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
- // for queue 'q2', the GC for maps is 2. Since we're the only user,
+ // for queue 'q2', the capacity for maps is 2. Since we're the only user,
// we should get a task
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
// since we're the only job, we get another map
@@ -1119,7 +1101,7 @@
String[] qs = {"default"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, 10000, true, 25));
+ queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
@@ -1170,283 +1152,7 @@
// first in the queue
checkAssignment("tt4", "attempt_test_0001_m_000007_0 on tt4");
}
-
- // test code to reclaim capacity
- public void testReclaimCapacity() throws Exception {
- // set up some queues
- String[] qs = {"default", "q2", "q3"};
- taskTrackerManager.addQueues(qs);
- ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
- queues.add(new FakeQueueInfo("q2", 25.0f, 1000, true, 25));
- queues.add(new FakeQueueInfo("q3", 25.0f, 1000, true, 25));
- resConf.setFakeQueues(queues);
- resConf.setReclaimCapacityInterval(500);
- scheduler.setResourceManagerConf(resConf);
- scheduler.start();
-
- // set up a situation where q2 is under capacity, and default & q3
- // are at/over capacity
- FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
- FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q3", "u1");
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
- checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
- checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
- // now submit a job to q2
- FakeJobInProgress j3 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
- // get scheduler to notice that q2 needs to reclaim
- scheduler.reclaimCapacity();
- // our queue reclaim time is 1000s, heartbeat interval is 5 sec, so
- // we start reclaiming when 15 secs are left.
- clock.advance(400000);
- scheduler.reclaimCapacity();
- // no tasks should have been killed yet
- assertEquals(j1.runningMapTasks, 3);
- assertEquals(j2.runningMapTasks, 1);
- clock.advance(200000);
- scheduler.reclaimCapacity();
- // task from j1 will be killed
- assertEquals(j1.runningMapTasks, 2);
- assertEquals(j2.runningMapTasks, 1);
-
- }
-
- // test code to reclaim multiple capacity
- public void testReclaimCapacity2() throws Exception {
- // set up some queues
- String[] qs = {"default", "q2", "q3", "q4"};
- taskTrackerManager.addQueues(qs);
- ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
- queues.add(new FakeQueueInfo("q2", 20.0f, 1000, true, 25));
- queues.add(new FakeQueueInfo("q3", 20.0f, 1000, true, 25));
- queues.add(new FakeQueueInfo("q4", 10.0f, 1000, true, 25));
- resConf.setFakeQueues(queues);
- resConf.setReclaimCapacityInterval(500);
- scheduler.setResourceManagerConf(resConf);
- scheduler.start();
-
- // add some more TTs so our total map capacity is 10
- taskTrackerManager.addTaskTracker("tt3");
- taskTrackerManager.addTaskTracker("tt4");
- taskTrackerManager.addTaskTracker("tt5");
-
- // q2 has nothing running, default is under cap, q3 and q4 are over cap
- FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 2, 2, null, "u1");
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q3", "u1");
- checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
- FakeJobInProgress j3 = submitJobAndInit(JobStatus.PREP, 10, 10, "q4", "u1");
- checkAssignment("tt2", "attempt_test_0003_m_000001_0 on tt2");
- checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
- checkAssignment("tt3", "attempt_test_0002_m_000002_0 on tt3");
- checkAssignment("tt3", "attempt_test_0002_m_000003_0 on tt3");
- checkAssignment("tt4", "attempt_test_0003_m_000002_0 on tt4");
- checkAssignment("tt4", "attempt_test_0002_m_000004_0 on tt4");
- checkAssignment("tt5", "attempt_test_0002_m_000005_0 on tt5");
- checkAssignment("tt5", "attempt_test_0003_m_000003_0 on tt5");
- // at this point, q3 is running 5 tasks (with a cap of 2), q4 is
- // running 3 tasks (with a cap of 1).
- // If we submit a job to 'default', we need to get 3 slots back.
- FakeJobInProgress j4 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
- // get scheduler to notice that q2 needs to reclaim
- scheduler.reclaimCapacity();
- // our queue reclaim time is 1000s, heartbeat interval is 5 sec, so
- // we start reclaiming when 15 secs are left.
- clock.advance(400000);
- scheduler.reclaimCapacity();
- // nothing should have happened
- assertEquals(j2.runningMapTasks, 5);
- assertEquals(j3.runningMapTasks, 3);
- // 3 tasks to kill, 5 running over cap. q3 should give up 3*3/5 = 2 slots.
- // q4 should give up 2*3/5 = 1 slot.
- clock.advance(200000);
- scheduler.reclaimCapacity();
- assertEquals(j2.runningMapTasks, 3);
- assertEquals(j3.runningMapTasks, 2);
-
- }
-
- // test code to reclaim capacity when the cluster is completely occupied
- public void testReclaimCapacityWithFullCluster() throws Exception {
- // set up some queues
- String[] qs = {"default", "queue"};
- taskTrackerManager.addQueues(qs);
- int maxSlots = taskTrackerManager.maxMapTasksPerTracker
- * taskTrackerManager.taskTrackers().size();
- ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
- queues.add(new FakeQueueInfo("queue", 50.0f, 1000, true, 25));
- resConf.setFakeQueues(queues);
- resConf.setReclaimCapacityInterval(500);
- scheduler.setResourceManagerConf(resConf);
- scheduler.start();
-
- // now submit 1 job to queue "default" which should take up the cluster
- FakeJobInProgress j1 =
- submitJobAndInit(JobStatus.PREP, maxSlots, 0, "default", "u1");
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
- checkAssignment("tt1", "attempt_test_0001_m_000003_0 on tt1");
- checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
-
- // now submit a job to queue "queue"
- submitJobAndInit(JobStatus.PREP, maxSlots, 0, "queue", "u2");
-
- scheduler.reclaimCapacity();
-
- clock.advance(scheduler.schedConf.getReclaimTimeLimit("default") * 1000);
-
- scheduler.reclaimCapacity();
-
- // check if the tasks are killed
- assertEquals("Failed to reclaim tasks", j1.runningMapTasks, 2);
- }
- // test code to reclaim capacity in steps
- public void testReclaimCapacityInSteps() throws Exception {
- // set up some queues
- String[] qs = {"default", "q2"};
- taskTrackerManager.addQueues(qs);
- ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
- queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25));
- resConf.setFakeQueues(queues);
- resConf.setReclaimCapacityInterval(500);
- scheduler.setResourceManagerConf(resConf);
- scheduler.start();
-
- // set up a situation where q2 is under capacity, and default is
- // at/over capacity
- FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
- checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
- checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
- // now submit a job to q2
- FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 1, 1, "q2", "u1");
- // get scheduler to notice that q2 needs to reclaim
- scheduler.reclaimCapacity();
- // our queue reclaim time is 1000s, heartbeat interval is 5 sec, so
- // we start reclaiming when 15 secs are left.
- clock.advance(400000);
- // submit another job to q2 which causes more capacity to be reclaimed
- j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
- clock.advance(200000);
- scheduler.reclaimCapacity();
- // one task from j1 will be killed
- assertEquals(j1.runningMapTasks, 3);
- clock.advance(300000);
- scheduler.reclaimCapacity();
- // timer for 2nd job hasn't fired, so nothing killed
- assertEquals(j1.runningMapTasks, 3);
- clock.advance(400000);
- scheduler.reclaimCapacity();
- // one task from j1 will be killed
- assertEquals(j1.runningMapTasks, 2);
-
- }
-
- /*
- * Test case for checking the reclaim capacity with uninitalized jobs.
- *
- * Configure 2 queue with capacity scheduler.
- *
- * Submit a single job to the default queue and make it go above the gc
- * of the queue.
- *
- * Then submit another job to the second queue but don't initialize it.
- *
- * Run reclaim capacity thread for the scheduler, in order to let scheduler
- * know that it has to reclaim capacity.
- *
- * Advance the scheduler clock by appropriate milliseconds.
- *
- * Run scheduler.reclaimCapacity() to kill the appropriate tasks.
- *
- * Check running task count of the running job.
- *
- */
- public void testReclaimCapacityWithUninitializedJobs() throws IOException {
- String[] qs = {"default", "q2"};
- taskTrackerManager.addQueues(qs);
- ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
- queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25));
- resConf.setFakeQueues(queues);
- scheduler.setResourceManagerConf(resConf);
- scheduler.start();
-
- //Submit one job to the default queue and get the capacity over the
- //gc of the particular queue.
- FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
- checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
- checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
-
- //Submit another job to the second queue but not initialize it.
- submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
-
- //call scheduler's reclaim capacity in order to start reclaim capacity
- //process.
- scheduler.reclaimCapacity();
- //advance the clock to the position when the two task of the job would
- //be killed.
- clock.advance(600000);
- //run reclaim capacity
- scheduler.reclaimCapacity();
- //check the count of the running tasks.
- assertEquals(j1.runningMapTasks, 2);
-
- }
-
- // test code to reclaim capacity with one queue haveing zero GC
- // (HADOOP-4988).
- // Simple test: reclaim capacity should work even if one of the
- // queues has a gc of 0.
- public void testReclaimCapacityWithZeroGC() throws Exception {
- // set up some queues
- String[] qs = {"default", "q2", "q3"};
- taskTrackerManager.addQueues(qs);
- ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- // we want q3 to have 0 GC. Map slots = 4.
- queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
- queues.add(new FakeQueueInfo("q2", 40.0f, 1000, true, 25));
- queues.add(new FakeQueueInfo("q3", 10.0f, 1000, true, 25));
- // note: because of the way we convert gc% into actual gc, q2's gc
- // will be 1, not 2.
- resConf.setFakeQueues(queues);
- resConf.setReclaimCapacityInterval(500);
- scheduler.setResourceManagerConf(resConf);
- scheduler.start();
-
- // set up a situation where q2 is under capacity, and default
- // is over capacity
- FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
- //FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q3", "u1");
- checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
- checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
- checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
- // now submit a job to q2
- FakeJobInProgress j3 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
- // get scheduler to notice that q2 needs to reclaim
- scheduler.reclaimCapacity();
- // our queue reclaim time is 1000s, heartbeat interval is 5 sec, so
- // we start reclaiming when 15 secs are left.
- clock.advance(400000);
- scheduler.reclaimCapacity();
- // no tasks should have been killed yet
- assertEquals(j1.runningMapTasks, 4);
- clock.advance(200000);
- scheduler.reclaimCapacity();
- // task from j1 will be killed
- assertEquals(j1.runningMapTasks, 3);
-
- }
-
/*
* Following is the testing strategy for testing scheduling information.
* - start capacity scheduler with two queues.
@@ -1470,9 +1176,9 @@
* - Now fail a job which has not been initialized at all.
* - Run the poller, so that it can clean up the job queue.
* - Check the count, the waiting job count should be 2.
- * - Now raise status change events to move the initialized jobs which
+ * - Now raise status change events to move the initialized jobs which
* should be two in count to running queue.
- * - Then schedule a map of the job in running queue.
+ * - Then schedule a map of the job in running queue.
* - Run the poller because the poller is responsible for waiting
* jobs count. Check the count, it should be using 100% map and one
* waiting job
@@ -1480,15 +1186,15 @@
* - Check the count, it should be now one waiting job and zero running
* tasks
*/
-
+
public void testSchedulingInformation() throws Exception {
String[] qs = {"default", "q2"};
taskTrackerManager = new FakeTaskTrackerManager(2, 1, 1);
scheduler.setTaskTrackerManager(taskTrackerManager);
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
- queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25));
+ queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
+ queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
@@ -1501,167 +1207,164 @@
String schedulingInfo = queueManager.getJobQueueInfo("default").getSchedulingInfo();
String schedulingInfo2 = queueManager.getJobQueueInfo("q2").getSchedulingInfo();
String[] infoStrings = schedulingInfo.split("\n");
-
- assertEquals(infoStrings.length, 17);
- assertEquals(infoStrings[1] , "Guaranteed Capacity Percentage: 50.0%");
- assertEquals(infoStrings[7] , "Guaranteed Capacity: " + totalMaps * 50/100);
- assertEquals(infoStrings[11] , "Guaranteed Capacity: " + totalReduces * 50/100);
+ assertEquals(infoStrings.length, 16);
+ assertEquals(infoStrings[1] , "Capacity Percentage: 50.0%");
+ assertEquals(infoStrings[6] , "Capacity: " + totalMaps * 50/100);
+ assertEquals(infoStrings[10] , "Capacity: " + totalReduces * 50/100);
assertEquals(infoStrings[2] , "User Limit: 25%");
- assertEquals(infoStrings[3] , "Reclaim Time limit: " +
- StringUtils.formatTime(1000000));
- assertEquals(infoStrings[4] , "Priority Supported: YES");
- assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
- assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
- assertEquals(infoStrings[15] , "Number of Waiting Jobs: 0");
- assertEquals(infoStrings[16] , "Number of users who have submitted jobs: 0");
+ assertEquals(infoStrings[3] , "Priority Supported: YES");
+ assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
+ assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
+ assertEquals(infoStrings[14] , "Number of Waiting Jobs: 0");
+ assertEquals(infoStrings[15] , "Number of users who have submitted jobs: 0");
assertEquals(schedulingInfo, schedulingInfo2);
-
+
//Testing with actual job submission.
- ArrayList<FakeJobInProgress> userJobs =
+ ArrayList<FakeJobInProgress> userJobs =
submitJobs(1, 5, "default").get("u1");
- schedulingInfo =
+ schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
-
+
//waiting job should be equal to number of jobs submitted.
- assertEquals(infoStrings.length, 17);
- assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
- assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
- assertEquals(infoStrings[15] , "Number of Waiting Jobs: 5");
-
+ assertEquals(infoStrings.length, 16);
+ assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
+ assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
+ assertEquals(infoStrings[14] , "Number of Waiting Jobs: 5");
+
//Initalize the jobs but don't raise events
controlledInitializationPoller.selectJobsToInitialize();
-
- schedulingInfo =
+
+ schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
- assertEquals(infoStrings.length, 17);
+ assertEquals(infoStrings.length, 16);
//should be previous value as nothing is scheduled because no events
//has been raised after initialization.
- assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
- assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
- assertEquals(infoStrings[15] , "Number of Waiting Jobs: 5");
-
+ assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
+ assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
+ assertEquals(infoStrings[14] , "Number of Waiting Jobs: 5");
+
//Raise status change event so that jobs can move to running queue.
raiseStatusChangeEvents(scheduler.jobQueuesManager);
//assign one job
Task t1 = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
//Initalize extra job.
controlledInitializationPoller.selectJobsToInitialize();
-
+
//Get scheduling information, now the number of waiting job should have
//changed to 4 as one is scheduled and has become running.
// make sure we update our stats
scheduler.updateQSIInfoForTests();
- schedulingInfo =
+ schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
- assertEquals(infoStrings.length, 19);
- assertEquals(infoStrings[8], "Running tasks: 100.0% of Guaranteed Capacity");
- assertEquals(infoStrings[14],"Running tasks: 0.0% of Guaranteed Capacity");
- assertEquals(infoStrings[17] , "Number of Waiting Jobs: 4");
-
+ assertEquals(infoStrings.length, 18);
+ assertEquals(infoStrings[7], "Running tasks: 100.0% of Capacity");
+ assertEquals(infoStrings[13],"Running tasks: 0.0% of Capacity");
+ assertEquals(infoStrings[16] , "Number of Waiting Jobs: 4");
+
//assign a reduce task
Task t2 = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
// make sure we update our stats
scheduler.updateQSIInfoForTests();
- schedulingInfo =
+ schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
- assertEquals(infoStrings.length, 21);
- assertEquals(infoStrings[8], "Running tasks: 100.0% of Guaranteed Capacity");
- assertEquals(infoStrings[14],"Running tasks: 100.0% of Guaranteed Capacity");
- assertEquals(infoStrings[19] , "Number of Waiting Jobs: 4");
-
+ assertEquals(infoStrings.length, 20);
+ assertEquals(infoStrings[7], "Running tasks: 100.0% of Capacity");
+ assertEquals(infoStrings[13],"Running tasks: 100.0% of Capacity");
+ assertEquals(infoStrings[18] , "Number of Waiting Jobs: 4");
+
//Complete the job and check the running tasks count
FakeJobInProgress u1j1 = userJobs.get(0);
taskTrackerManager.finishTask("tt1", t1.getTaskID().toString(), u1j1);
taskTrackerManager.finishTask("tt1", t2.getTaskID().toString(), u1j1);
taskTrackerManager.finalizeJob(u1j1);
-
+
// make sure we update our stats
scheduler.updateQSIInfoForTests();
- schedulingInfo =
+ schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
- assertEquals(infoStrings.length, 17);
- assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
- assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
- assertEquals(infoStrings[15] , "Number of Waiting Jobs: 4");
-
+ assertEquals(infoStrings.length, 16);
+ assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
+ assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
+ assertEquals(infoStrings[14] , "Number of Waiting Jobs: 4");
+
//Fail a job which is initialized but not scheduled and check the count.
FakeJobInProgress u1j2 = userJobs.get(1);
- assertTrue("User1 job 2 not initalized ",
+ assertTrue("User1 job 2 not initalized ",
u1j2.getStatus().getRunState() == JobStatus.RUNNING);
taskTrackerManager.finalizeJob(u1j2, JobStatus.FAILED);
//Run initializer to clean up failed jobs
controlledInitializationPoller.selectJobsToInitialize();
// make sure we update our stats
scheduler.updateQSIInfoForTests();
- schedulingInfo =
+ schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
- assertEquals(infoStrings.length, 17);
- assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
- assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
- assertEquals(infoStrings[15] , "Number of Waiting Jobs: 3");
-
+ assertEquals(infoStrings.length, 16);
+ assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
+ assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
+ assertEquals(infoStrings[14] , "Number of Waiting Jobs: 3");
+
//Fail a job which is not initialized but is in the waiting queue.
FakeJobInProgress u1j5 = userJobs.get(4);
- assertFalse("User1 job 5 initalized ",
+ assertFalse("User1 job 5 initalized ",
u1j5.getStatus().getRunState() == JobStatus.RUNNING);
-
+
taskTrackerManager.finalizeJob(u1j5, JobStatus.FAILED);
//run initializer to clean up failed job
controlledInitializationPoller.selectJobsToInitialize();
// make sure we update our stats
scheduler.updateQSIInfoForTests();
- schedulingInfo =
+ schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
- assertEquals(infoStrings.length, 17);
- assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
- assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
- assertEquals(infoStrings[15] , "Number of Waiting Jobs: 2");
-
+ assertEquals(infoStrings.length, 16);
+ assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
+ assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
+ assertEquals(infoStrings[14] , "Number of Waiting Jobs: 2");
+
//Raise status change events as none of the intialized jobs would be
//in running queue as we just failed the second job which was initialized
//and completed the first one.
raiseStatusChangeEvents(scheduler.jobQueuesManager);
-
+
//Now schedule a map should be job3 of the user as job1 succeeded job2
//failed and now job3 is running
t1 = checkAssignment("tt1", "attempt_test_0003_m_000001_0 on tt1");
FakeJobInProgress u1j3 = userJobs.get(2);
- assertTrue("User Job 3 not running ",
+ assertTrue("User Job 3 not running ",
u1j3.getStatus().getRunState() == JobStatus.RUNNING);
-
+
//now the running count of map should be one and waiting jobs should be
//one. run the poller as it is responsible for waiting count
controlledInitializationPoller.selectJobsToInitialize();
// make sure we update our stats
scheduler.updateQSIInfoForTests();
- schedulingInfo =
+ schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
- assertEquals(infoStrings.length, 19);
- assertEquals(infoStrings[8], "Running tasks: 100.0% of Guaranteed Capacity");
- assertEquals(infoStrings[14],"Running tasks: 0.0% of Guaranteed Capacity");
- assertEquals(infoStrings[17] , "Number of Waiting Jobs: 1");
-
+ assertEquals(infoStrings.length, 18);
+ assertEquals(infoStrings[7], "Running tasks: 100.0% of Capacity");
+ assertEquals(infoStrings[13],"Running tasks: 0.0% of Capacity");
+ assertEquals(infoStrings[16] , "Number of Waiting Jobs: 1");
+
//Fail the executing job
taskTrackerManager.finalizeJob(u1j3, JobStatus.FAILED);
// make sure we update our stats
scheduler.updateQSIInfoForTests();
//Now running counts should become zero
- schedulingInfo =
+ schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
- assertEquals(infoStrings.length, 17);
- assertEquals(infoStrings[8], "Running tasks: 0.0% of Guaranteed Capacity");
- assertEquals(infoStrings[12],"Running tasks: 0.0% of Guaranteed Capacity");
- assertEquals(infoStrings[15] , "Number of Waiting Jobs: 1");
-
+ assertEquals(infoStrings.length, 16);
+ assertEquals(infoStrings[7], "Running tasks: 0.0% of Capacity");
+ assertEquals(infoStrings[11],"Running tasks: 0.0% of Capacity");
+ assertEquals(infoStrings[14] , "Number of Waiting Jobs: 1");
+
}
/**
@@ -1683,7 +1386,7 @@
taskTrackerManager.addQueues(new String[] { "default" });
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
+ queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
scheduler.setTaskTrackerManager(taskTrackerManager);
@@ -1729,7 +1432,7 @@
taskTrackerManager.addQueues(new String[] { "default" });
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
+ queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
scheduler.setTaskTrackerManager(taskTrackerManager);
@@ -1778,7 +1481,7 @@
taskTrackerManager.addQueues(new String[] { "default" });
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
+ queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
resConf.setFakeQueues(queues);
scheduler.setTaskTrackerManager(taskTrackerManager);
// enabled memory-based scheduling
@@ -1863,7 +1566,7 @@
taskTrackerManager.addQueues(new String[] { "default" });
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
+ queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
resConf.setFakeQueues(queues);
scheduler.setTaskTrackerManager(taskTrackerManager);
// enabled memory-based scheduling
@@ -1926,7 +1629,7 @@
ttStatus.setReservedPhysicalMemory(0);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
+ queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
taskTrackerManager.addQueues(new String[] { "default" });
resConf.setFakeQueues(queues);
scheduler.setTaskTrackerManager(taskTrackerManager);
@@ -1993,7 +1696,7 @@
ttStatus.setReservedPhysicalMemory(0);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
+ queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
taskTrackerManager.addQueues(new String[] { "default" });
resConf.setFakeQueues(queues);
scheduler.setTaskTrackerManager(taskTrackerManager);
@@ -2092,7 +1795,7 @@
scheduler.setTaskTrackerManager(taskTrackerManager);
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 100));
+ queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
@@ -2217,7 +1920,7 @@
String[] qs = { "default"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 100));
+ queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
@@ -2264,7 +1967,7 @@
String[] qs = { "default"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 100));
+ queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
Modified: hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java?rev=771609&r1=771608&r2=771609&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java (original)
+++ hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java Tue May 5 07:34:23 2009
@@ -49,13 +49,11 @@
public TestCapacitySchedulerConf() {
defaultProperties = setupQueueProperties(
- new String[] { "guaranteed-capacity",
- "reclaim-time-limit",
+ new String[] { "capacity",
"supports-priority",
"minimum-user-limit-percent",
"maximum-initialized-jobs-per-user"},
new String[] { "100",
- "300",
"false",
"100",
"2" }
@@ -85,26 +83,22 @@
public void testQueues() {
Map<String, String> q1Props = setupQueueProperties(
- new String[] { "guaranteed-capacity",
- "reclaim-time-limit",
+ new String[] { "capacity",
"supports-priority",
"minimum-user-limit-percent",
"maximum-initialized-jobs-per-user"},
new String[] { "10",
- "600",
"true",
"25",
"4"}
);
Map<String, String> q2Props = setupQueueProperties(
- new String[] { "guaranteed-capacity",
- "reclaim-time-limit",
+ new String[] { "capacity",
"supports-priority",
"minimum-user-limit-percent",
"maximum-initialized-jobs-per-user"},
new String[] { "100",
- "6000",
"false",
"50",
"1"}
@@ -126,7 +120,7 @@
public void testQueueWithDefaultProperties() {
Map<String, String> q1Props = setupQueueProperties(
- new String[] { "guaranteed-capacity",
+ new String[] { "capacity",
"minimum-user-limit-percent" },
new String[] { "20",
"75" }
@@ -143,7 +137,6 @@
for (String key : q1Props.keySet()) {
expProperties.put(key, q1Props.get(key));
}
- expProperties.put("reclaim-time-limit", "300");
expProperties.put("supports-priority", "false");
expProperties.put("maximum-initialized-jobs-per-user", "2");
queueDetails.put("default", expProperties);
@@ -156,23 +149,19 @@
// write new values to the file...
Map<String, String> q1Props = setupQueueProperties(
- new String[] { "guaranteed-capacity",
- "reclaim-time-limit",
+ new String[] { "capacity",
"supports-priority",
"minimum-user-limit-percent" },
new String[] { "20.5",
- "600",
"true",
"40" }
);
Map<String, String> q2Props = setupQueueProperties(
- new String[] { "guaranteed-capacity",
- "reclaim-time-limit",
+ new String[] { "capacity",
"supports-priority",
"minimum-user-limit-percent" },
new String[] { "100",
- "3000",
"false",
"50" }
);
@@ -198,23 +187,19 @@
endConfig();
Map<String, String> q1Props = setupQueueProperties(
- new String[] { "guaranteed-capacity",
- "reclaim-time-limit",
+ new String[] { "capacity",
"supports-priority",
"minimum-user-limit-percent" },
new String[] { "-1",
- "800",
"true",
"50" }
);
Map<String, String> q2Props = setupQueueProperties(
- new String[] { "guaranteed-capacity",
- "reclaim-time-limit",
+ new String[] { "capacity",
"supports-priority",
"minimum-user-limit-percent" },
new String[] { "-1",
- "800",
"true",
"50" }
);
@@ -235,18 +220,16 @@
startConfig();
writeUserDefinedDefaultConfiguration();
Map<String, String> q1Props = setupQueueProperties(
- new String[] { "guaranteed-capacity",
- "reclaim-time-limit",
+ new String[] { "capacity",
"supports-priority",
"minimum-user-limit-percent" },
new String[] { "-1",
- "800",
"true",
"50" }
);
Map<String, String> q2Props = setupQueueProperties(
- new String[] { "guaranteed-capacity",
+ new String[] { "capacity",
"supports-priority",
"minimum-user-limit-percent" },
new String[] { "40",
@@ -254,12 +237,10 @@
"50" }
);
Map<String, String> q3Props = setupQueueProperties(
- new String[] { "guaranteed-capacity",
- "reclaim-time-limit",
+ new String[] { "capacity",
"supports-priority",
"minimum-user-limit-percent" },
new String[] { "40",
- "500",
"true",
"50" }
);
@@ -269,7 +250,6 @@
testConf = new CapacitySchedulerConf(new Path(testConfFile));
Map<String, Map<String, String>> queueDetails
= new HashMap<String, Map<String,String>>();
- q2Props.put("reclaim-time-limit", "800");
queueDetails.put("default", q1Props);
queueDetails.put("production", q2Props);
queueDetails.put("test", q3Props);
@@ -280,12 +260,10 @@
openFile();
startConfig();
Map<String, String> q1Props = setupQueueProperties(
- new String[] { "guaranteed-capacity",
- "reclaim-time-limit",
+ new String[] { "capacity",
"supports-priority",
"minimum-user-limit-percent" },
- new String[] { "-1",
- "800",
+ new String[] { "-1",
"true",
"-50" }
);
@@ -299,29 +277,6 @@
assertTrue(true);
}
}
- public void testInvalidReclaimTimeLimit() throws IOException {
- openFile();
- startConfig();
- Map<String, String> q1Props = setupQueueProperties(
- new String[] { "guaranteed-capacity",
- "reclaim-time-limit",
- "supports-priority",
- "minimum-user-limit-percent" },
- new String[] { "-1",
- "-800",
- "true",
- "50" }
- );
- writeQueueDetails("default", q1Props);
- endConfig();
- try {
- testConf = new CapacitySchedulerConf(new Path(testConfFile));
- testConf.getReclaimTimeLimit("default");
- fail("Expect Invalid reclaim time limit to raise Exception");
- }catch(IllegalArgumentException e) {
- assertTrue(true);
- }
- }
public void testInitializationPollerProperties()
throws Exception {
@@ -372,42 +327,16 @@
} catch (IllegalArgumentException e) {}
}
- public void testInvalidReclaimCapacityInterval() throws IOException {
- openFile();
- startConfig();
- Map<String, String> q1Props = setupQueueProperties(
- new String[] { "guaranteed-capacity",
- "reclaim-time-limit",
- "supports-priority",
- "minimum-user-limit-percent" },
- new String[] { "-1",
- "-800",
- "true",
- "50" }
- );
- writeQueueDetails("default", q1Props);
- writeProperty("mapred.capacity-scheduler.reclaimCapacity.interval", "0");
- endConfig();
- try {
- testConf = new CapacitySchedulerConf(new Path(testConfFile));
- testConf.getReclaimCapacityInterval();
- fail("Expect Invalid reclaim capacity interval raise Exception");
- }catch(IllegalArgumentException e) {
- assertTrue(true);
- }
- }
-
+
private void checkQueueProperties(
CapacitySchedulerConf testConf,
Map<String, Map<String, String>> queueDetails) {
for (String queueName : queueDetails.keySet()) {
Map<String, String> map = queueDetails.get(queueName);
- assertEquals(Float.parseFloat(map.get("guaranteed-capacity")),
- testConf.getGuaranteedCapacity(queueName));
+ assertEquals(Float.parseFloat(map.get("capacity")),
+ testConf.getCapacity(queueName));
assertEquals(Integer.parseInt(map.get("minimum-user-limit-percent")),
testConf.getMinimumUserLimitPercent(queueName));
- assertEquals(Integer.parseInt(map.get("reclaim-time-limit")),
- testConf.getReclaimTimeLimit(queueName));
assertEquals(Boolean.parseBoolean(map.get("supports-priority")),
testConf.isPrioritySupported(queueName));
}
@@ -451,25 +380,21 @@
private void writeDefaultConfiguration() {
- writeProperty("mapred.capacity-scheduler.default-reclaim-time-limit"
- , "300");
writeProperty("mapred.capacity-scheduler.default-supports-priority"
, "false");
writeProperty("mapred.capacity-scheduler.default-minimum-user-limit-percent"
- , "100");
+ , "100");
}
-
-
+
+
private void writeUserDefinedDefaultConfiguration() {
- writeProperty("mapred.capacity-scheduler.default-reclaim-time-limit"
- , "800");
writeProperty("mapred.capacity-scheduler.default-supports-priority"
, "true");
writeProperty("mapred.capacity-scheduler.default-minimum-user-limit-percent"
- , "50");
+ , "50");
}
-
-
+
+
private void writeProperty(String name, String value) {
writer.println("<property>");
writer.println("<name> " + name + "</name>");