You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by sh...@apache.org on 2009/11/20 10:59:11 UTC
svn commit: r882472 - in /hadoop/mapreduce/branches/branch-0.21: ./
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
src/java/org/apache/hadoop/mapred/
Author: sharad
Date: Fri Nov 20 09:59:10 2009
New Revision: 882472
URL: http://svn.apache.org/viewvc?rev=882472&view=rev
Log:
MAPREDUCE-1007. Merge revision 882470 from trunk.
Modified:
hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java
Modified: hadoop/mapreduce/branches/branch-0.21/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/CHANGES.txt?rev=882472&r1=882471&r2=882472&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/CHANGES.txt (original)
+++ hadoop/mapreduce/branches/branch-0.21/CHANGES.txt Fri Nov 20 09:59:10 2009
@@ -823,3 +823,7 @@
cdouglas)
MAPREDUCE-915. The debug scripts are run as the job user. (ddas)
+
+ MAPREDUCE-1007. Fix NPE in CapacityTaskScheduler.getJobs().
+ (V.V.Chaitanya Krishna via sharad)
+
Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=882472&r1=882471&r2=882472&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Fri Nov 20 09:59:10 2009
@@ -989,13 +989,17 @@
@Override
public synchronized Collection<JobInProgress> getJobs(String queueName) {
Collection<JobInProgress> jobCollection = new ArrayList<JobInProgress>();
+ JobQueue jobQueue = jobQueuesManager.getJobQueue(queueName);
+ if (jobQueue == null) {
+ return jobCollection;
+ }
Collection<JobInProgress> runningJobs =
- jobQueuesManager.getJobQueue(queueName).getRunningJobs();
+ jobQueue.getRunningJobs();
if (runningJobs != null) {
jobCollection.addAll(runningJobs);
}
Collection<JobInProgress> waitingJobs =
- jobQueuesManager.getJobQueue(queueName).getWaitingJobs();
+ jobQueue.getWaitingJobs();
Collection<JobInProgress> tempCollection = new ArrayList<JobInProgress>();
if(waitingJobs != null) {
tempCollection.addAll(waitingJobs);
Modified: hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=882472&r1=882471&r2=882472&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri Nov 20 09:59:10 2009
@@ -28,6 +28,7 @@
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import static org.apache.hadoop.mapred.CapacityTestUtils.*;
+import java.io.File;
import java.io.IOException;
import java.util.*;
@@ -36,6 +37,11 @@
static final Log LOG =
LogFactory.getLog(org.apache.hadoop.mapred.TestCapacityScheduler.class);
+ String queueConfigPath =
+ System.getProperty("test.build.extraconf", "build/test/extraconf");
+ File queueConfigFile =
+ new File(queueConfigPath, QueueManager.QUEUE_CONF_FILE_NAME);
+
private static int jobCounter;
private ControlledInitializationPoller controlledInitializationPoller;
@@ -341,65 +347,55 @@
taskTrackerManager.finishTask("attempt_test_0002_m_000003_0", j2);
}
- // basic tests, should be able to submit to queues
- public void testSubmitToQueues() throws Exception {
- // set up some queues
- String[] qs = {"default", "q2"};
- taskTrackerManager.addQueues(qs);
- ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 50.0f, true, 25));
- queues.add(new FakeQueueInfo("q2", 50.0f, true, 25));
-
+ /**
+ * tests the submission of jobs to container and job queues
+ * @throws Exception
+ */
+ public void testJobSubmission() throws Exception {
+ JobQueueInfo[] queues = TestQueueManagerRefresh.getSimpleQueueHierarchy();
- taskTrackerManager.setFakeQueues(queues);
+ queues[0].getProperties().setProperty(
+ CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(100));
+ queues[1].getProperties().setProperty(
+ CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
+ queues[2].getProperties().setProperty(
+ CapacitySchedulerConf.CAPACITY_PROPERTY, String.valueOf(50));
+
+ // write the configuration file
+ QueueManagerTestUtils.writeQueueConfigurationFile(
+ queueConfigFile.getAbsolutePath(), new JobQueueInfo[] { queues[0] });
+ setUp(1, 4, 4);
+ // use the queues from the config file.
+ taskTrackerManager.setQueueManager(new QueueManager());
scheduler.start();
- // submit a job with no queue specified. It should be accepted
- // and given to the default queue.
- JobInProgress j = taskTrackerManager.submitJobAndInit(JobStatus.PREP,
- 10, 10, null, "u1");
- // when we ask for tasks, we should get them for the job submitted
- Map<String, String> expectedTaskStrings = new HashMap<String, String>();
- expectedTaskStrings.put(CapacityTestUtils.MAP,
- "attempt_test_0001_m_000001_0 on tt1");
- expectedTaskStrings.put(CapacityTestUtils.REDUCE,
- "attempt_test_0001_r_000001_0 on tt1");
- checkMultipleTaskAssignment(taskTrackerManager, scheduler,
- "tt1", expectedTaskStrings);
-
- // submit another job, to a different queue
- j = taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
- // now when we get tasks, it should be from the second job
- expectedTaskStrings.clear();
- expectedTaskStrings.put(CapacityTestUtils.MAP,
- "attempt_test_0002_m_000001_0 on tt2");
- expectedTaskStrings.put(CapacityTestUtils.REDUCE,
- "attempt_test_0002_r_000001_0 on tt2");
- checkMultipleTaskAssignment(taskTrackerManager, scheduler,
- "tt2", expectedTaskStrings);
- }
-
- public void testGetJobs() throws Exception {
- // need only one queue
- String[] qs = {"default"};
- taskTrackerManager.addQueues(qs);
- ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, true, 100));
-
+ // submit a job to the container queue
+ try {
+ taskTrackerManager.submitJobAndInit(JobStatus.PREP, 20, 0,
+ queues[0].getQueueName(), "user");
+ fail("Jobs are being able to be submitted to the container queue");
+ } catch (Exception e) {
+ assertTrue(scheduler.getJobs(queues[0].getQueueName()).isEmpty());
+ }
+
+ FakeJobInProgress job = taskTrackerManager.submitJobAndInit(JobStatus.PREP,
+ 1, 0, queues[1].getQueueName(), "user");
+ assertEquals(1, scheduler.getJobs(queues[1].getQueueName()).size());
+ assertTrue(scheduler.getJobs(queues[1].getQueueName()).contains(job));
+
+ // check if the job is submitted
+ checkAssignment(taskTrackerManager, scheduler, "tt1",
+ "attempt_test_0002_m_000001_0 on tt1");
- taskTrackerManager.setFakeQueues(queues);
- scheduler.start();
+ // test for getJobs
HashMap<String, ArrayList<FakeJobInProgress>> subJobsList =
- taskTrackerManager.submitJobs(1, 4, "default");
+ taskTrackerManager.submitJobs(1, 4, queues[2].getQueueName());
JobQueuesManager mgr = scheduler.jobQueuesManager;
-
- while (mgr.getJobQueue("default").getWaitingJobs().size() < 4) {
- Thread.sleep(1);
- }
//Raise status change events for jobs submitted.
- raiseStatusChangeEvents(mgr);
- Collection<JobInProgress> jobs = scheduler.getJobs("default");
+ raiseStatusChangeEvents(mgr, queues[2].getQueueName());
+ Collection<JobInProgress> jobs =
+ scheduler.getJobs(queues[2].getQueueName());
assertTrue(
"Number of jobs returned by scheduler is wrong"
Modified: hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java?rev=882472&r1=882471&r2=882472&view=diff
==============================================================================
--- hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/mapreduce/branches/branch-0.21/src/java/org/apache/hadoop/mapred/JobTracker.java Fri Nov 20 09:59:10 2009
@@ -3889,7 +3889,10 @@
public org.apache.hadoop.mapreduce.JobStatus[] getJobsFromQueue(String queue)
throws IOException {
- Collection<JobInProgress> jips = taskScheduler.getJobs(queue);
+ Collection<JobInProgress> jips = null;
+ if (queueManager.getLeafQueueNames().contains(queue)) {
+ jips = taskScheduler.getJobs(queue);
+ }
return getJobStatus(jips,false);
}