You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 05:30:34 UTC
svn commit: r1077573 [3/3] - in
/hadoop/common/branches/branch-0.20-security-patches/src:
contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/
contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
mapred/org/apache/hadoop/mapred/
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1077573&r1=1077572&r2=1077573&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri Mar 4 04:30:34 2011
@@ -170,6 +170,9 @@ public class TestCapacityScheduler exten
FakeTaskTrackerManager taskTrackerManager, String user,
JobTracker jt) {
super(jId, jobConf, jt);
+ if (user == null) {
+ user = "drwho";
+ }
this.taskTrackerManager = taskTrackerManager;
this.startTime = System.currentTimeMillis();
this.status.setJobPriority(JobPriority.NORMAL);
@@ -198,6 +201,12 @@ public class TestCapacityScheduler exten
}
@Override
+ public Task obtainNewNonLocalMapTask(final TaskTrackerStatus tts,
+ int clusterSize, int ignored) throws IOException {
+ return obtainNewMapTask(tts, clusterSize, ignored);
+ }
+
+ @Override
public Task obtainNewMapTask(final TaskTrackerStatus tts, int clusterSize,
int ignored) throws IOException {
boolean areAllMapsRunning = (mapTaskCtr == numMapTasks);
@@ -436,7 +445,7 @@ public class TestCapacityScheduler exten
int reduces = 0;
int maxMapTasksPerTracker = 2;
int maxReduceTasksPerTracker = 1;
- List<JobInProgressListener> listeners =
+ List<JobInProgressListener> mylisteners =
new ArrayList<JobInProgressListener>();
FakeQueueManager qm = new FakeQueueManager();
@@ -511,7 +520,7 @@ public class TestCapacityScheduler exten
JobStatus newStatus = (JobStatus)jip.getStatus().clone();
JobStatusChangeEvent event = new JobStatusChangeEvent(jip,
EventType.RUN_STATE_CHANGED, oldStatus, newStatus);
- for (JobInProgressListener listener : listeners) {
+ for (JobInProgressListener listener : mylisteners) {
listener.jobUpdated(event);
}
} catch (Exception ioe) {
@@ -542,16 +551,16 @@ public class TestCapacityScheduler exten
public void addJobInProgressListener(JobInProgressListener listener) {
- listeners.add(listener);
+ mylisteners.add(listener);
}
public void removeJobInProgressListener(JobInProgressListener listener) {
- listeners.remove(listener);
+ mylisteners.remove(listener);
}
public void submitJob(JobInProgress job) throws IOException {
jobs.put(job.getJobID(), job);
- for (JobInProgressListener listener : listeners) {
+ for (JobInProgressListener listener : mylisteners) {
listener.jobAdded(job);
}
}
@@ -612,7 +621,7 @@ public class TestCapacityScheduler exten
JobStatusChangeEvent event =
new JobStatusChangeEvent (fjob, EventType.RUN_STATE_CHANGED, oldStatus,
newStatus);
- for (JobInProgressListener listener : listeners) {
+ for (JobInProgressListener listener : mylisteners) {
listener.jobUpdated(event);
}
}
@@ -625,7 +634,7 @@ public class TestCapacityScheduler exten
JobStatusChangeEvent event =
new JobStatusChangeEvent (fjob, EventType.PRIORITY_CHANGED, oldStatus,
newStatus);
- for (JobInProgressListener listener : listeners) {
+ for (JobInProgressListener listener : mylisteners) {
listener.jobUpdated(event);
}
}
@@ -642,7 +651,7 @@ public class TestCapacityScheduler exten
JobStatusChangeEvent event =
new JobStatusChangeEvent (fjob, EventType.START_TIME_CHANGED, oldStatus,
newStatus);
- for (JobInProgressListener listener : listeners) {
+ for (JobInProgressListener listener : mylisteners) {
listener.jobUpdated(event);
}
}
@@ -849,9 +858,9 @@ public class TestCapacityScheduler exten
taskTrackerManager.initJob(fjob1);
// check if the jobs are missing from the waiting queue
- // The jobs are not removed from waiting queue until they are scheduled
+ // The jobs are not removed from waiting queue until they are scheduled
assertEquals("Waiting queue is garbled on job init", 2,
- scheduler.jobQueuesManager.getWaitingJobs("default")
+ scheduler.jobQueuesManager.getQueue("default").getWaitingJobs()
.size());
// test if changing the job priority/start-time works as expected in the
@@ -868,8 +877,9 @@ public class TestCapacityScheduler exten
// mark the job as complete
taskTrackerManager.finalizeJob(fjob1);
- Collection<JobInProgress> rqueue =
- scheduler.jobQueuesManager.getRunningJobQueue("default");
+ CapacitySchedulerQueue queue =
+ scheduler.jobQueuesManager.getQueue("default");
+ Collection<JobInProgress> rqueue = queue.getRunningJobs();
// check if the job is removed from the scheduler
assertFalse("Scheduler contains completed job",
@@ -886,13 +896,17 @@ public class TestCapacityScheduler exten
* @throws IOException
*/
public void testMaxCapacities() throws IOException {
+ System.err.println("testMaxCapacities");
this.setUp(4,1,1);
- taskTrackerManager.addQueues(new String[] {"default"});
+ taskTrackerManager.addQueues(new String[] {"default", "q2"});
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 25.0f, false, 1));
-
+ queues.add(new FakeQueueInfo("q2", 75.0f, false, 1));
+
resConf.setFakeQueues(queues);
resConf.setMaxCapacity("default", 50.0f);
+ resConf.setUserLimitFactor("default", 2);
+
scheduler.setResourceManagerConf(resConf);
scheduler.start();
@@ -904,22 +918,22 @@ public class TestCapacityScheduler exten
//first call of assign task should give task from default queue.
//default uses 1 map and 1 reduce slots are used
- checkMultipleAssignment(
- "tt1", "attempt_test_0001_m_000001_0 on tt1",
- "attempt_test_0001_r_000001_0 on tt1");
+ checkAssignments("tt1",
+ new String[] {
+ "attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1"});
//second call of assign task
//default uses 2 map and 2 reduce slots
- checkMultipleAssignment(
- "tt2", "attempt_test_0001_m_000002_0 on tt2",
- "attempt_test_0001_r_000002_0 on tt2");
+ checkAssignments("tt2",
+ new String[] {
+ "attempt_test_0001_m_000002_0 on tt2",
+ "attempt_test_0001_r_000002_0 on tt2"});
//Now we have reached the max capacity limit for default ,
//no further tasks would be assigned to this queue.
- checkMultipleAssignment(
- "tt3", null,
- null);
+ checkAssignments("tt3", new String[] {});
}
// test if the queue reflects the changes
@@ -976,18 +990,19 @@ public class TestCapacityScheduler exten
private JobInProgress[] getJobsInQueue(boolean waiting) {
Collection<JobInProgress> queue =
waiting
- ? scheduler.jobQueuesManager.getWaitingJobs("default")
- : scheduler.jobQueuesManager.getRunningJobQueue("default");
+ ? scheduler.jobQueuesManager.getQueue("default").getWaitingJobs()
+ : scheduler.jobQueuesManager.getQueue("default").getRunningJobs();
return queue.toArray(new JobInProgress[0]);
}
// tests if tasks can be assinged when there are multiple jobs from a same
// user
public void testJobFinished() throws Exception {
- taskTrackerManager.addQueues(new String[] {"default"});
+ taskTrackerManager.addQueues(new String[] {"default", "q2"});
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
+ queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
@@ -1083,6 +1098,8 @@ public class TestCapacityScheduler exten
queues.add(new FakeQueueInfo("q1", 50.0f, true, 25));
queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
resConf.setFakeQueues(queues);
+ resConf.setUserLimitFactor("q1", 4);
+ resConf.setUserLimitFactor("q2", 4);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
@@ -1159,8 +1176,8 @@ public class TestCapacityScheduler exten
submitJobs(1, 4, "default");
JobQueuesManager mgr = scheduler.jobQueuesManager;
-
- while(mgr.getWaitingJobs("default").size() < 4){
+ CapacitySchedulerQueue queue = mgr.getQueue("default");
+ while(queue.getWaitingJobs().size() < 4){
Thread.sleep(1);
}
//Raise status change events for jobs submitted.
@@ -1174,46 +1191,24 @@ public class TestCapacityScheduler exten
subJobsList.get("u1").containsAll(jobs));
}
- //Basic test to test capacity allocation across the queues which have no
- //capacity configured.
-
- public void testCapacityAllocationToQueues() throws Exception {
- String[] qs = {"default","q1","q2","q3","q4"};
- taskTrackerManager.addQueues(qs);
- ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default",25.0f,true,25));
- queues.add(new FakeQueueInfo("q1",-1.0f,true,25));
- queues.add(new FakeQueueInfo("q2",-1.0f,true,25));
- queues.add(new FakeQueueInfo("q3",-1.0f,true,25));
- queues.add(new FakeQueueInfo("q4",-1.0f,true,25));
- resConf.setFakeQueues(queues);
- scheduler.setResourceManagerConf(resConf);
- scheduler.start();
- assertEquals(18.75f, resConf.getCapacity("q1"));
- assertEquals(18.75f, resConf.getCapacity("q2"));
- assertEquals(18.75f, resConf.getCapacity("q3"));
- assertEquals(18.75f, resConf.getCapacity("q4"));
- }
-
public void testCapacityAllocFailureWithLowerMaxCapacity()
throws Exception {
String[] qs = {"default", "q1"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 50.0f, true, 50));
- queues.add(new FakeQueueInfo("q1", -1.0f, true, 50));
+ queues.add(new FakeQueueInfo("q1", 50.0f, true, 50));
resConf.setFakeQueues(queues);
resConf.setMaxCapacity("q1", 40.0f);
scheduler.setResourceManagerConf(resConf);
+ boolean failed = false;
try {
scheduler.start();
fail("Scheduler start should fail ");
- } catch (IllegalStateException ise) {
- assertEquals(
- ise.getMessage(),
- " Allocated capacity of " + 50.0f + " to unconfigured queue " +
- "q1" + " is greater than maximum Capacity " + 40.0f);
+ } catch (IllegalArgumentException iae) {
+ failed = true;
}
+ assertTrue("Scheduler start didn't fail!", failed);
}
// Tests how capacity is computed and assignment of tasks done
@@ -1287,6 +1282,8 @@ public class TestCapacityScheduler exten
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
+ resConf.setUserLimitFactor("default", 4);
+ resConf.setUserLimitFactor("q2", 4);
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
@@ -1323,10 +1320,12 @@ public class TestCapacityScheduler exten
taskTrackerManager =
new FakeTaskTrackerManager(2, NUM_MAP_SLOTS, NUM_REDUCE_SLOTS);
- taskTrackerManager.addQueues(new String[] { "defaultXYZ" });
+ taskTrackerManager.addQueues(new String[] { "defaultXYZ", "q2" });
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("defaultXYZ", 25.0f, true, 50));
+ queues.add(new FakeQueueInfo("q2", 75.0f, true, 50));
resConf.setFakeQueues(queues);
+ resConf.setUserLimitFactor("defaultXYZ", 2);
//defaultXYZ can go up to 2 map and 2 reduce slots
resConf.setMaxCapacity("defaultXYZ", 50.0f);
@@ -1418,10 +1417,11 @@ public class TestCapacityScheduler exten
public void testUserLimitsWithMaxCapacities() throws Exception {
setUp(2, 2, 2);
// set up some queues
- String[] qs = {"default"};
+ String[] qs = {"default", "q2"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 50.0f, true, 50));
+ queues.add(new FakeQueueInfo("q2", 50.0f, true, 50));
resConf.setFakeQueues(queues);
resConf.setMaxCapacity("default", 75.0f);
scheduler.setResourceManagerConf(resConf);
@@ -1520,6 +1520,8 @@ public class TestCapacityScheduler exten
// test user limits when a 2nd job is submitted much after first job
// and we need to wait for first job's task to complete
public void testUserLimits3() throws Exception {
+ System.err.println("testUserLimits3");
+
// set up some queues
String[] qs = {"default", "q2"};
taskTrackerManager.addQueues(qs);
@@ -1535,39 +1537,37 @@ public class TestCapacityScheduler exten
// for queue 'q2', the capacity for maps is 2. Since we're the only user,
// we should get a task
checkAssignments("tt1",
- new String[] {"attempt_test_0001_m_000001_0 on tt1",
+ new String[] {
+ "attempt_test_0001_m_000001_0 on tt1",
"attempt_test_0001_m_000002_0 on tt1",
"attempt_test_0001_r_000001_0 on tt1"});
- // we get two more maps from 'default queue'
- checkAssignments("tt2",
- new String[] {"attempt_test_0001_m_000003_0 on tt2",
- "attempt_test_0001_m_000004_0 on tt2",
- "attempt_test_0001_r_000002_0 on tt2"});
+ // No tasks assigned since u1 has hit user limits of 50% i.e. q2 capacity
+ checkAssignments("tt2", new String[] {});
// Submit another job, from a different user
FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
- // one of the task finishes
- taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
-
// Now if I ask for a map task, it should come from the second job
// and reduce from job2
- checkAssignments("tt1",
- new String[] {"attempt_test_0002_m_000001_0 on tt1"});
-
- // another task from job1 finishes, another new map and reduce task to job2
- taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0", j1);
+ checkAssignments("tt2",
+ new String[] {
+ "attempt_test_0002_m_000001_0 on tt2",
+ "attempt_test_0002_m_000002_0 on tt2",
+ "attempt_test_0002_r_000001_0 on tt2",
+ });
- // job2 shud get the map slot & reduce
- checkAssignments("tt1", new String[] {"attempt_test_0002_m_000002_0 on tt1"});
+ // A task from job1 finishes
+ // job1 should get the map slot since u1 has only 1 task running
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", j1);
+ checkAssignments("tt1", new String[] {"attempt_test_0001_m_000003_0 on tt1"});
// now we have equal number of tasks from each job. Whichever job's
// task finishes, that job gets a new task
- taskTrackerManager.finishTask("tt2", "attempt_test_0001_m_000003_0", j1);
- checkAssignments("tt2", new String[] {"attempt_test_0001_m_000005_0 on tt2"});
- taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", j2);
- checkAssignment("tt1", "attempt_test_0002_m_000003_0 on tt1");
+ taskTrackerManager.finishTask("tt2", "attempt_test_0002_m_000001_0", j2);
+ checkAssignments("tt2", new String[] {"attempt_test_0002_m_000003_0 on tt2"});
+ taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000002_0", j1);
+ checkAssignment("tt1", "attempt_test_0001_m_000004_0 on tt1");
}
// test user limits with many users, more slots
@@ -1764,6 +1764,8 @@ public class TestCapacityScheduler exten
queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
resConf.setFakeQueues(queues);
+ resConf.setMaxInitializedActiveTasksPerUser("default", 4); // 4 tasks max
+ resConf.setMaxInitializedActiveTasksPerUser("q2", 4); // 4 tasks max
scheduler.setResourceManagerConf(resConf);
scheduler.start();
@@ -1847,11 +1849,10 @@ public class TestCapacityScheduler exten
//Get scheduling information, now the number of waiting job should have
//changed to 4 as one is scheduled and has become running.
// make sure we update our stats
- scheduler.updateQSIInfoForTests();
+ scheduler.updateQueueUsageForTests();
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
- System.err.println(schedulingInfo);
assertEquals(schedulingInfo, 22, infoStrings.length);
assertEquals(infoStrings[7], infoStrings[7], "Used capacity: 1 (100.0% of Capacity)");
assertEquals(infoStrings[8], infoStrings[8], "Running tasks: 1");
@@ -1868,7 +1869,7 @@ public class TestCapacityScheduler exten
taskTrackerManager.finalizeJob(u1j1);
// make sure we update our stats
- scheduler.updateQSIInfoForTests();
+ scheduler.updateQueueUsageForTests();
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
@@ -1887,7 +1888,7 @@ public class TestCapacityScheduler exten
//Run initializer to clean up failed jobs
controlledInitializationPoller.selectJobsToInitialize();
// make sure we update our stats
- scheduler.updateQSIInfoForTests();
+ scheduler.updateQueueUsageForTests();
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
@@ -1909,7 +1910,7 @@ public class TestCapacityScheduler exten
//run initializer to clean up failed job
controlledInitializationPoller.selectJobsToInitialize();
// make sure we update our stats
- scheduler.updateQSIInfoForTests();
+ scheduler.updateQueueUsageForTests();
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
@@ -1942,7 +1943,7 @@ public class TestCapacityScheduler exten
//one. run the poller as it is responsible for waiting count
controlledInitializationPoller.selectJobsToInitialize();
// make sure we update our stats
- scheduler.updateQSIInfoForTests();
+ scheduler.updateQueueUsageForTests();
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
infoStrings = schedulingInfo.split("\n");
@@ -1958,7 +1959,7 @@ public class TestCapacityScheduler exten
//Fail the executing job
taskTrackerManager.finalizeJob(u1j3, JobStatus.FAILED);
// make sure we update our stats
- scheduler.updateQSIInfoForTests();
+ scheduler.updateQueueUsageForTests();
//Now running counts should become zero
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
@@ -2213,12 +2214,15 @@ public class TestCapacityScheduler exten
CapacityTaskScheduler.getJobQueueSchedInfo(0, 0, 0, 0, 0, 0),
(String) job3.getSchedulingInfo());
- // Reservations are already done for job2. So job3 should go ahead.
+ // Reservations are already done for job2.
+ // So job3 should go ahead.
+ // However, it has hit the user limit of 6 for reduces (incl. the reserved
+ // slot), so we should only get a map.
taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", job1);
+ scheduler.updateQueueUsageForTests();
checkAssignments("tt1",
new String[] {
- "attempt_test_0003_m_000001_0 on tt1",
- "attempt_test_0003_r_000001_0 on tt1"});
+ "attempt_test_0003_m_000001_0 on tt1"});
}
/**
@@ -2332,23 +2336,25 @@ public class TestCapacityScheduler exten
/*
* Test cases for Job Initialization poller.
- */
-
- /*
+ *
* This test verifies that the correct number of jobs for
* correct number of users is initialized.
* It also verifies that as jobs of users complete, new jobs
* from the correct users are initialized.
*/
public void testJobInitialization() throws Exception {
+ System.err.println("testJobInitialization");
// set up the scheduler
String[] qs = { "default" };
taskTrackerManager = new FakeTaskTrackerManager(2, 1, 1);
scheduler.setTaskTrackerManager(taskTrackerManager);
+ controlledInitializationPoller.setTaskTrackerManager(taskTrackerManager);
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
resConf.setFakeQueues(queues);
+ resConf.setMaxSystemJobs(6); // 6 jobs max
+ resConf.setMaxInitializedActiveTasksPerUser("default", 4); // 4 tasks max
scheduler.setResourceManagerConf(resConf);
scheduler.start();
@@ -2358,7 +2364,7 @@ public class TestCapacityScheduler exten
// submit 4 jobs each for 3 users.
HashMap<String, ArrayList<FakeJobInProgress>> userJobs = submitJobs(3,
4, "default");
-
+
// get the jobs submitted.
ArrayList<FakeJobInProgress> u1Jobs = userJobs.get("u1");
ArrayList<FakeJobInProgress> u2Jobs = userJobs.get("u2");
@@ -2369,13 +2375,15 @@ public class TestCapacityScheduler exten
Set<JobID> initializedJobs = initPoller.getInitializedJobList();
// we should have 12 (3 x 4) jobs in the job queue
- assertEquals(mgr.getWaitingJobs("default").size(), 12);
-
+ assertEquals(mgr.getQueue("default").getWaitingJobs().size(), 12);
+
// run one poller iteration.
controlledInitializationPoller.selectJobsToInitialize();
+ System.err.println("3 TTM #listeners=" + taskTrackerManager.mylisteners.size());
+
// the poller should initialize 6 jobs
- // 3 users and 2 jobs from each
+ // 3 users and 2 jobs (with 2 tasks) from each
assertEquals(initializedJobs.size(), 6);
assertTrue("Initialized jobs didnt contain the user1 job 1",
@@ -2401,7 +2409,7 @@ public class TestCapacityScheduler exten
// since no jobs have started running, there should be no
// change to the initialized jobs.
assertEquals(initializedJobs.size(), 6);
- assertFalse("Initialized jobs contains user 4 jobs",
+ assertFalse("Initialized jobs doesn't contain user 4 jobs",
initializedJobs.contains(u4j1.getJobID()));
// This event simulates raising the event on completion of setup task
@@ -2422,15 +2430,28 @@ public class TestCapacityScheduler exten
taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", u1Jobs.get(0));
taskTrackerManager.finishTask("tt1", "attempt_test_0002_m_000001_0", u1Jobs.get(0));
taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000001_0", u1Jobs.get(1));
- taskTrackerManager.finishTask("tt2", "attempt_test_0001_r_000001_0", u1Jobs.get(1));
+ taskTrackerManager.finishTask("tt2", "attempt_test_0002_r_000001_0", u1Jobs.get(1));
+
+ // as max running jobs is 6, still no more jobs can be inited
+ controlledInitializationPoller.selectJobsToInitialize();
- // as some jobs have running tasks, the poller will now
+ // count should be 4 since 2 jobs are running
+ assertEquals(initializedJobs.size(), 4);
+
+ // Job has completed
+ taskTrackerManager.finalizeJob(u1Jobs.get(0));
+ taskTrackerManager.finalizeJob(u1Jobs.get(1));
+
+ // count should still be 4 since we haven't called the poller yet
+ assertEquals(initializedJobs.size(), 4);
+
+ // as some jobs have completed, the poller will now
// pick up new jobs to initialize.
controlledInitializationPoller.selectJobsToInitialize();
// count should still be the same
assertEquals(initializedJobs.size(), 6);
-
+
// new jobs that have got into the list
assertTrue(initializedJobs.contains(u1Jobs.get(2).getJobID()));
assertTrue(initializedJobs.contains(u1Jobs.get(3).getJobID()));
@@ -2451,8 +2472,8 @@ public class TestCapacityScheduler exten
taskTrackerManager.finishTask("tt1", "attempt_test_0003_m_000001_0", u1Jobs.get(2));
taskTrackerManager.finishTask("tt1", "attempt_test_0003_r_000001_0", u1Jobs.get(2));
- // no new jobs should be picked up, because max user limit
- // is still 3.
+ // no new jobs should be picked up, because max running jobs is 6, job3
+ // hasn't been marked as 'complete' yet
controlledInitializationPoller.selectJobsToInitialize();
assertEquals(initializedJobs.size(), 5);
@@ -2464,16 +2485,109 @@ public class TestCapacityScheduler exten
"attempt_test_0004_r_000001_0 on tt1"
});
+ // Finish job_0003
+ taskTrackerManager.finalizeJob(u1Jobs.get(2));
+
// Now initialised jobs should contain user 4's job, as
- // user 1's jobs are all done and the number of users is
- // below the limit
+ // user 1's jobs are all done; u2 and u3 already have 6 active tasks
controlledInitializationPoller.selectJobsToInitialize();
assertEquals(initializedJobs.size(), 5);
assertTrue(initializedJobs.contains(u4j1.getJobID()));
controlledInitializationPoller.stopRunning();
}
+
+ /**
+ * This test case tests the limits on job submission per user and per queue.
+ */
+ public void testJobSubmissionLimits() throws Exception {
+ System.err.println("testJobSubmissionLimits");
+
+ // set up the scheduler
+ String[] qs = {"default", "q2"};
+ taskTrackerManager = new FakeTaskTrackerManager(2, 1, 1);
+ scheduler.setTaskTrackerManager(taskTrackerManager);
+ taskTrackerManager.addQueues(qs);
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 50.0f, true, 50));
+ queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
+ resConf.setFakeQueues(queues);
+ resConf.setMaxInitializedActiveTasksPerUser("default", 4); // 4 tasks max
+ resConf.setMaxInitializedActiveTasksPerUser("q2", 4); // 4 tasks max
+ resConf.setInitToAcceptJobsFactor("default", 1);
+ resConf.setMaxSystemJobs(12); // max 12 running jobs in the system, hence
+
+ // In queue 'default'
+ // max (pending+running) jobs -> 12 * 1 * .5 = 6
+ // max jobs per user to init -> 12 * .5 * .5 = 2
+ scheduler.setResourceManagerConf(resConf);
+ scheduler.start();
+
+ JobQueuesManager mgr = scheduler.jobQueuesManager;
+ JobInitializationPoller initPoller = scheduler.getInitializationPoller();
+
+ // submit 2 jobs each for 3 users, the maximum possible to default
+ HashMap<String, ArrayList<FakeJobInProgress>> userJobs =
+ submitJobs(3, 2, "default");
+
+ // get the jobs submitted.
+ ArrayList<FakeJobInProgress> u1Jobs = userJobs.get("u1");
+ ArrayList<FakeJobInProgress> u2Jobs = userJobs.get("u2");
+ ArrayList<FakeJobInProgress> u3Jobs = userJobs.get("u3");
+
+ // reference to the initializedJobs data structure
+ // changes are reflected in the set as they are made by the poller
+ Set<JobID> initializedJobs = initPoller.getInitializedJobList();
+
+ // we should have 6 jobs in the job queue
+ assertEquals(6, mgr.getQueue("default").getWaitingJobs().size());
+
+ // run one poller iteration.
+ controlledInitializationPoller.selectJobsToInitialize();
+
+ // the poller should initialize 6 jobs
+ // 3 users and 2 jobs (with 2 tasks) from each
+ assertEquals(initializedJobs.size(), 6);
+
+ // now submit one more job from another user, should fail since default's
+ // job submission capacity is full
+ boolean jobSubmissionFailed = false;
+ try {
+ FakeJobInProgress u4j1 =
+ submitJob(JobStatus.PREP, 1, 1, "default", "u4");
+ } catch (IOException ioe) {
+ jobSubmissionFailed = true;
+ }
+ assertTrue("Job submission of 7th job to 'default' queue didn't fail!",
+ jobSubmissionFailed);
+
+ // fail some jobs to clear up quota
+ taskTrackerManager.finalizeJob(u2Jobs.get(0), JobStatus.FAILED);
+ taskTrackerManager.finalizeJob(u3Jobs.get(0), JobStatus.FAILED);
+
+ FakeJobInProgress u1j3 =
+ submitJob(JobStatus.PREP, 1, 1, "default", "u1");
+
+ // run the poller again.
+ controlledInitializationPoller.selectJobsToInitialize();
+ // the poller should initialize 5 jobs
+ // 3 from u1 and one each from u2 and u3
+ assertEquals(initializedJobs.size(), 5);
+
+ // Should fail since u1 is already at limit of 3 jobs
+ jobSubmissionFailed = false;
+ try {
+ FakeJobInProgress u1j4 =
+ submitJob(JobStatus.PREP, 1, 1, "default", "u1");
+ } catch (IOException ioe) {
+ jobSubmissionFailed = true;
+ }
+
+ assertTrue("Job submission of 4th job of user 'u1' to queue 'default' " +
+ "didn't fail!", jobSubmissionFailed);
+ }
+
/*
* testHighPriorityJobInitialization() shows behaviour when high priority job
* is submitted into a queue and how initialisation happens for the same.
@@ -2484,24 +2598,28 @@ public class TestCapacityScheduler exten
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
resConf.setFakeQueues(queues);
+ resConf.setMaxSystemJobs(6); // 6 jobs max
+ resConf.setMaxInitializedActiveTasksPerUser("default", 4); // 4 tasks max
scheduler.setResourceManagerConf(resConf);
scheduler.start();
JobInitializationPoller initPoller = scheduler.getInitializationPoller();
Set<JobID> initializedJobsList = initPoller.getInitializedJobList();
- // submit 3 jobs for 3 users
- submitJobs(3,3,"default");
+ // submit 3 jobs each for 3 users; only 2 per user should be inited since max
+ // active tasks per user is 4 and max jobs is 6
+ HashMap<String, ArrayList<FakeJobInProgress>> userJobs =
+ submitJobs(3,3,"default");
controlledInitializationPoller.selectJobsToInitialize();
assertEquals(initializedJobsList.size(), 6);
// submit 2 job for a different user. one of them will be made high priority
FakeJobInProgress u4j1 = submitJob(JobStatus.PREP, 1, 1, "default", "u4");
- FakeJobInProgress u4j2 = submitJob(JobStatus.PREP, 1, 1, "default", "u4");
+ FakeJobInProgress u4j2 = submitJob(JobStatus.PREP, 2, 2, "default", "u4");
controlledInitializationPoller.selectJobsToInitialize();
- // shouldn't change
+ // shouldn't change since max jobs is 6
assertEquals(initializedJobsList.size(), 6);
assertFalse("Contains U4J1 high priority job " ,
@@ -2510,18 +2628,31 @@ public class TestCapacityScheduler exten
initializedJobsList.contains(u4j2.getJobID()));
// change priority of one job
- taskTrackerManager.setPriority(u4j1, JobPriority.VERY_HIGH);
+ System.err.println("changing prio");
+ taskTrackerManager.setPriority(u4j2, JobPriority.VERY_HIGH);
+
+ // Finish one of the inited jobs
+ // run 1 more jobs..
+ checkAssignments("tt1",
+ new String[] {
+ "attempt_test_0001_m_000001_0 on tt1",
+ "attempt_test_0002_m_000001_0 on tt1",
+ "attempt_test_0001_r_000001_0 on tt1"
+ });
+ // Finish the first job of user u1
+ taskTrackerManager.finalizeJob(userJobs.get("u1").get(0));
+
controlledInitializationPoller.selectJobsToInitialize();
// the high priority job should get initialized, but not the
// low priority job from u4, as we have already exceeded the
// limit.
- assertEquals(initializedJobsList.size(), 7);
- assertTrue("Does not contain U4J1 high priority job " ,
- initializedJobsList.contains(u4j1.getJobID()));
- assertFalse("Contains U4J2 Normal priority job " ,
+ assertEquals(initializedJobsList.size(), 5);
+ assertTrue("Does not contain U4J2 high priority job " ,
initializedJobsList.contains(u4j2.getJobID()));
+ assertFalse("Contains U4J1 Normal priority job " ,
+ initializedJobsList.contains(u4j1.getJobID()));
controlledInitializationPoller.stopRunning();
}
@@ -2533,9 +2664,7 @@ public class TestCapacityScheduler exten
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
-
- JobQueuesManager mgr = scheduler.jobQueuesManager;
-
+
// check proper running job movement and completion
checkRunningJobMovementAndCompletion();
@@ -2588,19 +2717,20 @@ public class TestCapacityScheduler exten
job.getStatus().setRunState(JobStatus.PREP);
taskTrackerManager.submitJob(job);
//check if job is present in waiting list.
+ CapacitySchedulerQueue queue = mgr.getQueue("default");
assertEquals("Waiting job list does not contain submitted job",
- 1, mgr.getWaitingJobCount("default"));
+ 1, queue.getNumWaitingJobs());
assertTrue("Waiting job does not contain submitted job",
- mgr.getWaitingJobs("default").contains(job));
+ queue.getWaitingJobs().contains(job));
//initialization should fail now.
controlledInitializationPoller.selectJobsToInitialize();
//Check if the job has been properly cleaned up.
assertEquals("Waiting job list contains submitted job",
- 0, mgr.getWaitingJobCount("default"));
+ 0, queue.getNumWaitingJobs());
assertFalse("Waiting job contains submitted job",
- mgr.getWaitingJobs("default").contains(job));
+ queue.getWaitingJobs().contains(job));
assertFalse("Waiting job contains submitted job",
- mgr.getRunningJobQueue("default").contains(job));
+ queue.getRunningJobs().contains(job));
}
private void checkRunningJobMovementAndCompletion() throws IOException {
@@ -2618,10 +2748,11 @@ public class TestCapacityScheduler exten
raiseStatusChangeEvents(mgr);
// it should be there in both the queues.
+ CapacitySchedulerQueue queue = mgr.getQueue("default");
assertTrue("Job not present in Job Queue",
- mgr.getWaitingJobs("default").contains(job));
+ queue.getWaitingJobs().contains(job));
assertTrue("Job not present in Running Queue",
- mgr.getRunningJobQueue("default").contains(job));
+ queue.getRunningJobs().contains(job));
// assign a task
checkAssignments("tt1",
@@ -2637,7 +2768,7 @@ public class TestCapacityScheduler exten
// the job should also be removed from the job queue as tasks
// are scheduled
assertFalse("Job present in Job Queue",
- mgr.getWaitingJobs("default").contains(job));
+ queue.getWaitingJobs().contains(job));
// complete tasks and job
taskTrackerManager.finishTask("tt1", "attempt_test_0001_m_000001_0", job);
@@ -2646,7 +2777,7 @@ public class TestCapacityScheduler exten
// make sure it is removed from the run queue
assertFalse("Job present in running queue",
- mgr.getRunningJobQueue("default").contains(job));
+ queue.getRunningJobs().contains(job));
}
private void checkFailedRunningJobMovement() throws IOException {
@@ -2658,14 +2789,15 @@ public class TestCapacityScheduler exten
submitJobAndInit(JobStatus.RUNNING, 1, 1, "default", "u1");
//check if the job is present in running queue.
+ CapacitySchedulerQueue queue = mgr.getQueue("default");
assertTrue("Running jobs list does not contain submitted job",
- mgr.getRunningJobQueue("default").contains(job));
+ queue.getRunningJobs().contains(job));
taskTrackerManager.finalizeJob(job, JobStatus.KILLED);
//check if the job is properly removed from running queue.
assertFalse("Running jobs list does not contain submitted job",
- mgr.getRunningJobQueue("default").contains(job));
+ queue.getRunningJobs().contains(job));
}
@@ -2753,7 +2885,7 @@ public class TestCapacityScheduler exten
taskTrackerManager.addQueues(new String[] { "default" });
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, true, 25));
+ queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
resConf.setFakeQueues(queues);
scheduler.setTaskTrackerManager(taskTrackerManager);
// enabled memory-based scheduling
@@ -2781,7 +2913,7 @@ public class TestCapacityScheduler exten
jConf.setUser("u1");
FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
- scheduler.updateQSIInfoForTests();
+ scheduler.updateQueueUsageForTests();
LOG.info(job1.getSchedulingInfo());
assertEquals(
CapacityTaskScheduler.getJobQueueSchedInfo(1, 1, 0, 0, 0, 0),
@@ -2797,7 +2929,7 @@ public class TestCapacityScheduler exten
jConf.setUser("u1");
FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
- scheduler.updateQSIInfoForTests();
+ scheduler.updateQueueUsageForTests();
LOG.info(job2.getSchedulingInfo());
assertEquals(
CapacityTaskScheduler.getJobQueueSchedInfo(1, 1, 0, 0, 0, 0),
@@ -2813,7 +2945,7 @@ public class TestCapacityScheduler exten
jConf.setUser("u1");
FakeJobInProgress job3 = submitJobAndInit(JobStatus.PREP, jConf);
checkAssignment("tt3", "attempt_test_0003_m_000001_0 on tt3");
- scheduler.updateQSIInfoForTests();
+ scheduler.updateQueueUsageForTests();
LOG.info(job3.getSchedulingInfo());
assertEquals(
CapacityTaskScheduler.getJobQueueSchedInfo(1, 1, 0, 0, 0, 0),
@@ -2837,20 +2969,20 @@ public class TestCapacityScheduler exten
jConf.setNumMapTasks(2);
jConf.setNumReduceTasks(2);
jConf.setQueueName("default");
- jConf.setUser("u1");
+ jConf.setUser("u2");
FakeJobInProgress job5 = submitJobAndInit(JobStatus.PREP, jConf);
// Job4, a high memory job cannot be accommodated on a any TT. But with each
// trip to the scheduler, each of the TT should be reserved by job2.
assertEquals(0, scheduler.assignTasks(tracker("tt1")).size());
- scheduler.updateQSIInfoForTests();
+ scheduler.updateQueueUsageForTests();
LOG.info(job4.getSchedulingInfo());
assertEquals(
CapacityTaskScheduler.getJobQueueSchedInfo(0, 0, 2, 0, 0, 0),
(String) job4.getSchedulingInfo());
assertEquals(0, scheduler.assignTasks(tracker("tt2")).size());
- scheduler.updateQSIInfoForTests();
+ scheduler.updateQueueUsageForTests();
LOG.info(job4.getSchedulingInfo());
assertEquals(
CapacityTaskScheduler.getJobQueueSchedInfo(0, 0, 4, 0, 0, 0),
@@ -2860,14 +2992,14 @@ public class TestCapacityScheduler exten
// slots on tt3. tt1 and tt2 should not be assigned any slots with the
// reservation stats intact.
assertEquals(0, scheduler.assignTasks(tracker("tt1")).size());
- scheduler.updateQSIInfoForTests();
+ scheduler.updateQueueUsageForTests();
LOG.info(job4.getSchedulingInfo());
assertEquals(
CapacityTaskScheduler.getJobQueueSchedInfo(0, 0, 4, 0, 0, 0),
(String) job4.getSchedulingInfo());
assertEquals(0, scheduler.assignTasks(tracker("tt2")).size());
- scheduler.updateQSIInfoForTests();
+ scheduler.updateQueueUsageForTests();
LOG.info(job4.getSchedulingInfo());
assertEquals(
CapacityTaskScheduler.getJobQueueSchedInfo(0, 0, 4, 0, 0, 0),
@@ -2876,7 +3008,7 @@ public class TestCapacityScheduler exten
checkAssignments("tt3",
new String[] {
"attempt_test_0005_m_000001_0 on tt3"});
- scheduler.updateQSIInfoForTests();
+ scheduler.updateQueueUsageForTests();
LOG.info(job5.getSchedulingInfo());
assertEquals(
CapacityTaskScheduler.getJobQueueSchedInfo(1, 1, 0, 0, 0, 0),
@@ -2938,7 +3070,7 @@ public class TestCapacityScheduler exten
jConf.setMemoryForReduceTask(1 * 1024);
jConf.setNumMapTasks(6);
jConf.setNumReduceTasks(6);
- jConf.setUser("u1");
+ jConf.setUser("u2");
jConf.setQueueName("q1");
FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
@@ -2959,7 +3091,7 @@ public class TestCapacityScheduler exten
.getOrderedQueues(TaskType.MAP));
checkQueuesOrder(qs, scheduler
.getOrderedQueues(TaskType.REDUCE));
- scheduler.updateQSIInfoForTests();
+ scheduler.updateQueueUsageForTests();
assertEquals(
CapacityTaskScheduler.getJobQueueSchedInfo(1, 2, 2, 1, 2, 0),
(String) job1.getSchedulingInfo());
@@ -2972,7 +3104,7 @@ public class TestCapacityScheduler exten
checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
checkQueuesOrder(reversedQs, scheduler
.getOrderedQueues(TaskType.REDUCE));
- scheduler.updateQSIInfoForTests();
+ scheduler.updateQueueUsageForTests();
assertEquals(
CapacityTaskScheduler.getJobQueueSchedInfo(1, 2, 2, 1, 2, 0),
(String) job1.getSchedulingInfo());
@@ -2984,7 +3116,7 @@ public class TestCapacityScheduler exten
checkAssignment("tt1", "attempt_test_0002_r_000002_0 on tt1");
checkQueuesOrder(reversedQs, scheduler
.getOrderedQueues(TaskType.REDUCE));
- scheduler.updateQSIInfoForTests();
+ scheduler.updateQueueUsageForTests();
assertEquals(
CapacityTaskScheduler.getJobQueueSchedInfo(1, 2, 2, 1, 2, 0),
(String) job1.getSchedulingInfo());
@@ -2996,7 +3128,7 @@ public class TestCapacityScheduler exten
checkAssignment("tt1", "attempt_test_0002_r_000003_0 on tt1");
checkQueuesOrder(reversedQs, scheduler
.getOrderedQueues(TaskType.REDUCE));
- scheduler.updateQSIInfoForTests();
+ scheduler.updateQueueUsageForTests();
assertEquals(
CapacityTaskScheduler.getJobQueueSchedInfo(1, 2, 2, 1, 2, 0),
(String) job1.getSchedulingInfo());
@@ -3020,9 +3152,9 @@ public class TestCapacityScheduler exten
.getOrderedQueues(TaskType.MAP));
checkQueuesOrder(qs, scheduler
.getOrderedQueues(TaskType.REDUCE));
- scheduler.updateQSIInfoForTests();
- assertEquals(
- CapacityTaskScheduler.getJobQueueSchedInfo(2, 4, 4, 2, 4, 0),
+ scheduler.updateQueueUsageForTests();
+ assertEquals( // user limit is 6
+ CapacityTaskScheduler.getJobQueueSchedInfo(2, 4, 2, 2, 4, 0),
(String) job1.getSchedulingInfo());
assertEquals(
CapacityTaskScheduler.getJobQueueSchedInfo(6, 6, 0, 3, 3, 0),
@@ -3032,9 +3164,9 @@ public class TestCapacityScheduler exten
checkAssignment("tt2", "attempt_test_0002_r_000004_0 on tt2");
checkQueuesOrder(reversedQs, scheduler
.getOrderedQueues(TaskType.REDUCE));
- scheduler.updateQSIInfoForTests();
- assertEquals(
- CapacityTaskScheduler.getJobQueueSchedInfo(2, 4, 4, 2, 4, 0),
+ scheduler.updateQueueUsageForTests();
+ assertEquals(// user limit is 6
+ CapacityTaskScheduler.getJobQueueSchedInfo(2, 4, 2, 2, 4, 0),
(String) job1.getSchedulingInfo());
assertEquals(
CapacityTaskScheduler.getJobQueueSchedInfo(6, 6, 0, 4, 4, 0),
@@ -3052,9 +3184,11 @@ public class TestCapacityScheduler exten
p.selectJobsToInitialize();
//Don't raise the status change event.
+ CapacitySchedulerQueue queue = mgr.getQueue("default");
+
//check in waiting and initialized jobs list.
assertTrue("Waiting jobs list does not contain the job",
- mgr.getWaitingJobs("default").contains(job));
+ queue.getWaitingJobs().contains(job));
assertTrue("Initialized job does not contain the job",
p.getInitializedJobList().contains(job.getJobID()));
@@ -3064,7 +3198,7 @@ public class TestCapacityScheduler exten
//Check if the job is present in waiting queue
assertFalse("Waiting jobs list contains failed job",
- mgr.getWaitingJobs("default").contains(job));
+ queue.getWaitingJobs().contains(job));
//run the poller to do the cleanup
p.selectJobsToInitialize();
@@ -3081,14 +3215,15 @@ public class TestCapacityScheduler exten
"u1");
// check in waiting and initialized jobs list.
- assertTrue("Waiting jobs list does not contain the job", mgr
- .getWaitingJobs("default").contains(job));
+ CapacitySchedulerQueue queue = mgr.getQueue("default");
+ assertTrue("Waiting jobs list does not contain the job",
+ queue.getWaitingJobs().contains(job));
// fail the waiting job
taskTrackerManager.finalizeJob(job, JobStatus.KILLED);
// Check if the job is present in waiting queue
- assertFalse("Waiting jobs list contains failed job", mgr
- .getWaitingJobs("default").contains(job));
+ assertFalse("Waiting jobs list contains failed job",
+ queue.getWaitingJobs().contains(job));
}
private void raiseStatusChangeEvents(JobQueuesManager mgr) {
@@ -3096,7 +3231,7 @@ public class TestCapacityScheduler exten
}
private void raiseStatusChangeEvents(JobQueuesManager mgr, String queueName) {
- Collection<JobInProgress> jips = mgr.getWaitingJobs(queueName);
+ Collection<JobInProgress> jips = mgr.getQueue(queueName).getWaitingJobs();
for(JobInProgress jip : jips) {
if(jip.getStatus().getRunState() == JobStatus.RUNNING) {
JobStatusChangeEvent evt = new JobStatusChangeEvent(jip,
@@ -3212,7 +3347,7 @@ public class TestCapacityScheduler exten
int expectedOccupiedSlots, float expectedOccupiedSlotsPercent,int incrMapIndex
,int incrReduceIndex
) {
- scheduler.updateQSIInfoForTests();
+ scheduler.updateQueueUsageForTests();
QueueManager queueManager = scheduler.taskTrackerManager.getQueueManager();
String schedulingInfo =
queueManager.getJobQueueInfo(queue).getSchedulingInfo();
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerWithJobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerWithJobTracker.java?rev=1077573&r1=1077572&r2=1077573&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerWithJobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerWithJobTracker.java Fri Mar 4 04:30:34 2011
@@ -60,7 +60,7 @@ public class TestCapacitySchedulerWithJo
.getTaskScheduler();
JobQueuesManager mgr = scheduler.jobQueuesManager;
assertEquals("Failed job present in Waiting queue", 0, mgr
- .getWaitingJobCount("default"));
+ .getQueue("default").getNumWaitingJobs());
}
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java?rev=1077573&r1=1077572&r2=1077573&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobInProgress.java Fri Mar 4 04:30:34 2011
@@ -803,6 +803,15 @@ public class JobInProgress {
return numReduceTasks - runningReduceTasks - failedReduceTIPs -
finishedReduceTasks + speculativeReduceTasks;
}
+
+ /**
+ * Sum the number of map tasks and reduce tasks desired by the job.
+ * @return {@code desiredMaps() + desiredReduces()}
+ */
+ public int desiredTasks() {
+ return desiredReduces() + desiredMaps();
+ }
+
public int getNumSlotsPerTask(TaskType taskType) {
if (taskType == TaskType.MAP) {
return numSlotsPerMap;
@@ -1361,7 +1370,7 @@ public class JobInProgress {
/**
* Check if we can schedule an off-switch task for this job.
- * @param numTaskTrackers.
+ * @param numTaskTrackers number of tasktrackers
*
* We check the number of missed opportunities for the job.
* If it has 'waited' long enough we go ahead and schedule.
@@ -1559,7 +1568,7 @@ public class JobInProgress {
LOG.info("Exceeded limit for reduce input size: Estimated:" +
estimatedReduceInputSize + " Limit: " +
reduce_input_limit + " Failing Job " + jobId);
- status.setFailureInfo("Job Exceeded Reduce Input limit "
+ status.setFailureInfo("Job exceeded Reduce Input limit "
+ " Limit: " + reduce_input_limit +
" Estimated: " + estimatedReduceInputSize);
jobtracker.failJob(this);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java?rev=1077573&r1=1077572&r2=1077573&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobQueueJobInProgressListener.java Fri Mar 4 04:30:34 2011
@@ -54,6 +54,28 @@ class JobQueueJobInProgressListener exte
JobPriority getPriority() {return priority;}
long getStartTime() {return startTime;}
JobID getJobID() {return id;}
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null || obj.getClass() != JobSchedulingInfo.class) {
+ return false;
+ } else if (obj == this) {
+ return true;
+ }
+ else if (obj instanceof JobSchedulingInfo) {
+ JobSchedulingInfo that = (JobSchedulingInfo)obj;
+ return (this.id.equals(that.id) &&
+ this.startTime == that.startTime &&
+ this.priority == that.priority);
+ }
+ return false;
+ }
+
+ @Override
+ public int hashCode() { // combines id, priority and startTime into one int
+ return (int)(startTime + id.hashCode() * priority.hashCode());
+ }
+
}
static final Comparator<JobSchedulingInfo> FIFO_JOB_QUEUE_COMPARATOR
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1077573&r1=1077572&r2=1077573&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Mar 4 04:30:34 2011
@@ -3717,7 +3717,20 @@ public class JobTracker implements MRCon
jobInfo.write(out);
out.close();
}
- return addJob(jobId, job);
+
+ // Submit the job
+ JobStatus status;
+ try {
+ status = addJob(jobId, job);
+ } catch (IOException ioe) {
+ LOG.info("Job " + jobId + " submission failed!", ioe);
+ status = job.getStatus();
+ status.setFailureInfo(StringUtils.stringifyException(ioe));
+ failJob(job);
+ throw ioe;
+ }
+
+ return status;
}
}
@@ -3753,19 +3766,15 @@ public class JobTracker implements MRCon
* adding a job. This is the core job submission logic
* @param jobId The id for the job submitted which needs to be added
*/
- private synchronized JobStatus addJob(JobID jobId, JobInProgress job) {
+ private synchronized JobStatus addJob(JobID jobId, JobInProgress job)
+ throws IOException {
totalSubmissions++;
synchronized (jobs) {
synchronized (taskScheduler) {
jobs.put(job.getProfile().getJobID(), job);
for (JobInProgressListener listener : jobInProgressListeners) {
- try {
- listener.jobAdded(job);
- } catch (IOException ioe) {
- LOG.warn("Failed to add and so skipping the job : "
- + job.getJobID() + ". Exception : " + ioe);
- }
+ listener.jobAdded(job);
}
}
}
@@ -3946,8 +3955,8 @@ public class JobTracker implements MRCon
StringUtils.stringifyException(kie));
killJob(job);
} catch (Throwable t) {
- String failureInfo = "Job initialization failed:\n" +
- StringUtils.stringifyException(t);
+ String failureInfo =
+ "Job initialization failed:\n" + StringUtils.stringifyException(t);
// If the job initialization is failed, job state will be FAILED
LOG.error(failureInfo);
job.getStatus().setFailureInfo(failureInfo);
@@ -4889,7 +4898,12 @@ public class JobTracker implements MRCon
public void refreshQueues() throws IOException {
LOG.info("Refreshing queue information. requested by : " +
UserGroupInformation.getCurrentUser().getShortUserName());
- this.queueManager.refreshQueues(new Configuration(this.conf));
+ this.queueManager.refreshQueues(new Configuration());
+
+ synchronized (taskScheduler) {
+ taskScheduler.refresh();
+ }
+
}
synchronized String getReasonsForBlacklisting(String host) {
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/QueueManager.java?rev=1077573&r1=1077572&r2=1077573&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/QueueManager.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/QueueManager.java Fri Mar 4 04:30:34 2011
@@ -28,6 +28,7 @@ import java.util.TreeSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.Queue.QueueState;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.util.StringUtils;
@@ -61,7 +62,8 @@ class QueueManager {
/** Whether ACLs are enabled in the system or not. */
private boolean aclsEnabled;
/** Map of a queue name and Queue object */
- final HashMap<String,Queue> queues;
+ final HashMap<String,Queue> queues = new HashMap<String,Queue>();
+
/**
* Enum representing an AccessControlList that drives set of operations that
* can be performed on a queue.
@@ -98,20 +100,24 @@ class QueueManager {
public QueueManager(Configuration conf) {
checkDeprecation(conf);
conf.addResource(QUEUE_ACLS_FILE_NAME);
- queues = new HashMap<String,Queue>();
+
+ // Get configured ACLs and state for each queue
+ aclsEnabled = conf.getBoolean("mapred.acls.enabled", false);
+
+ queues.putAll(parseQueues(conf));
+ }
+
+ synchronized private Map<String, Queue> parseQueues(Configuration conf) {
+ Map<String, Queue> queues = new HashMap<String, Queue>();
// First get the queue names
String[] queueNameValues = conf.getStrings("mapred.queue.names",
new String[]{JobConf.DEFAULT_QUEUE_NAME});
- // Get configured ACLs and state for each queue
- aclsEnabled = conf.getBoolean("mapred.acls.enabled", false);
for (String name : queueNameValues) {
- try {
- queues.put(name, new Queue(name, getQueueAcls(name, conf),
- getQueueState(name, conf)));
- } catch (Throwable t) {
- LOG.warn("Not able to initialize queue " + name, t);
- }
+ queues.put(name, new Queue(name, getQueueAcls(name, conf),
+ getQueueState(name, conf)));
}
+
+ return queues;
}
/**
@@ -218,6 +224,7 @@ class QueueManager {
* @throws IOException when queue ACL configuration file is invalid.
*/
synchronized void refreshQueues(Configuration conf) throws IOException {
+
// First check if things are configured in mapred-site.xml,
// so we can print out a deprecation warning.
// This check is needed only until we support the configuration
@@ -228,23 +235,36 @@ class QueueManager {
// will be overridden.
conf.addResource(QUEUE_ACLS_FILE_NAME);
+ // Now parse the queues and check to ensure no queue has been deleted
+ Map<String, Queue> newQueues = parseQueues(conf);
+ checkQueuesForDeletion(queues, newQueues);
+
// Now we refresh the properties of the queues. Note that we
// do *not* refresh the queue names or the acls flag. Instead
// we use the older values configured for them.
- LOG.info("Refreshing acls and state for configured queues.");
- try {
- for (String qName : getQueues()) {
- Queue q = queues.get(qName);
- q.setAcls(getQueueAcls(qName, conf));
- q.setState(getQueueState(qName, conf));
+ queues.clear();
+ queues.putAll(newQueues);
+ LOG.info("Queues acls, state and configs refreshed: " +
+ queues.size() + " queues present now.");
+ }
+
+ private void checkQueuesForDeletion(Map<String, Queue> currentQueues,
+ Map<String, Queue> newQueues) {
+ for (String queue : currentQueues.keySet()) {
+ if (!newQueues.containsKey(queue)) {
+ throw new IllegalArgumentException("Couldn't find queue '" + queue +
+ "' during refresh!");
+ }
+ }
+
+ // Mark new queues as STOPPED
+ for (String queue : newQueues.keySet()) {
+ if (!currentQueues.containsKey(queue)) {
+ newQueues.get(queue).setState(QueueState.STOPPED);
}
- } catch (Throwable t) {
- LOG.warn("Invalid queue configuration", t);
- throw new IOException("Invalid queue configuration", t);
}
-
}
-
+
private void checkDeprecation(Configuration conf) {
// check if queues are defined.
String[] queues = conf.getStrings("mapred.queue.names");
Modified: hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java?rev=1077573&r1=1077572&r2=1077573&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java Fri Mar 4 04:30:34 2011
@@ -91,5 +91,10 @@ abstract class TaskScheduler implements
* @return
*/
public abstract Collection<JobInProgress> getJobs(String queueName);
-
+
+ /**
+ * Refresh the configuration of the scheduler; this default implementation does nothing.
+ */
+ public void refresh() throws IOException {}
+
}