You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by sz...@apache.org on 2009/04/07 20:13:04 UTC

svn commit: r762885 - in /hadoop/core/trunk: CHANGES.txt src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

Author: szetszwo
Date: Tue Apr  7 18:13:03 2009
New Revision: 762885

URL: http://svn.apache.org/viewvc?rev=762885&view=rev
Log:
HADOOP-5068. Fix NPE in TestCapacityScheduler.  (Vinod Kumar Vavilapalli via szetszwo)

Modified:
    hadoop/core/trunk/CHANGES.txt
    hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java

Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=762885&r1=762884&r2=762885&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Tue Apr  7 18:13:03 2009
@@ -1201,6 +1201,9 @@
     HADOOP-3810. NameNode seems unstable on a cluster with little space left.
     (hairong)
 
+    HADOOP-5068. Fix NPE in TestCapacityScheduler.  (Vinod Kumar Vavilapalli
+    via szetszwo)
+
 Release 0.19.2 - Unreleased
 
   BUG FIXES

Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=762885&r1=762884&r2=762885&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Tue Apr  7 18:13:03 2009
@@ -143,7 +143,9 @@
       }
     }
   }
-  
+
+  private ControlledInitializationPoller controlledInitializationPoller;
+
   static class FakeJobInProgress extends JobInProgress {
     
     private FakeTaskTrackerManager taskTrackerManager;
@@ -605,8 +607,14 @@
     scheduler.setTaskTrackerManager(taskTrackerManager);
 
     conf = new JobConf();
+    // Don't let the JobInitializationPoller come in our way.
+    resConf = new FakeResourceManagerConf();
+    controlledInitializationPoller = new ControlledInitializationPoller(
+        scheduler.jobQueuesManager,
+        resConf,
+        resConf.getQueues());
+    scheduler.setInitializationPoller(controlledInitializationPoller);
     scheduler.setConf(conf);
-    
   }
   
   @Override
@@ -668,7 +676,6 @@
   public void testJobRunStateChange() throws IOException {
     // start the scheduler
     taskTrackerManager.addQueues(new String[] {"default"});
-    resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, 1, true, 1));
     resConf.setFakeQueues(queues);
@@ -806,7 +813,6 @@
   public void testJobFinished() throws Exception {
     taskTrackerManager.addQueues(new String[] {"default"});
     
-    resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 50.0f, 10, true, 25));
     resConf.setFakeQueues(queues);
@@ -862,7 +868,6 @@
     // set up some queues
     String[] qs = {"default", "q2"};
     taskTrackerManager.addQueues(qs);
-    resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
     queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
@@ -887,7 +892,6 @@
     // need only one queue
     String[] qs = { "default" };
     taskTrackerManager.addQueues(qs);
-    resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, 300, true, 100));
     resConf.setFakeQueues(queues);
@@ -918,7 +922,6 @@
   public void testGCAllocationToQueues() throws Exception {
     String[] qs = {"default","q1","q2","q3","q4"};
     taskTrackerManager.addQueues(qs);
-    resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default",25.0f,5000,true,25));
     queues.add(new FakeQueueInfo("q1",-1.0f,5000,true,25));
@@ -940,7 +943,6 @@
     // set up some queues
     String[] qs = {"default", "q2"};
     taskTrackerManager.addQueues(qs);
-    resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     // set the gc % as 10%, so that gc will be zero initially as 
     // the cluster capacity increase slowly.
@@ -995,7 +997,6 @@
     // set up some queues
     String[] qs = {"default", "q2"};
     taskTrackerManager.addQueues(qs);
-    resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
     queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
@@ -1022,7 +1023,6 @@
     // set up some queues
     String[] qs = {"default", "q2"};
     taskTrackerManager.addQueues(qs);
-    resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
     queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
@@ -1051,7 +1051,6 @@
     // set up some queues
     String[] qs = {"default", "q2"};
     taskTrackerManager.addQueues(qs);
-    resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
     queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
@@ -1080,7 +1079,6 @@
     // set up some queues
     String[] qs = {"default", "q2"};
     taskTrackerManager.addQueues(qs);
-    resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 50.0f, 5000, true, 25));
     queues.add(new FakeQueueInfo("q2", 50.0f, 5000, true, 25));
