You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2009/04/07 20:13:04 UTC
svn commit: r762885 - in /hadoop/core/trunk: CHANGES.txt
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Author: szetszwo
Date: Tue Apr 7 18:13:03 2009
New Revision: 762885
URL: http://svn.apache.org/viewvc?rev=762885&view=rev
Log:
HADOOP-5068. Fix NPE in TestCapacityScheduler. (Vinod Kumar Vavilapalli via szetszwo)
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=762885&r1=762884&r2=762885&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Apr 7 18:13:03 2009
@@ -1201,6 +1201,9 @@
HADOOP-3810. NameNode seems unstable on a cluster with little space left.
(hairong)
+ HADOOP-5068. Fix NPE in TestCapacityScheduler. (Vinod Kumar Vavilapalli
+ via szetszwo)
+
Release 0.19.2 - Unreleased
BUG FIXES
Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=762885&r1=762884&r2=762885&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Tue Apr 7 18:13:03 2009
@@ -143,7 +143,9 @@
}
}
}
-
+
+ private ControlledInitializationPoller controlledInitializationPoller;
+
static class FakeJobInProgress extends JobInProgress {
private FakeTaskTrackerManager taskTrackerManager;
@@ -605,8 +607,14 @@
scheduler.setTaskTrackerManager(taskTrackerManager);
conf = new JobConf();
+ // Don't let the JobInitializationPoller come in our way.
+ resConf = new FakeResourceManagerConf();
+ controlledInitializationPoller = new ControlledInitializationPoller(
+ scheduler.jobQueuesManager,
+ resConf,
+ resConf.getQueues());
+ scheduler.setInitializationPoller(controlledInitializationPoller);
scheduler.setConf(conf);
-
}
@Override
@@ -668,7 +676,6 @@
public void testJobRunStateChange() throws IOException {
// start the scheduler
taskTrackerManager.addQueues(new String[] {"default"});
- resConf = new FakeResourceManagerConf();
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, 1, true, 1));
resConf.setFakeQueues(queues);
@@ -806,7 +813,6 @@
public void testJobFinished() throws Exception {
taskTrackerManager.addQueues(new String[] {"default"});
- resConf = new FakeResourceManagerConf();
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 50.0f, 10, true, 25));
resConf.setFakeQueues(queues);
@@ -862,7 +868,6 @@
// set up some queues
String[] qs = {"default", "q2"};
taskTrackerManager.addQueues(qs);
- resConf = new FakeResourceManagerConf();
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
@@ -887,7 +892,6 @@
// need only one queue
String[] qs = { "default" };
taskTrackerManager.addQueues(qs);
- resConf = new FakeResourceManagerConf();
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, 300, true, 100));
resConf.setFakeQueues(queues);
@@ -918,7 +922,6 @@
public void testGCAllocationToQueues() throws Exception {
String[] qs = {"default","q1","q2","q3","q4"};
taskTrackerManager.addQueues(qs);
- resConf = new FakeResourceManagerConf();
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default",25.0f,5000,true,25));
queues.add(new FakeQueueInfo("q1",-1.0f,5000,true,25));
@@ -940,7 +943,6 @@
// set up some queues
String[] qs = {"default", "q2"};
taskTrackerManager.addQueues(qs);
- resConf = new FakeResourceManagerConf();
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
// set the gc % as 10%, so that gc will be zero initially as
// the cluster capacity increase slowly.
@@ -995,7 +997,6 @@
// set up some queues
String[] qs = {"default", "q2"};
taskTrackerManager.addQueues(qs);
- resConf = new FakeResourceManagerConf();
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
@@ -1022,7 +1023,6 @@
// set up some queues
String[] qs = {"default", "q2"};
taskTrackerManager.addQueues(qs);
- resConf = new FakeResourceManagerConf();
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
@@ -1051,7 +1051,6 @@
// set up some queues
String[] qs = {"default", "q2"};
taskTrackerManager.addQueues(qs);
- resConf = new FakeResourceManagerConf();
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
@@ -1080,7 +1079,6 @@
// set up some queues
String[] qs = {"default", "q2"};
taskTrackerManager.addQueues(qs);
- resConf = new FakeResourceManagerConf();
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
@@ -1120,7 +1118,6 @@
// set up one queue, with 10 slots
String[] qs = {"default"};
taskTrackerManager.addQueues(qs);
- resConf = new FakeResourceManagerConf();
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, 10000, true, 25));
resConf.setFakeQueues(queues);
@@ -1179,7 +1176,6 @@
// set up some queues
String[] qs = {"default", "q2", "q3"};
taskTrackerManager.addQueues(qs);
- resConf = new FakeResourceManagerConf();
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
queues.add(new FakeQueueInfo("q2", 25.0f, 1000, true, 25));
@@ -1221,7 +1217,6 @@
// set up some queues
String[] qs = {"default", "q2", "q3", "q4"};
taskTrackerManager.addQueues(qs);
- resConf = new FakeResourceManagerConf();
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
queues.add(new FakeQueueInfo("q2", 20.0f, 1000, true, 25));
@@ -1280,7 +1275,6 @@
taskTrackerManager.addQueues(qs);
int maxSlots = taskTrackerManager.maxMapTasksPerTracker
* taskTrackerManager.taskTrackers().size();
- resConf = new FakeResourceManagerConf();
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
queues.add(new FakeQueueInfo("queue", 50.0f, 1000, true, 25));
@@ -1315,7 +1309,6 @@
// set up some queues
String[] qs = {"default", "q2"};
taskTrackerManager.addQueues(qs);
- resConf = new FakeResourceManagerConf();
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25));
@@ -1378,20 +1371,13 @@
public void testReclaimCapacityWithUninitializedJobs() throws IOException {
String[] qs = {"default", "q2"};
taskTrackerManager.addQueues(qs);
- resConf = new FakeResourceManagerConf();
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25));
resConf.setFakeQueues(queues);
-
- ControlledInitializationPoller p = new ControlledInitializationPoller(
- scheduler.jobQueuesManager,
- resConf,
- resConf.getQueues());
- scheduler.setInitializationPoller(p);
-
scheduler.setResourceManagerConf(resConf);
scheduler.start();
+
//Submit one job to the default queue and get the capacity over the
//gc of the particular queue.
FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
@@ -1424,7 +1410,6 @@
// set up some queues
String[] qs = {"default", "q2", "q3"};
taskTrackerManager.addQueues(qs);
- resConf = new FakeResourceManagerConf();
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
// we want q3 to have 0 GC. Map slots = 4.
queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
@@ -1501,18 +1486,13 @@
taskTrackerManager = new FakeTaskTrackerManager(2, 1, 1);
scheduler.setTaskTrackerManager(taskTrackerManager);
taskTrackerManager.addQueues(qs);
- resConf = new FakeResourceManagerConf();
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25));
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
- ControlledInitializationPoller p = new ControlledInitializationPoller(
- scheduler.jobQueuesManager,
- resConf,
- resConf.getQueues());
- scheduler.setInitializationPoller(p);
scheduler.start();
+
scheduler.assignTasks(tracker("tt1")); // heartbeat
scheduler.assignTasks(tracker("tt2")); // heartbeat
int totalMaps = taskTrackerManager.getClusterStatus().getMaxMapTasks();
@@ -1550,7 +1530,7 @@
assertEquals(infoStrings[15] , "Number of Waiting Jobs: 5");
//Initalize the jobs but don't raise events
- p.selectJobsToInitialize();
+ controlledInitializationPoller.selectJobsToInitialize();
schedulingInfo =
queueManager.getJobQueueInfo("default").getSchedulingInfo();
@@ -1567,7 +1547,7 @@
//assign one job
Task t1 = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
//Initalize extra job.
- p.selectJobsToInitialize();
+ controlledInitializationPoller.selectJobsToInitialize();
//Get scheduling information, now the number of waiting job should have
//changed to 4 as one is scheduled and has become running.
@@ -1615,7 +1595,7 @@
u1j2.getStatus().getRunState() == JobStatus.RUNNING);
taskTrackerManager.finalizeJob(u1j2, JobStatus.FAILED);
//Run initializer to clean up failed jobs
- p.selectJobsToInitialize();
+ controlledInitializationPoller.selectJobsToInitialize();
// make sure we update our stats
scheduler.updateQSIInfoForTests();
schedulingInfo =
@@ -1633,7 +1613,7 @@
taskTrackerManager.finalizeJob(u1j5, JobStatus.FAILED);
//run initializer to clean up failed job
- p.selectJobsToInitialize();
+ controlledInitializationPoller.selectJobsToInitialize();
// make sure we update our stats
scheduler.updateQSIInfoForTests();
schedulingInfo =
@@ -1658,7 +1638,7 @@
//now the running count of map should be one and waiting jobs should be
//one. run the poller as it is responsible for waiting count
- p.selectJobsToInitialize();
+ controlledInitializationPoller.selectJobsToInitialize();
// make sure we update our stats
scheduler.updateQSIInfoForTests();
schedulingInfo =
@@ -1702,7 +1682,6 @@
.setTotalPhysicalMemory(512 * 1024 * 1024L);
taskTrackerManager.addQueues(new String[] { "default" });
- resConf = new FakeResourceManagerConf();
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
resConf.setFakeQueues(queues);
@@ -1749,7 +1728,6 @@
ttStatus.setReservedPhysicalMemory(0);
taskTrackerManager.addQueues(new String[] { "default" });
- resConf = new FakeResourceManagerConf();
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
resConf.setFakeQueues(queues);
@@ -1799,7 +1777,6 @@
// Normal job on this TT would be 1.5GB vmem, 0.5GB pmem
taskTrackerManager.addQueues(new String[] { "default" });
- resConf = new FakeResourceManagerConf();
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
resConf.setFakeQueues(queues);
@@ -1885,7 +1862,6 @@
// Normal job on this TT would be 1GB vmem, 0.5GB pmem
taskTrackerManager.addQueues(new String[] { "default" });
- resConf = new FakeResourceManagerConf();
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
resConf.setFakeQueues(queues);
@@ -1949,7 +1925,6 @@
ttStatus.setTotalPhysicalMemory(1 * 1024 * 1024 * 1024);
ttStatus.setReservedPhysicalMemory(0);
- resConf = new FakeResourceManagerConf();
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
taskTrackerManager.addQueues(new String[] { "default" });
@@ -1966,11 +1941,6 @@
resConf.setDefaultPercentOfPmemInVmem(33.3f);
resConf.setLimitMaxPmemForTasks(pmemUpperLimit);
scheduler.setResourceManagerConf(resConf);
- ControlledInitializationPoller p = new ControlledInitializationPoller(
- scheduler.jobQueuesManager,
- resConf,
- resConf.getQueues());
- scheduler.setInitializationPoller(p);
scheduler.start();
LOG.debug("Submit one invalid high ram(5GB vmem, 3GB pmem) job of "
@@ -2022,7 +1992,6 @@
ttStatus.setTotalPhysicalMemory(1 * 1024 * 1024 * 1024L);
ttStatus.setReservedPhysicalMemory(0);
- resConf = new FakeResourceManagerConf();
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
taskTrackerManager.addQueues(new String[] { "default" });
@@ -2122,16 +2091,10 @@
taskTrackerManager = new FakeTaskTrackerManager(2, 1, 1);
scheduler.setTaskTrackerManager(taskTrackerManager);
taskTrackerManager.addQueues(qs);
- resConf = new FakeResourceManagerConf();
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 100));
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
- ControlledInitializationPoller p = new ControlledInitializationPoller(
- scheduler.jobQueuesManager,
- resConf,
- resConf.getQueues());
- scheduler.setInitializationPoller(p);
scheduler.start();
JobQueuesManager mgr = scheduler.jobQueuesManager;
@@ -2154,7 +2117,7 @@
assertEquals(mgr.getWaitingJobs("default").size(), 12);
// run one poller iteration.
- p.selectJobsToInitialize();
+ controlledInitializationPoller.selectJobsToInitialize();
// the poller should initialize 6 jobs
// 3 users and 2 jobs from each
@@ -2178,7 +2141,7 @@
submitJob(JobStatus.PREP, 1, 1, "default", "u4");
// run the poller again.
- p.selectJobsToInitialize();
+ controlledInitializationPoller.selectJobsToInitialize();
// since no jobs have started running, there should be no
// change to the initialized jobs.
@@ -2202,7 +2165,7 @@
// as some jobs have running tasks, the poller will now
// pick up new jobs to initialize.
- p.selectJobsToInitialize();
+ controlledInitializationPoller.selectJobsToInitialize();
// count should still be the same
assertEquals(initializedJobs.size(), 6);
@@ -2226,7 +2189,7 @@
// no new jobs should be picked up, because max user limit
// is still 3.
- p.selectJobsToInitialize();
+ controlledInitializationPoller.selectJobsToInitialize();
assertEquals(initializedJobs.size(), 5);
@@ -2239,11 +2202,11 @@
// Now initialised jobs should contain user 4's job, as
// user 1's jobs are all done and the number of users is
// below the limit
- p.selectJobsToInitialize();
+ controlledInitializationPoller.selectJobsToInitialize();
assertEquals(initializedJobs.size(), 5);
assertTrue(initializedJobs.contains(u4j1.getJobID()));
- p.stopRunning();
+ controlledInitializationPoller.stopRunning();
}
/*
@@ -2253,30 +2216,25 @@
public void testHighPriorityJobInitialization() throws Exception {
String[] qs = { "default"};
taskTrackerManager.addQueues(qs);
- resConf = new FakeResourceManagerConf();
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 100));
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
- ControlledInitializationPoller p = new ControlledInitializationPoller(
- scheduler.jobQueuesManager,
- resConf,
- resConf.getQueues());
- scheduler.setInitializationPoller(p);
scheduler.start();
+
JobInitializationPoller initPoller = scheduler.getInitializationPoller();
Set<JobID> initializedJobsList = initPoller.getInitializedJobList();
// submit 3 jobs for 3 users
submitJobs(3,3,"default");
- p.selectJobsToInitialize();
+ controlledInitializationPoller.selectJobsToInitialize();
assertEquals(initializedJobsList.size(), 6);
// submit 2 job for a different user. one of them will be made high priority
FakeJobInProgress u4j1 = submitJob(JobStatus.PREP, 1, 1, "default", "u4");
FakeJobInProgress u4j2 = submitJob(JobStatus.PREP, 1, 1, "default", "u4");
- p.selectJobsToInitialize();
+ controlledInitializationPoller.selectJobsToInitialize();
// shouldn't change
assertEquals(initializedJobsList.size(), 6);
@@ -2289,7 +2247,7 @@
// change priority of one job
taskTrackerManager.setPriority(u4j1, JobPriority.VERY_HIGH);
- p.selectJobsToInitialize();
+ controlledInitializationPoller.selectJobsToInitialize();
// the high priority job should get initialized, but not the
// low priority job from u4, as we have already exceeded the
@@ -2299,22 +2257,16 @@
initializedJobsList.contains(u4j1.getJobID()));
assertFalse("Contains U4J2 Normal priority job " ,
initializedJobsList.contains(u4j2.getJobID()));
- p.stopRunning();
+ controlledInitializationPoller.stopRunning();
}
public void testJobMovement() throws Exception {
String[] qs = { "default"};
taskTrackerManager.addQueues(qs);
- resConf = new FakeResourceManagerConf();
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 100));
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
- ControlledInitializationPoller p = new ControlledInitializationPoller(
- scheduler.jobQueuesManager,
- resConf,
- resConf.getQueues());
- scheduler.setInitializationPoller(p);
scheduler.start();
JobQueuesManager mgr = scheduler.jobQueuesManager;
@@ -2340,7 +2292,7 @@
// submit a job
FakeJobInProgress job =
submitJob(JobStatus.PREP, 1, 1, "default", "u1");
- p.selectJobsToInitialize();
+ controlledInitializationPoller.selectJobsToInitialize();
assertEquals(p.getInitializedJobList().size(), 1);
@@ -2357,7 +2309,7 @@
Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
t = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
- p.selectJobsToInitialize();
+ controlledInitializationPoller.selectJobsToInitialize();
// now this task should be removed from the initialized list.
assertTrue(p.getInitializedJobList().isEmpty());