You are viewing a plain-text version of this content. The link to the canonical version was not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2008/10/15 22:17:46 UTC
svn commit: r705020 - in /hadoop/core/branches/branch-0.19: CHANGES.txt
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Author: acmurthy
Date: Wed Oct 15 13:17:45 2008
New Revision: 705020
URL: http://svn.apache.org/viewvc?rev=705020&view=rev
Log:
Merge -r 705010:705011 from trunk to branch-0.19 to fix HADOOP-4373.
Modified:
hadoop/core/branches/branch-0.19/CHANGES.txt
hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
Modified: hadoop/core/branches/branch-0.19/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/CHANGES.txt?rev=705020&r1=705019&r2=705020&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.19/CHANGES.txt Wed Oct 15 13:17:45 2008
@@ -870,6 +870,9 @@
HADOOP-4236. Ensure un-initialized jobs are killed correctly on
user-demand. (Sharad Agarwal via acmurthy)
+ HADOOP-4373. Fix calculation of Guaranteed Capacity for the
+ capacity-scheduler. (Hemanth Yamijala via acmurthy)
+
Release 0.18.2 - Unreleased
BUG FIXES
Modified: hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=705020&r1=705019&r2=705020&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Wed Oct 15 13:17:45 2008
@@ -282,12 +282,26 @@
return -1;
}
else if ((0 == q1.reclaimList.size()) && (0 == q2.reclaimList.size())){
- // neither needs to reclaim. look at how much capacity they've filled
- double r1 = (double)q1.numRunningTasks/(double)q1.guaranteedCapacity;
- double r2 = (double)q2.numRunningTasks/(double)q2.guaranteedCapacity;
- if (r1<r2) return -1;
- else if (r1>r2) return 1;
- else return 0;
+ // neither needs to reclaim. If either doesn't have a capacity yet,
+ // it comes at the end of the queue.
+ if ((q1.guaranteedCapacity == 0) &&
+ (q2.guaranteedCapacity != 0)) {
+ return 1;
+ } else if ((q1.guaranteedCapacity != 0) &&
+ (q2.guaranteedCapacity == 0)) {
+ return -1;
+ } else if ((q1.guaranteedCapacity == 0) &&
+ (q2.guaranteedCapacity == 0)) {
+ // both don't have capacities, treat them as equal.
+ return 0;
+ } else {
+ // look at how much capacity they've filled
+ double r1 = (double)q1.numRunningTasks/(double)q1.guaranteedCapacity;
+ double r2 = (double)q2.numRunningTasks/(double)q2.guaranteedCapacity;
+ if (r1<r2) return -1;
+ else if (r1>r2) return 1;
+ else return 0;
+ }
}
else {
// both have to reclaim. Look at which one needs to reclaim earlier
@@ -335,6 +349,10 @@
qsiForAssigningTasks.get(qsiForAssigningTasks.size()-1);
long currentTime = scheduler.clock.getTime();
for (QueueSchedulingInfo qsi: queueInfoMap.values()) {
+ if (qsi.guaranteedCapacity <= 0) {
+ // no capacity, hence nothing can be reclaimed.
+ continue;
+ }
// is there any resource that needs to be reclaimed?
if ((!qsi.reclaimList.isEmpty()) &&
(qsi.reclaimList.getFirst().whenToKill <
@@ -484,8 +502,8 @@
for (QueueSchedulingInfo qsi: queueInfoMap.values()) {
// compute new GCs and ACs, if TT slots have changed
if (slotsDiff != 0) {
- qsi.guaranteedCapacity +=
- (qsi.guaranteedCapacityPercent*slotsDiff/100);
+ qsi.guaranteedCapacity =
+ (int)(qsi.guaranteedCapacityPercent*numSlots/100);
}
qsi.numRunningTasks = 0;
qsi.numPendingTasks = 0;
@@ -729,6 +747,13 @@
*/
updateCollectionOfQSIs();
for (QueueSchedulingInfo qsi: qsiForAssigningTasks) {
+ if (qsi.guaranteedCapacity <= 0.0f) {
+ // No capacity is guaranteed yet for this queue.
+ // Queues are sorted so that ones without capacities
+ // come towards the end. Hence, we can simply return
+ // from here without considering any further queues.
+ return null;
+ }
t = getTaskFromQueue(taskTracker, qsi);
if (t!= null) {
// we have a task. Update reclaimed resource info
Modified: hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=705020&r1=705019&r2=705020&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/core/branches/branch-0.19/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Wed Oct 15 13:17:45 2008
@@ -539,6 +539,62 @@
assertEquals(1, scheduler.jobQueuesManager.
getWaitingJobQueue("default").size());
}
+
+ // Tests how GC is computed and assignment of tasks done
+ // on the basis of the GC.
+ public void testCapacityBasedAllocation() throws Exception {
+ // set up some queues
+ String[] qs = {"default", "q2"};
+ taskTrackerManager.addQueues(qs);
+ resConf = new FakeResourceManagerConf();
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ // set the gc % as 10%, so that gc will be zero initially as
+ // the cluster capacity increase slowly.
+ queues.add(new FakeQueueInfo("default", 10.0f, 5000, true, 25));
+ queues.add(new FakeQueueInfo("q2", 90.0f, 5000, true, 25));
+ resConf.setFakeQueues(queues);
+ scheduler.setResourceManagerConf(resConf);
+ scheduler.start();
+
+ // submit a job to the default queue
+ submitJob(JobStatus.PREP, 10, 0, "default", "u1");
+
+ // submit a job to the second queue
+ submitJob(JobStatus.PREP, 10, 0, "q2", "u1");
+
+ // job from q2 runs first because it has some non-zero capacity.
+ checkAssignment("tt1", "attempt_test_0002_m_000001_0 on tt1");
+ verifyGuaranteedCapacity("0", "default");
+ verifyGuaranteedCapacity("3", "q2");
+
+ // add another tt to increase tt slots
+ taskTrackerManager.addTaskTracker("tt3");
+ checkAssignment("tt2", "attempt_test_0002_m_000002_0 on tt2");
+ verifyGuaranteedCapacity("0", "default");
+ verifyGuaranteedCapacity("5", "q2");
+
+ // add another tt to increase tt slots
+ taskTrackerManager.addTaskTracker("tt4");
+ checkAssignment("tt3", "attempt_test_0002_m_000003_0 on tt3");
+ verifyGuaranteedCapacity("0", "default");
+ verifyGuaranteedCapacity("7", "q2");
+
+ // add another tt to increase tt slots
+ taskTrackerManager.addTaskTracker("tt5");
+ // now job from default should run, as it is furthest away
+ // in terms of runningMaps / gc.
+ checkAssignment("tt4", "attempt_test_0001_m_000001_0 on tt4");
+ verifyGuaranteedCapacity("1", "default");
+ verifyGuaranteedCapacity("9", "q2");
+ }
+
+ private void verifyGuaranteedCapacity(String expectedCapacity,
+ String queue) throws IOException {
+ String schedInfo = taskTrackerManager.getQueueManager().
+ getSchedulerInfo(queue).toString();
+ assertTrue(schedInfo.contains("Current Capacity Maps : "
+ + expectedCapacity));
+ }
// test capacity transfer
public void testCapacityTransfer() throws Exception {