You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yh...@apache.org on 2008/12/19 04:49:01 UTC
svn commit: r727910 - in /hadoop/core/trunk: CHANGES.txt
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Author: yhemanth
Date: Thu Dec 18 19:49:01 2008
New Revision: 727910
URL: http://svn.apache.org/viewvc?rev=727910&view=rev
Log:
HADOOP-4876. Fix capacity scheduler reclamation by updating count of pending tasks correctly. Contributed by Sreekanth Ramakrishnan
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=727910&r1=727909&r2=727910&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Dec 18 19:49:01 2008
@@ -475,6 +475,9 @@
items which would otherwise not work on a fresh checkout.
(Sreekanth Ramakrishnan via yhemanth)
+ HADOOP-4876. Fix capacity scheduler reclamation by updating count of
+ pending tasks correctly. (Sreekanth Ramakrishnan via yhemanth)
+
Release 0.19.1 - Unreleased
IMPROVEMENTS
Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=727910&r1=727909&r2=727910&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Thu Dec 18 19:49:01 2008
@@ -553,6 +553,10 @@
* to make scheduling decisions. For example, we don't need an exact count
* of numRunningTasks. Once we count upto the grid capacity (gcSum), any
* number beyond that will make no difference.
+ *
+ * The pending task count is only required in reclaim capacity. So
+ * if the computation becomes expensive, we can add a boolean to
+ * denote if pending task computation is required or not.
* */
private synchronized void updateQSIObjects() {
// if # of slots have changed since last time, update.
@@ -597,7 +601,29 @@
* consider the first few jobs per user.
*/
}
- //TODO do we need to update stats on waiting jobs
+
+ //update stats on waiting jobs
+ for(JobInProgress j :
+ scheduler.jobQueuesManager.getJobs(qsi.queueName)) {
+ // pending tasks
+ if(qsi.numPendingTasks > getClusterCapacity()) {
+ // that's plenty. no need for more computation
+ break;
+ }
+ /*
+ * Consider only the waiting jobs in the job queue. Job queue can
+ * contain:
+ * 1. Jobs which are in running state but not scheduled
+ * (these would also be present in running queue), the pending
+ * task count of these jobs is computed when scheduler walks
+ * through running job queue.
+ * 2. Jobs which were killed by the user but are still waiting for the job
+ * initialization poller to walk through the job queue and clean them up.
+ */
+ if (j.getStatus().getRunState() == JobStatus.PREP) {
+ qsi.numPendingTasks += getPendingTasks(j);
+ }
+ }
}
}
Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=727910&r1=727909&r2=727910&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Thu Dec 18 19:49:01 2008
@@ -1339,6 +1339,68 @@
assertEquals(j1.runningMapTasks, 2);
}
+
+ /*
+ * Test case for checking the reclaim capacity with uninitialized jobs.
+ *
+ * Configure 2 queues with the capacity scheduler.
+ *
+ * Submit a single job to the default queue and make it go above the gc
+ * of the queue.
+ *
+ * Then submit another job to the second queue but don't initialize it.
+ *
+ * Run reclaim capacity thread for the scheduler, in order to let scheduler
+ * know that it has to reclaim capacity.
+ *
+ * Advance the scheduler clock by appropriate milliseconds.
+ *
+ * Run scheduler.reclaimCapacity() to kill the appropriate tasks.
+ *
+ * Check running task count of the running job.
+ *
+ */
+ public void testReclaimCapacityWithUninitializedJobs() throws IOException {
+ String[] qs = {"default", "q2"};
+ taskTrackerManager.addQueues(qs);
+ resConf = new FakeResourceManagerConf();
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
+ queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25));
+ resConf.setFakeQueues(queues);
+
+ ControlledInitializationPoller p = new ControlledInitializationPoller(
+ scheduler.jobQueuesManager,
+ resConf,
+ resConf.getQueues());
+ scheduler.setInitializationPoller(p);
+
+ scheduler.setResourceManagerConf(resConf);
+ scheduler.start();
+ //Submit one job to the default queue and get the capacity over the
+ //gc of the particular queue.
+ FakeJobInProgress j1 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
+ checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+ checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
+ checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
+
+ //Submit another job to the second queue but do not initialize it.
+ submitJob(JobStatus.PREP, 10, 10, "q2", "u1");
+
+ //call scheduler's reclaim capacity in order to start reclaim capacity
+ //process.
+ scheduler.reclaimCapacity();
+ //advance the clock to the point at which two tasks of the job would
+ //be killed.
+ clock.advance(600000);
+ //run reclaim capacity
+ scheduler.reclaimCapacity();
+ //check the count of the running tasks.
+ assertEquals(j1.runningMapTasks, 2);
+
+ }
+
/*
* Following is the testing strategy for testing scheduling information.
* - start capacity scheduler with two queues.