@@ -1120,7 +1118,6 @@
     // set up one queue, with 10 slots
     String[] qs = {"default"};
     taskTrackerManager.addQueues(qs);
-    resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, 10000, true, 25));
     resConf.setFakeQueues(queues);
@@ -1179,7 +1176,6 @@
     // set up some queues
     String[] qs = {"default", "q2", "q3"};
     taskTrackerManager.addQueues(qs);
-    resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
     queues.add(new FakeQueueInfo("q2", 25.0f, 1000, true, 25));
@@ -1221,7 +1217,6 @@
     // set up some queues
     String[] qs = {"default", "q2", "q3", "q4"};
     taskTrackerManager.addQueues(qs);
-    resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
     queues.add(new FakeQueueInfo("q2", 20.0f, 1000, true, 25));
@@ -1280,7 +1275,6 @@
     taskTrackerManager.addQueues(qs);
     int maxSlots = taskTrackerManager.maxMapTasksPerTracker 
                    * taskTrackerManager.taskTrackers().size();
-    resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
     queues.add(new FakeQueueInfo("queue", 50.0f, 1000, true, 25));
@@ -1315,7 +1309,6 @@
     // set up some queues
     String[] qs = {"default", "q2"};
     taskTrackerManager.addQueues(qs);
-    resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
     queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25));
@@ -1378,20 +1371,13 @@
   public void testReclaimCapacityWithUninitializedJobs() throws IOException {
     String[] qs = {"default", "q2"};
     taskTrackerManager.addQueues(qs);
-    resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
     queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25));
     resConf.setFakeQueues(queues);
-    
-    ControlledInitializationPoller p = new ControlledInitializationPoller(
-        scheduler.jobQueuesManager,
-        resConf,
-        resConf.getQueues());
-    scheduler.setInitializationPoller(p);
-    
     scheduler.setResourceManagerConf(resConf);
     scheduler.start();
+
     //Submit one job to the default queue and get the capacity over the 
     //gc of the particular queue.
     FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
@@ -1424,7 +1410,6 @@
     // set up some queues
     String[] qs = {"default", "q2", "q3"};
     taskTrackerManager.addQueues(qs);
-    resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     // we want q3 to have 0 GC. Map slots = 4. 
     queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
@@ -1501,18 +1486,13 @@
     taskTrackerManager = new FakeTaskTrackerManager(2, 1, 1);
     scheduler.setTaskTrackerManager(taskTrackerManager);
     taskTrackerManager.addQueues(qs);
-    resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
     queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
-    ControlledInitializationPoller p = new ControlledInitializationPoller(
-        scheduler.jobQueuesManager,
-        resConf,
-        resConf.getQueues());
-    scheduler.setInitializationPoller(p);
     scheduler.start();
+
     scheduler.assignTasks(tracker("tt1")); // heartbeat
     scheduler.assignTasks(tracker("tt2")); // heartbeat
     int totalMaps = taskTrackerManager.getClusterStatus().getMaxMapTasks();
@@ -1550,7 +1530,7 @@
     assertEquals(infoStrings[15] , "Number of Waiting Jobs: 5");
     
     //Initalize the jobs but don't raise events
-    p.selectJobsToInitialize();
+    controlledInitializationPoller.selectJobsToInitialize();
     
     schedulingInfo = 
       queueManager.getJobQueueInfo("default").getSchedulingInfo();
@@ -1567,7 +1547,7 @@
     //assign one job
     Task t1 = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     //Initalize extra job.
-    p.selectJobsToInitialize();
+    controlledInitializationPoller.selectJobsToInitialize();
     
     //Get scheduling information, now the number of waiting job should have
     //changed to 4 as one is scheduled and has become running.
@@ -1615,7 +1595,7 @@
         u1j2.getStatus().getRunState() == JobStatus.RUNNING);
     taskTrackerManager.finalizeJob(u1j2, JobStatus.FAILED);
     //Run initializer to clean up failed jobs
-    p.selectJobsToInitialize();
+    controlledInitializationPoller.selectJobsToInitialize();
     // make sure we update our stats
     scheduler.updateQSIInfoForTests();
     schedulingInfo = 
@@ -1633,7 +1613,7 @@
     
     taskTrackerManager.finalizeJob(u1j5, JobStatus.FAILED);
     //run initializer to clean up failed job
