You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yh...@apache.org on 2008/12/04 14:58:23 UTC
svn commit: r723326 - in /hadoop/core/trunk: CHANGES.txt
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Author: yhemanth
Date: Thu Dec 4 05:58:22 2008
New Revision: 723326
URL: http://svn.apache.org/viewvc?rev=723326&view=rev
Log:
HADOOP-4558. Fix capacity reclamation in capacity scheduler. Contributed by Amar Kamat.
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=723326&r1=723325&r2=723326&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Dec 4 05:58:22 2008
@@ -256,6 +256,9 @@
HADOOP-4732. Pass connection and read timeouts in the correct order when
setting up fetch in reduce. (Amareshwari Sriramadasu via cdouglas)
+ HADOOP-4558. Fix capacity reclamation in capacity scheduler.
+ (Amar Kamat via yhemanth)
+
Release 0.19.1 - Unreleased
IMPROVEMENTS
Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=723326&r1=723325&r2=723326&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Thu Dec 4 05:58:22 2008
@@ -378,6 +378,11 @@
if (queueInfoMap.size() < 2) {
return;
}
+
+ // make sure we always get the latest values
+ updateQSIObjects();
+ updateCollectionOfQSIs();
+
QueueSchedulingInfo lastQsi =
qsiForAssigningTasks.get(qsiForAssigningTasks.size()-1);
long currentTime = scheduler.clock.getTime();
@@ -554,7 +559,7 @@
qsi.numRunningTasksByUser.put(j.getProfile().getUser(),
i+getRunningTasks(j));
qsi.numPendingTasks += getPendingTasks(j);
- LOG.debug("updateQSI: job " + j.toString() + ": run(m) = " +
+ LOG.debug("updateQSI: job " + j.getJobID().toString() + ": run(m) = " +
j.runningMaps() + ", run(r) = " + j.runningReduces() +
", finished(m) = " + j.finishedMaps() + ", finished(r)= " +
j.finishedReduces() + ", failed(m) = " + j.failedMapTasks +
@@ -1134,7 +1139,7 @@
totalCapacity += gc;
}
int ulMin = rmConf.getMinimumUserLimitPercent(queueName);
- long reclaimTimeLimit = rmConf.getReclaimTimeLimit(queueName);
+ long reclaimTimeLimit = rmConf.getReclaimTimeLimit(queueName) * 1000;
// reclaimTimeLimit is the time(in millisec) within which we need to
// reclaim capacity.
// create queue scheduling objects for Map and Reduce
Modified: hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=723326&r1=723325&r2=723326&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Thu Dec 4 05:58:22 2008
@@ -1168,9 +1168,9 @@
taskTrackerManager.addQueues(qs);
resConf = new FakeResourceManagerConf();
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 50.0f, 1000000, true, 25));
- queues.add(new FakeQueueInfo("q2", 25.0f, 1000000, true, 25));
- queues.add(new FakeQueueInfo("q3", 25.0f, 1000000, true, 25));
+ queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
+ queues.add(new FakeQueueInfo("q2", 25.0f, 1000, true, 25));
+ queues.add(new FakeQueueInfo("q3", 25.0f, 1000, true, 25));
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
@@ -1185,8 +1185,6 @@
checkAssignment("tt2", "attempt_test_0001_m_000003_0 on tt2");
// now submit a job to q2
FakeJobInProgress j3 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u1");
- // update our structures
- scheduler.updateQSIInfo();
// get scheduler to notice that q2 needs to reclaim
scheduler.reclaimCapacity();
// our queue reclaim time is 1000s, heartbeat interval is 5 sec, so
@@ -1211,10 +1209,10 @@
taskTrackerManager.addQueues(qs);
resConf = new FakeResourceManagerConf();
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 50.0f, 1000000, true, 25));
- queues.add(new FakeQueueInfo("q2", 20.0f, 1000000, true, 25));
- queues.add(new FakeQueueInfo("q3", 20.0f, 1000000, true, 25));
- queues.add(new FakeQueueInfo("q4", 10.0f, 1000000, true, 25));
+ queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
+ queues.add(new FakeQueueInfo("q2", 20.0f, 1000, true, 25));
+ queues.add(new FakeQueueInfo("q3", 20.0f, 1000, true, 25));
+ queues.add(new FakeQueueInfo("q4", 10.0f, 1000, true, 25));
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
@@ -1242,8 +1240,6 @@
// running 3 tasks (with a cap of 1).
// If we submit a job to 'default', we need to get 3 slots back.
FakeJobInProgress j4 = submitJobAndInit(JobStatus.PREP, 10, 10, null, "u1");
- // update our structures
- scheduler.updateQSIInfo();
// get scheduler to notice that q2 needs to reclaim
scheduler.reclaimCapacity();
// our queue reclaim time is 1000s, heartbeat interval is 5 sec, so
@@ -1262,6 +1258,42 @@
}
+ // test that capacity is reclaimed even when every map slot in the cluster is occupied
+ public void testReclaimCapacityWithFullCluster() throws Exception {
+ // set up some queues
+ String[] qs = {"default", "queue"};
+ taskTrackerManager.addQueues(qs);
+ int maxSlots = taskTrackerManager.maxMapTasksPerTracker
+ * taskTrackerManager.taskTrackers().size();
+ resConf = new FakeResourceManagerConf();
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
+ queues.add(new FakeQueueInfo("queue", 50.0f, 1000, true, 25));
+ resConf.setFakeQueues(queues);
+ scheduler.setResourceManagerConf(resConf);
+ scheduler.start();
+
+ // now submit 1 job to queue "default" which should take up the cluster
+ FakeJobInProgress j1 =
+ submitJobAndInit(JobStatus.PREP, maxSlots, 0, "default", "u1");
+ checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+ checkAssignment("tt2", "attempt_test_0001_m_000002_0 on tt2");
+ checkAssignment("tt1", "attempt_test_0001_m_000003_0 on tt1");
+ checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
+
+ // now submit a job to queue "queue"; no call to updateQSIInfo() is made before reclaiming
+ submitJobAndInit(JobStatus.PREP, maxSlots, 0, "queue", "u2");
+
+ scheduler.reclaimCapacity();
+ // advance by the whole reclaim time limit; the * 1000 mirrors the scheduler's conversion to millisec
+ clock.advance(scheduler.rmConf.getReclaimTimeLimit("default") * 1000);
+
+ scheduler.reclaimCapacity();
+
+ // check if the tasks are killed; assertEquals takes (message, expected, actual)
+ assertEquals("Failed to reclaim tasks", 2, j1.runningMapTasks);
+ }
+
// test code to reclaim capacity in steps
public void testReclaimCapacityInSteps() throws Exception {
// set up some queues
@@ -1269,8 +1301,8 @@
taskTrackerManager.addQueues(qs);
resConf = new FakeResourceManagerConf();
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 50.0f, 1000000, true, 25));
- queues.add(new FakeQueueInfo("q2", 50.0f, 1000000, true, 25));
+ queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
+ queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25));
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
scheduler.start();
@@ -1284,8 +1316,6 @@
checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
// now submit a job to q2
FakeJobInProgress j2 = submitJobAndInit(JobStatus.PREP, 1, 1, "q2", "u1");
- // update our structures
- scheduler.updateQSIInfo();
// get scheduler to notice that q2 needs to reclaim
scheduler.reclaimCapacity();
// our queue reclaim time is 1000s, heartbeat interval is 5 sec, so
@@ -1293,8 +1323,6 @@
clock.advance(400000);
// submit another job to q2 which causes more capacity to be reclaimed
j2 = submitJobAndInit(JobStatus.PREP, 10, 10, "q2", "u2");
- // update our structures
- scheduler.updateQSIInfo();
clock.advance(200000);
scheduler.reclaimCapacity();
// one task from j1 will be killed
@@ -1315,8 +1343,8 @@
taskTrackerManager.addQueues(qs);
resConf = new FakeResourceManagerConf();
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 50.0f, 1000000, true, 25));
- queues.add(new FakeQueueInfo("q2", 50.0f, 1000000, true, 25));
+ queues.add(new FakeQueueInfo("default", 50.0f, 1000, true, 25));
+ queues.add(new FakeQueueInfo("q2", 50.0f, 1000, true, 25));
resConf.setFakeQueues(queues);
scheduler.setResourceManagerConf(resConf);
scheduler.start();