You are viewing a plain text version of this content. The canonical link to the original message was not preserved in this plain-text copy.
Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2009/11/20 10:59:11 UTC

svn commit: r882472 - in /hadoop/mapreduce/branches/branch-0.21: ./ src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/ src/java/org/apache/hadoop/mapred/

Author: sharad
Date: Fri Nov 20 09:59:10 2009
New Revision: 882472

URL: http://svn.apache.org/viewvc?rev=882472&view=rev
Log:
MAPREDUCE-1007. Merge revision 882470 from trunk.

Modified:
    hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
    hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
    hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
    hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java

Modified: hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/CHANGES.txt?rev=882472&r1=882471&r2=882472&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/branch-0.21/CHANGES.txt Fri Nov 20 09:59:10 2009
@@ -823,3 +823,7 @@
     cdouglas)
 
     MAPREDUCE-915. The debug scripts are run as the job user. (ddas)
+
+    MAPREDUCE-1007. Fix NPE in CapacityTaskScheduler.getJobs(). 
+    (V.V.Chaitanya Krishna via sharad)
+

Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=882472&r1=882471&r2=882472&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Fri Nov 20 09:59:10 2009
@@ -989,13 +989,17 @@
   @Override
   public synchronized Collection<JobInProgress> getJobs(String queueName) {
     Collection<JobInProgress> jobCollection = new ArrayList<JobInProgress>();
+    JobQueue jobQueue = jobQueuesManager.getJobQueue(queueName);
+    if (jobQueue == null) {
+      return jobCollection;
+    }
     Collection<JobInProgress> runningJobs =
-      jobQueuesManager.getJobQueue(queueName).getRunningJobs();
+      jobQueue.getRunningJobs();
     if (runningJobs != null) {
       jobCollection.addAll(runningJobs);
     }
     Collection<JobInProgress> waitingJobs = 
-      jobQueuesManager.getJobQueue(queueName).getWaitingJobs();
+      jobQueue.getWaitingJobs();
     Collection<JobInProgress> tempCollection = new ArrayList<JobInProgress>();
     if(waitingJobs != null) {
       tempCollection.addAll(waitingJobs);

Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=882472&r1=882471&r2=882472&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri Nov 20 09:59:10 2009
@@ -28,6 +28,7 @@
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 import static org.apache.hadoop.mapred.CapacityTestUtils.*;
 
+import java.io.File;
 import java.io.IOException;
 import java.util.*;
 
@@ -36,6 +37,11 @@
   static final Log LOG =
     LogFactory.getLog(org.apache.hadoop.mapred.TestCapacityScheduler.class);
 
+  String queueConfigPath =
+    System.getProperty("test.build.extraconf", "build/test/extraconf");
+  File queueConfigFile =
+    new File(queueConfigPath, QueueManager.QUEUE_CONF_FILE_NAME);
+
   private static int jobCounter;
 
   private ControlledInitializationPoller controlledInitializationPoller;
@@ -341,65 +347,55 @@
     taskTrackerManager.finishTask("attempt_test_0002_m_000003_0", j2);
   }
 
-  // basic tests, should be able to submit to queues
-  public void testSubmitToQueues() throws Exception {
-    // set up some queues
-    String[] qs = {"default", "q2"};
-    taskTrackerManager.addQueues(qs);
-    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
-    queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
-
+  /**
+   * tests the submission of jobs to container and job queues
+   * @throws Exception
+   */
+  public void testJobSubmission() throws Exception {
+    JobQueueInfo[] queues = TestQueueManagerRefresh.getSimpleQueueHierarchy();
 
-    taskTrackerManager.setFakeQueues(queues);
+    queues[0].getProperties().setProperty(
+        CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(100));
+    queues[1].getProperties().setProperty(
+        CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
+    queues[2].getProperties().setProperty(
+        CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
+
+    // write the configuration file
+    QueueManagerTestUtils.writeQueueConfigurationFile(
+        queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+    setUp(1, 4, 4);
+    // use the queues from the config file.
+    taskTrackerManager.setQueueManager(new QueueManager());
     scheduler.start();
 
-    // submit a job with no queue specified. It should be accepted
-    // and given to the default queue. 
-    JobInProgress j = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 
-                                                    10, 10, null, "u1");
-    // when we ask for tasks, we should get them for the job submitted
-    Map<String, String> expectedTaskStrings = new HashMap<String, String>();
-    expectedTaskStrings.put(CapacityTestUtils.MAP, 
-                            "attempt_test_0001_m_000001_0 on tt1");
-    expectedTaskStrings.put(CapacityTestUtils.REDUCE, 
-                            "attempt_test_0001_r_000001_0 on tt1");
-    checkMultipleTaskAssignment(taskTrackerManager, scheduler, 
-                                      "tt1", expectedTaskStrings);
-
-    // submit another job, to a different queue
-    j = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
-    // now when we get tasks, it should be from the second job
-    expectedTaskStrings.clear();
-    expectedTaskStrings.put(CapacityTestUtils.MAP,
-                              "attempt_test_0002_m_000001_0 on tt2");
-    expectedTaskStrings.put(CapacityTestUtils.REDUCE,
-                              "attempt_test_0002_r_000001_0 on tt2");
-    checkMultipleTaskAssignment(taskTrackerManager, scheduler, 
-                                  "tt2", expectedTaskStrings);
-  }
-
-  public void testGetJobs() throws Exception {
-    // need only one queue
-    String[] qs = {"default"};
-    taskTrackerManager.addQueues(qs);
-    ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
-    queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
-
+    // submit a job to the container queue
+    try {
+      taskTrackerManager.submitJobAndInit(JobStatus.PREP, 20, 0,
+          queues[0].getQueueName(), "user");
+      fail("Jobs are being able to be submitted to the container queue");
+    } catch (Exception e) {
+      assertTrue(scheduler.getJobs(queues[0].getQueueName()).isEmpty());
+    }
+
+    FakeJobInProgress job = taskTrackerManager.submitJobAndInit(JobStatus.PREP,
+        1, 0, queues[1].getQueueName(), "user");
+    assertEquals(1, scheduler.getJobs(queues[1].getQueueName()).size());
+    assertTrue(scheduler.getJobs(queues[1].getQueueName()).contains(job));
+
+    // check if the job is submitted
+    checkAssignment(taskTrackerManager, scheduler, "tt1", 
+    "attempt_test_0002_m_000001_0 on tt1");
 
-    taskTrackerManager.setFakeQueues(queues);
-    scheduler.start();
+    // test for getJobs
     HashMap<String, ArrayList<FakeJobInProgress>> subJobsList =
-      taskTrackerManager.submitJobs(1, 4, "default");
+      taskTrackerManager.submitJobs(1, 4, queues[2].getQueueName());
 
     JobQueuesManager mgr = scheduler.jobQueuesManager;
-
-    while (mgr.getJobQueue("default").getWaitingJobs().size() < 4) {
-      Thread.sleep(1);
-    }
     //Raise status change events for jobs submitted.
-    raiseStatusChangeEvents(mgr);
-    Collection<JobInProgress> jobs = scheduler.getJobs("default");
+    raiseStatusChangeEvents(mgr, queues[2].getQueueName());
+    Collection<JobInProgress> jobs =
+      scheduler.getJobs(queues[2].getQueueName());
 
     assertTrue(
       "Number of jobs returned by scheduler is wrong"

Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=882472&r1=882471&r2=882472&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java Fri Nov 20 09:59:10 2009
@@ -3889,7 +3889,10 @@
 
   public org.apache.hadoop.mapreduce.JobStatus[] getJobsFromQueue(String queue) 
       throws IOException {
-    Collection<JobInProgress> jips = taskScheduler.getJobs(queue);
+    Collection<JobInProgress> jips = null;
+    if (queueManager.getLeafQueueNames().contains(queue)) {
+      jips = taskScheduler.getJobs(queue);
+    }
     return getJobStatus(jips,false);
   }