-    p.selectJobsToInitialize();
+    controlledInitializationPoller.selectJobsToInitialize();
     // make sure we update our stats
     scheduler.updateQSIInfoForTests();
     schedulingInfo = 
@@ -1658,7 +1638,7 @@
     
     //now the running count of map should be one and waiting jobs should be
     //one. run the poller as it is responsible for waiting count
-    p.selectJobsToInitialize();
+    controlledInitializationPoller.selectJobsToInitialize();
     // make sure we update our stats
     scheduler.updateQSIInfoForTests();
     schedulingInfo = 
@@ -1702,7 +1682,6 @@
         .setTotalPhysicalMemory(512 * 1024 * 1024L);
 
     taskTrackerManager.addQueues(new String[] { "default" });
-    resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
     resConf.setFakeQueues(queues);
@@ -1749,7 +1728,6 @@
     ttStatus.setReservedPhysicalMemory(0);
 
     taskTrackerManager.addQueues(new String[] { "default" });
-    resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
     resConf.setFakeQueues(queues);
@@ -1799,7 +1777,6 @@
     // Normal job on this TT would be 1.5GB vmem, 0.5GB pmem
 
     taskTrackerManager.addQueues(new String[] { "default" });
-    resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
     resConf.setFakeQueues(queues);
@@ -1885,7 +1862,6 @@
     // Normal job on this TT would be 1GB vmem, 0.5GB pmem
 
     taskTrackerManager.addQueues(new String[] { "default" });
-    resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
     resConf.setFakeQueues(queues);
@@ -1949,7 +1925,6 @@
     ttStatus.setTotalPhysicalMemory(1 * 1024 * 1024 * 1024);
     ttStatus.setReservedPhysicalMemory(0);
 
-    resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
     taskTrackerManager.addQueues(new String[] { "default" });
@@ -1966,11 +1941,6 @@
     resConf.setDefaultPercentOfPmemInVmem(33.3f);
     resConf.setLimitMaxPmemForTasks(pmemUpperLimit);
     scheduler.setResourceManagerConf(resConf);
-    ControlledInitializationPoller p = new ControlledInitializationPoller(
-        scheduler.jobQueuesManager,
-        resConf,
-        resConf.getQueues());
-    scheduler.setInitializationPoller(p);
     scheduler.start();
 
     LOG.debug("Submit one invalid high ram(5GB vmem, 3GB pmem) job of "
@@ -2022,7 +1992,6 @@
     ttStatus.setTotalPhysicalMemory(1 * 1024 * 1024 * 1024L);
     ttStatus.setReservedPhysicalMemory(0);
 
-    resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 25));
     taskTrackerManager.addQueues(new String[] { "default" });
@@ -2122,16 +2091,10 @@
     taskTrackerManager = new FakeTaskTrackerManager(2, 1, 1);
     scheduler.setTaskTrackerManager(taskTrackerManager);
     taskTrackerManager.addQueues(qs);
-    resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 100));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
-    ControlledInitializationPoller p = new ControlledInitializationPoller(
-                                        scheduler.jobQueuesManager,
-                                        resConf,
-                                        resConf.getQueues());
-    scheduler.setInitializationPoller(p);
     scheduler.start();
   
     JobQueuesManager mgr = scheduler.jobQueuesManager;
@@ -2154,7 +2117,7 @@
     assertEquals(mgr.getWaitingJobs("default").size(), 12);
 
     // run one poller iteration.
-    p.selectJobsToInitialize();
+    controlledInitializationPoller.selectJobsToInitialize();
     
     // the poller should initialize 6 jobs
     // 3 users and 2 jobs from each
@@ -2178,7 +2141,7 @@
       submitJob(JobStatus.PREP, 1, 1, "default", "u4");
 
     // run the poller again.
-    p.selectJobsToInitialize();
+    controlledInitializationPoller.selectJobsToInitialize();
     
     // since no jobs have started running, there should be no
     // change to the initialized jobs.
@@ -2202,7 +2165,7 @@
 
     // as some jobs have running tasks, the poller will now
     // pick up new jobs to initialize.
-    p.selectJobsToInitialize();
+    controlledInitializationPoller.selectJobsToInitialize();
 
     // count should still be the same
     assertEquals(initializedJobs.size(), 6);
@@ -2226,7 +2189,7 @@
 
     // no new jobs should be picked up, because max user limit
     // is still 3.
-    p.selectJobsToInitialize();
+    controlledInitializationPoller.selectJobsToInitialize();
     
     assertEquals(initializedJobs.size(), 5);
     
@@ -2239,11 +2202,11 @@
     // Now initialised jobs should contain user 4's job, as
     // user 1's jobs are all done and the number of users is
     // below the limit
-    p.selectJobsToInitialize();
+    controlledInitializationPoller.selectJobsToInitialize();
     assertEquals(initializedJobs.size(), 5);
     assertTrue(initializedJobs.contains(u4j1.getJobID()));
     
-    p.stopRunning();
+    controlledInitializationPoller.stopRunning();
   }
 
   /*
@@ -2253,30 +2216,25 @@
   public void testHighPriorityJobInitialization() throws Exception {
     String[] qs = { "default"};
     taskTrackerManager.addQueues(qs);
-    resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 100));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
-    ControlledInitializationPoller p = new ControlledInitializationPoller(
-                                            scheduler.jobQueuesManager,
-                                            resConf,
-                                            resConf.getQueues());
-    scheduler.setInitializationPoller(p);
     scheduler.start();
+
     JobInitializationPoller initPoller = scheduler.getInitializationPoller();
     Set<JobID> initializedJobsList = initPoller.getInitializedJobList();
 
     // submit 3 jobs for 3 users
     submitJobs(3,3,"default");
-    p.selectJobsToInitialize();
+    controlledInitializationPoller.selectJobsToInitialize();
     assertEquals(initializedJobsList.size(), 6);
     
     // submit 2 job for a different user. one of them will be made high priority
     FakeJobInProgress u4j1 = submitJob(JobStatus.PREP, 1, 1, "default", "u4");
     FakeJobInProgress u4j2 = submitJob(JobStatus.PREP, 1, 1, "default", "u4");
     
-    p.selectJobsToInitialize();
+    controlledInitializationPoller.selectJobsToInitialize();
     
     // shouldn't change
     assertEquals(initializedJobsList.size(), 6);
@@ -2289,7 +2247,7 @@
     // change priority of one job
     taskTrackerManager.setPriority(u4j1, JobPriority.VERY_HIGH);
     
-    p.selectJobsToInitialize();
+    controlledInitializationPoller.selectJobsToInitialize();
     
     // the high priority job should get initialized, but not the
     // low priority job from u4, as we have already exceeded the
@@ -2299,22 +2257,16 @@
         initializedJobsList.contains(u4j1.getJobID()));
     assertFalse("Contains U4J2 Normal priority job " , 
         initializedJobsList.contains(u4j2.getJobID()));
-    p.stopRunning();
+    controlledInitializationPoller.stopRunning();
   }
   
   public void testJobMovement() throws Exception {
     String[] qs = { "default"};
     taskTrackerManager.addQueues(qs);
-    resConf = new FakeResourceManagerConf();
     ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
     queues.add(new FakeQueueInfo("default", 100.0f, 1000000, true, 100));
     resConf.setFakeQueues(queues);
     scheduler.setResourceManagerConf(resConf);
-    ControlledInitializationPoller p = new ControlledInitializationPoller(
-        scheduler.jobQueuesManager,
-        resConf,
-        resConf.getQueues());
-    scheduler.setInitializationPoller(p);
     scheduler.start();
     
     JobQueuesManager mgr = scheduler.jobQueuesManager;
@@ -2340,7 +2292,7 @@
     // submit a job
     FakeJobInProgress job = 
       submitJob(JobStatus.PREP, 1, 1, "default", "u1");
-    p.selectJobsToInitialize();
+    controlledInitializationPoller.selectJobsToInitialize();
     
     assertEquals(p.getInitializedJobList().size(), 1);
 
@@ -2357,7 +2309,7 @@
     Task t = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
     t = checkAssignment("tt1", "attempt_test_0001_r_000001_0 on tt1");
     
-    p.selectJobsToInitialize();
+    controlledInitializationPoller.selectJobsToInitialize();
     
     // now this task should be removed from the initialized list.
     assertTrue(p.getInitializedJobList().isEmpty());