You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by om...@apache.org on 2011/03/04 04:25:22 UTC
svn commit: r1076951 - in
/hadoop/common/branches/branch-0.20-security-patches: conf/
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
src/docs/src/documentation/content...
Author: omalley
Date: Fri Mar 4 03:25:21 2011
New Revision: 1076951
URL: http://svn.apache.org/viewvc?rev=1076951&view=rev
Log:
commit 2a7c1121115819f918adb47c4aebd4daa693a0ea
Author: Lee Tucker <lt...@yahoo-inc.com>
Date: Thu Jul 30 17:40:40 2009 -0700
Applying patch 2855407.mr532.patch
Modified:
hadoop/common/branches/branch-0.20-security-patches/conf/capacity-scheduler.xml.template
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/capacity_scheduler.xml
Modified: hadoop/common/branches/branch-0.20-security-patches/conf/capacity-scheduler.xml.template
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/conf/capacity-scheduler.xml.template?rev=1076951&r1=1076950&r2=1076951&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/conf/capacity-scheduler.xml.template (original)
+++ hadoop/common/branches/branch-0.20-security-patches/conf/capacity-scheduler.xml.template Fri Mar 4 03:25:21 2011
@@ -45,6 +45,44 @@
of the job queue.
</description>
</property>
+
+<property>
+ <name>mapred.capacity-scheduler.queue.default.max.map.slots</name>
+ <value>-1</value>
+ <description>
+ This value is the maximum map slots that can be used in a
+ queue at any point of time. So for example assuming above config value
+ is 100 , not more than 100 tasks would be in the queue at any point of
+ time, assuming each task takes one slot.
+
+ Default value of -1 would disable this capping feature
+
+ Typically the queue capacity should be equal to this limit.
+ If queue capacity is more than this limit, excess capacity will be
+ used by the other queues. If queue capacity is less than the above
+ limit , then the limit would be the queue capacity - as in the current
+ implementation
+ </description>
+</property>
+
+<property>
+ <name>mapred.capacity-scheduler.queue.default.max.reduce.slots</name>
+ <value>-1</value>
+ <description>
+ This value is the maximum reduce slots that can be used in a
+ queue at any point of time. So for example assuming above config value
+ is 100 , not more than 100 reduce tasks would be in the queue at any point
+ of time, assuming each task takes one slot.
+
+ Default value of -1 would disable this capping feature
+
+ Typically the queue capacity should be equal to this limit.
+ If queue capacity is more than this limit, excess capacity will be
+ used by the other queues. If queue capacity is less than the above
+ limit , then the limit would be the queue capacity - as in the current
+ implementation
+ </description>
+</property>
<!-- The default configuration settings for the capacity task scheduler -->
<!-- The default values would be applied to all the queues which don't have -->
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java?rev=1076951&r1=1076950&r2=1076951&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java Fri Mar 4 03:25:21 2011
@@ -76,6 +76,18 @@ class CapacitySchedulerConf {
"mapred.capacity-scheduler.task.limit.maxpmem";
/**
+ * Configuration that provides the maximum cap for the map task in a queue
+ * at any given point of time.
+ */
+ static final String MAX_MAP_CAP_PROPERTY = "max.map.slots";
+
+ /**
+ * Configuration that provides the maximum cap for the reduce task in a queue
+ * at any given point of time.
+ */
+ static final String MAX_REDUCE_CAP_PROPERTY = "max.reduce.slots";
+
+ /**
* The constant which defines the default initialization thread
* polling interval, denoted in milliseconds.
*/
@@ -357,4 +369,40 @@ class CapacitySchedulerConf {
rmConf.setInt(
"mapred.capacity-scheduler.init-worker-threads", poolSize);
}
+
+ /**
+ * get the max map slots cap
+ * @param queue
+ * @return
+ */
+ public int getMaxMapCap(String queue) {
+ return rmConf.getInt(toFullPropertyName(queue,MAX_MAP_CAP_PROPERTY),-1);
+ }
+
+ /**
+ * Used for testing
+ * @param queue
+ * @param val
+ */
+ public void setMaxMapCap(String queue,int val) {
+ rmConf.setInt(toFullPropertyName(queue,MAX_MAP_CAP_PROPERTY),val);
+ }
+
+ /**
+ * get the max reduce slots cap
+ * @param queue
+ * @return
+ */
+ public int getMaxReduceCap(String queue) {
+ return rmConf.getInt(toFullPropertyName(queue,MAX_REDUCE_CAP_PROPERTY),-1);
+ }
+
+ /**
+ * Used for testing
+ * @param queue
+ * @param val
+ */
+ public void setMaxReduceCap(String queue,int val) {
+ rmConf.setInt(toFullPropertyName(queue,MAX_REDUCE_CAP_PROPERTY),val);
+ }
}
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=1076951&r1=1076950&r2=1076951&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Fri Mar 4 03:25:21 2011
@@ -76,23 +76,36 @@ class CapacityTaskScheduler extends Task
**********************************************************************/
private static class TaskSchedulingInfo {
+
+ private static final String LIMIT_NORMALIZED_CAPACITY_STRING
+ = "(Capacity is restricted to max limit of %d slots.\n" +
+ "Remaining %d slots will be used by other queues.)\n";
/**
* the actual capacity, which depends on how many slots are available
* in the cluster at any given time.
*/
- int capacity = 0;
+ private int capacity = 0;
// number of running tasks
int numRunningTasks = 0;
// number of slots occupied by running tasks
int numSlotsOccupied = 0;
/**
+ * max task limit
+ * This value is the maximum slots that can be used in a
+ * queue at any point of time. So for example assuming above config value
+ * is 100 , not more than 100 tasks would be in the queue at any point of
+ * time, assuming each task takes one slot.
+ */
+ private int maxTaskLimit = -1;
+
+ /**
* for each user, we need to keep track of number of slots occupied by
* running tasks
*/
Map<String, Integer> numSlotsOccupiedByUser =
new HashMap<String, Integer>();
-
+
/**
* reset the variables associated with tasks
*/
@@ -104,15 +117,53 @@ class CapacityTaskScheduler extends Task
}
}
+
+ int getMaxTaskLimit() {
+ return maxTaskLimit;
+ }
+
+ void setMaxTaskLimit(int maxTaskCap) {
+ this.maxTaskLimit = maxTaskCap;
+ }
+
+ /**
+ * This method checks for maxTaskLimit and returns the minimum of maxTaskLimit
+ * and capacity; a negative maxTaskLimit means no limit is applied.
+ * @return
+ */
+ int getCapacity() {
+ return ((maxTaskLimit >= 0) && (maxTaskLimit < capacity)) ? maxTaskLimit :
+ capacity;
+ }
+
+ /**
+ * Mutator method for capacity
+ * @param capacity
+ */
+ void setCapacity(int capacity) {
+ this.capacity = capacity;
+ }
+
+
/**
* return information about the tasks
*/
@Override
public String toString() {
float occupiedSlotsAsPercent =
- capacity != 0 ? ((float) numSlotsOccupied * 100 / capacity) : 0;
+ getCapacity() != 0 ?
+ ((float) numSlotsOccupied * 100 / getCapacity()) : 0;
StringBuffer sb = new StringBuffer();
+
sb.append("Capacity: " + capacity + " slots\n");
+ //If maxTaskLimit is less than the capacity
+ if (maxTaskLimit >= 0 && maxTaskLimit < capacity) {
+ sb.append(String.format(LIMIT_NORMALIZED_CAPACITY_STRING,
+ maxTaskLimit, (capacity-maxTaskLimit)));
+ }
+ if (maxTaskLimit >= 0) {
+ sb.append(String.format("Maximum Slots Limit: %d\n", maxTaskLimit));
+ }
sb.append(String.format("Used capacity: %d (%.1f%% of Capacity)\n",
Integer.valueOf(numSlotsOccupied), Float
.valueOf(occupiedSlotsAsPercent)));
@@ -166,14 +217,17 @@ class CapacityTaskScheduler extends Task
TaskSchedulingInfo mapTSI;
TaskSchedulingInfo reduceTSI;
- public QueueSchedulingInfo(String queueName, float capacityPercent,
- int ulMin, JobQueuesManager jobQueuesManager) {
+ public QueueSchedulingInfo(String queueName, float capacityPercent,
+ int ulMin, JobQueuesManager jobQueuesManager,
+ int mapCap, int reduceCap) {
this.queueName = new String(queueName);
this.capacityPercent = capacityPercent;
this.ulMin = ulMin;
this.jobQueuesManager = jobQueuesManager;
this.mapTSI = new TaskSchedulingInfo();
this.reduceTSI = new TaskSchedulingInfo();
+ this.mapTSI.setMaxTaskLimit(mapCap);
+ this.reduceTSI.setMaxTaskLimit(reduceCap);
}
/**
@@ -194,7 +248,7 @@ class CapacityTaskScheduler extends Task
(jobQueuesManager.doesQueueSupportPriorities(queueName))?
"YES":"NO"));
sb.append("-------------\n");
-
+
sb.append("Map tasks\n");
sb.append(mapTSI.toString());
sb.append("-------------\n");
@@ -339,8 +393,9 @@ class CapacityTaskScheduler extends Task
* capacity. This ordered list is iterated over, when assigning tasks.
*/
private List<QueueSchedulingInfo> qsiForAssigningTasks =
- new ArrayList<QueueSchedulingInfo>();
- /**
+ new ArrayList<QueueSchedulingInfo>();
+
+ /**
* Comparator to sort queues.
* For maps, we need to sort on QueueSchedulingInfo.mapTSI. For
* reducers, we use reduceTSI. So we'll need separate comparators.
@@ -353,10 +408,10 @@ class CapacityTaskScheduler extends Task
TaskSchedulingInfo t2 = getTSI(q2);
// look at how much capacity they've filled. Treat a queue with
// capacity=0 equivalent to a queue running at capacity
- double r1 = (0 == t1.capacity)? 1.0f:
- (double)t1.numSlotsOccupied/(double)t1.capacity;
- double r2 = (0 == t2.capacity)? 1.0f:
- (double)t2.numSlotsOccupied/(double)t2.capacity;
+ double r1 = (0 == t1.getCapacity())? 1.0f:
+ (double)t1.numSlotsOccupied/(double) t1.getCapacity();
+ double r2 = (0 == t2.getCapacity())? 1.0f:
+ (double)t2.numSlotsOccupied/(double) t2.getCapacity();
if (r1<r2) return -1;
else if (r1>r2) return 1;
else return 0;
@@ -412,8 +467,8 @@ class CapacityTaskScheduler extends Task
// slots we're getting).
int currentCapacity;
TaskSchedulingInfo tsi = getTSI(qsi);
- if (tsi.numSlotsOccupied < tsi.capacity) {
- currentCapacity = tsi.capacity;
+ if (tsi.numSlotsOccupied < tsi.getCapacity()) {
+ currentCapacity = tsi.getCapacity();
}
else {
currentCapacity = tsi.numSlotsOccupied + getSlotsPerTask(j);
@@ -597,7 +652,11 @@ class CapacityTaskScheduler extends Task
for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
// we may have queues with capacity=0. We shouldn't look at jobs from
// these queues
- if (0 == getTSI(qsi).capacity) {
+ if (0 == getTSI(qsi).getCapacity()) {
+ continue;
+ }
+
+ if(this.areTasksInQueueOverLimit(qsi)) {
continue;
}
TaskLookupResult tlr = getTaskFromQueue(taskTracker, qsi);
@@ -622,6 +681,31 @@ class CapacityTaskScheduler extends Task
return TaskLookupResult.getNoTaskFoundResult();
}
+
+ /**
+ * Checks whether a max task limit is set for this queue. If it is set,
+ * this qsi is ignored when the current number of occupied slots of a
+ * TYPE in the queue is >= getCapacity() (the min of limit and capacity).
+ * @param qsi
+ * @return
+ */
+
+ private boolean areTasksInQueueOverLimit(QueueSchedulingInfo qsi) {
+ TaskSchedulingInfo tsi = getTSI(qsi);
+ if (tsi.getMaxTaskLimit() >= 0) {
+ if (tsi.numSlotsOccupied >= tsi.getCapacity()) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug(
+ "Queue " + qsi.queueName + " has reached its max " + type +
+ " limit ");
+ LOG.debug("Current running tasks " + tsi.getCapacity());
+ }
+ return true;
+ }
+ }
+ return false;
+ }
+
// for debugging.
private void printQSIs() {
if (LOG.isDebugEnabled()) {
@@ -629,12 +713,16 @@ class CapacityTaskScheduler extends Task
for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
TaskSchedulingInfo tsi = getTSI(qsi);
Collection<JobInProgress> runJobs =
- scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName);
- s.append(String.format(" Queue '%s'(%s): runningTasks=%d, "
- + "occupiedSlots=%d, capacity=%d, runJobs=%d", qsi.queueName,
+ scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName);
+ s.append(
+ String.format(
+ " Queue '%s'(%s): runningTasks=%d, "
+ + "occupiedSlots=%d, capacity=%d, runJobs=%d maxTaskLimit=%d ",
+ qsi.queueName,
this.type, Integer.valueOf(tsi.numRunningTasks), Integer
- .valueOf(tsi.numSlotsOccupied), Integer
- .valueOf(tsi.capacity), Integer.valueOf(runJobs.size())));
+ .valueOf(tsi.numSlotsOccupied), Integer
+ .valueOf(tsi.getCapacity()), Integer.valueOf(runJobs.size()),
+ Integer.valueOf(tsi.getMaxTaskLimit())));
}
LOG.debug(s);
}
@@ -970,8 +1058,9 @@ class CapacityTaskScheduler extends Task
}
int ulMin = schedConf.getMinimumUserLimitPercent(queueName);
// create our QSI and add to our hashmap
- QueueSchedulingInfo qsi = new QueueSchedulingInfo(queueName, capacity,
- ulMin, jobQueuesManager);
+ QueueSchedulingInfo qsi = new QueueSchedulingInfo(
+ queueName, capacity, ulMin, jobQueuesManager, schedConf.getMaxMapCap(
+ queueName), schedConf.getMaxReduceCap(queueName));
queueInfoMap.put(queueName, qsi);
// create the queues of job objects
@@ -1068,12 +1157,12 @@ class CapacityTaskScheduler extends Task
for (QueueSchedulingInfo qsi: queueInfoMap.values()) {
// compute new capacities, if TT slots have changed
if (mapClusterCapacity != prevMapClusterCapacity) {
- qsi.mapTSI.capacity =
- (int)(qsi.capacityPercent*mapClusterCapacity/100);
+ qsi.mapTSI.setCapacity((int)
+ (qsi.capacityPercent*mapClusterCapacity/100));
}
if (reduceClusterCapacity != prevReduceClusterCapacity) {
- qsi.reduceTSI.capacity =
- (int)(qsi.capacityPercent*reduceClusterCapacity/100);
+ qsi.reduceTSI.setCapacity((int)
+ (qsi.capacityPercent*reduceClusterCapacity/100));
}
// reset running/pending tasks, tasks per user
qsi.mapTSI.resetTaskVars();
Modified: hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=1076951&r1=1076950&r2=1076951&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Fri Mar 4 03:25:21 2011
@@ -862,6 +862,86 @@ public class TestCapacityScheduler exten
1, rqueue.size());
}
+
+ /**
+ * Test the max map limit.
+ * @throws IOException
+ */
+ public void testMaxMapCap() throws IOException {
+ this.setUp(4,1,1);
+ taskTrackerManager.addQueues(new String[] {"default"});
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 100.0f, false, 1));
+ resConf.setFakeQueues(queues);
+ resConf.setMaxMapCap("default",2);
+ resConf.setMaxReduceCap("default",-1);
+ scheduler.setResourceManagerConf(resConf);
+ scheduler.start();
+
+ //submit the Job
+ FakeJobInProgress fjob1 =
+ submitJobAndInit(JobStatus.PREP,3,1,"default","user");
+
+ List<Task> task1 = scheduler.assignTasks(tracker("tt1"));
+ List<Task> task2 = scheduler.assignTasks(tracker("tt2"));
+
+ //Once the 2 tasks are running the third assignment should be reduce.
+ checkAssignment("tt3", "attempt_test_0001_r_000001_0 on tt3");
+ //This should fail.
+ List<Task> task4 = scheduler.assignTasks(tracker("tt4"));
+ assertNull(task4);
+ //Now complete the task 1.
+ // complete the job
+ taskTrackerManager.finishTask("tt1", task1.get(0).getTaskID().toString(),
+ fjob1);
+ //We have completed the tt1 task which was a map task so we expect one map
+ //task to be picked up
+ checkAssignment("tt4","attempt_test_0001_m_000003_0 on tt4");
+ }
+
+ /**
+ * Test max reduce limit
+ * @throws IOException
+ */
+ public void testMaxReduceCap() throws IOException {
+ this.setUp(4, 1, 1);
+ taskTrackerManager.addQueues(new String[]{"default"});
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 100.0f, false, 1));
+ resConf.setFakeQueues(queues);
+ resConf.setMaxMapCap("default", -1);
+ resConf.setMaxReduceCap("default", 2);
+ scheduler.setResourceManagerConf(resConf);
+ scheduler.start();
+
+ //submit the Job
+ FakeJobInProgress fjob1 =
+ submitJobAndInit(JobStatus.PREP, 1, 3, "default", "user");
+
+ List<Task> task1 = scheduler.assignTasks(tracker("tt1"));
+ List<Task> task2 = scheduler.assignTasks(tracker("tt2"));
+ List<Task> task3 = scheduler.assignTasks(tracker("tt3"));
+
+ //This should fail. 1 map, 2 reduces , we have reached the limit.
+ List<Task> task4 = scheduler.assignTasks(tracker("tt4"));
+ assertNull(task4);
+ //Now complete the task 1 i.e map task.
+ // complete the job
+ taskTrackerManager.finishTask(
+ "tt1", task1.get(0).getTaskID().toString(),
+ fjob1);
+
+ //This should still fail as only map task is done
+ task4 = scheduler.assignTasks(tracker("tt4"));
+ assertNull(task4);
+
+ //Complete the reduce task
+ taskTrackerManager.finishTask(
+ "tt2", task2.get(0).getTaskID().toString(), fjob1);
+
+ //One reduce is done hence assign the new reduce.
+ checkAssignment("tt4","attempt_test_0001_r_000003_0 on tt4");
+ }
// test if the queue reflects the changes
private void testJobOrderChange(FakeJobInProgress fjob1,
@@ -1132,6 +1212,144 @@ public class TestCapacityScheduler exten
checkAssignment("tt2", "attempt_test_0001_m_000004_0 on tt2");
}
+ /**
+ * Creates a queue with max task limit of 2
+ * submit 1 job in the queue which is high ram(2 slots) . As 2 slots are
+ * given to high ram job and are reserved , no other tasks are accepted .
+ *
+ * @throws IOException
+ */
+ public void testHighMemoryBlockingWithMaxLimit()
+ throws IOException {
+
+ // 2 map and 1 reduce slots
+ taskTrackerManager = new FakeTaskTrackerManager(2, 2, 1);
+
+ taskTrackerManager.addQueues(new String[] { "defaultXYZ" });
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("defaultXYZ", 100.0f, true, 25));
+ resConf.setFakeQueues(queues);
+ resConf.setMaxMapCap("defaultXYZ",2);
+ scheduler.setTaskTrackerManager(taskTrackerManager);
+ // enabled memory-based scheduling
+ // Normal job in the cluster would be 1GB maps/reduces
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAX_MAP_MEMORY_MB_PROPERTY,
+ 2 * 1024);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAP_MEMORY_MB_PROPERTY, 1 * 1024);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_MAX_REDUCE_MEMORY_MB_PROPERTY,
+ 1 * 1024);
+ scheduler.getConf().setLong(
+ JobTracker.MAPRED_CLUSTER_REDUCE_MEMORY_MB_PROPERTY, 1 * 1024);
+ scheduler.setResourceManagerConf(resConf);
+ scheduler.start();
+
+ // The situation : Submit 2 jobs with high memory map task
+ //Set the max limit for queue to 2 ,
+ // try submitting more map tasks to the queue , it should not happen
+
+ LOG.debug("Submit one high memory(2GB maps, 0MB reduces) job of "
+ + "2 map tasks");
+ JobConf jConf = new JobConf(conf);
+ jConf.setMemoryForMapTask(2 * 1024);
+ jConf.setMemoryForReduceTask(0);
+ jConf.setNumMapTasks(2);
+ jConf.setNumReduceTasks(0);
+ jConf.setQueueName("defaultXYZ");
+ jConf.setUser("u1");
+ FakeJobInProgress job1 = submitJobAndInit(JobStatus.PREP, jConf);
+
+ LOG.debug("Submit another regular memory(1GB vmem maps/reduces) job of "
+ + "2 map/red tasks");
+ jConf = new JobConf(conf);
+ jConf.setMemoryForMapTask(1 * 1024);
+ jConf.setMemoryForReduceTask(1 * 1024);
+ jConf.setNumMapTasks(2);
+ jConf.setNumReduceTasks(2);
+ jConf.setQueueName("defaultXYZ");
+ jConf.setUser("u1");
+ FakeJobInProgress job2 = submitJobAndInit(JobStatus.PREP, jConf);
+
+ // first, a map from j1 will run this is a high memory job so it would
+ // occupy the 2 slots
+ checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+
+ checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1,2, 100.0f,3,1);
+ checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 0L);
+
+ // at this point, the scheduler tries to schedule another map from j1.
+ // there isn't enough space. The second job's reduce should be scheduled.
+ checkAssignment("tt1", "attempt_test_0002_r_000001_0 on tt1");
+
+ checkOccupiedSlots("defaultXYZ", TaskType.MAP, 1,2, 100.0f,3,1);
+ checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
+
+ //at this point , the scheduler tries to schedule another map from j2 for
+ //another task tracker.
+ // This should not happen as all the map slots are taken
+ //by the first task itself.hence reduce task from the second job is given
+
+ checkAssignment("tt2","attempt_test_0002_r_000002_0 on tt2");
+ }
+
+ /**
+ * test if user limits automatically adjust to max map or reduce limit
+ */
+ public void testUserLimitsWithMaxLimits() throws Exception {
+ setUp(4, 4, 4);
+ // set up some queues
+ String[] qs = {"default"};
+ taskTrackerManager.addQueues(qs);
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 100.0f, true, 50));
+ resConf.setFakeQueues(queues);
+ resConf.setMaxMapCap("default", 2);
+ resConf.setMaxReduceCap("default", 2);
+ scheduler.setResourceManagerConf(resConf);
+ scheduler.start();
+
+ // submit a job
+ FakeJobInProgress fjob1 =
+ submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u1");
+ FakeJobInProgress fjob2 =
+ submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u2");
+
+ // for queue 'default', the capacity for maps is 2.
+ // But the max map limit is 2
+ // hence user should be getting not more than 1 as it is the 50%.
+ Task t1 = checkAssignment("tt1", "attempt_test_0001_m_000001_0 on tt1");
+
+ //Now we should get the task from the other job. As the
+ //first user has reached his max map limit.
+ checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+
+ //Now we are done with map limit , now if we ask for task we should
+ // get reduce from 1st job
+ checkAssignment("tt3", "attempt_test_0001_r_000001_0 on tt3");
+ // Now we're at full capacity for maps. 1 done with reduces for job 1 so
+ // now we should get 1 reduces for job 2
+ Task t4 = checkAssignment("tt4", "attempt_test_0002_r_000001_0 on tt4");
+
+ taskTrackerManager.finishTask(
+ "tt1", t1.getTaskID().toString(),
+ fjob1);
+
+ //tt1 completed the task so we have 1 map slot for u1
+ // we are assigning the 2nd map task from fjob1
+ checkAssignment("tt1", "attempt_test_0001_m_000002_0 on tt1");
+
+ taskTrackerManager.finishTask(
+ "tt4", t4.getTaskID().toString(),
+ fjob2);
+ //tt4 completed the task , so we have 1 reduce slot for u2
+ //we are assigning the 2nd reduce from fjob2
+ checkAssignment("tt4", "attempt_test_0002_r_000002_0 on tt4");
+
+ }
+
+
// test user limits
public void testUserLimits() throws Exception {
// set up some queues
@@ -2707,17 +2925,25 @@ public class TestCapacityScheduler exten
/**
* Verify the number of slots of type 'type' from the queue 'queue'.
+ * incrMapIndex and incrReduceIndex are offsets added to the expected line
+ * index when the scheduling-info output gains extra lines, i.e. when the
+ * position of "Used capacity: %d (%.1f%% of Capacity)"
+ * has shifted.
*
* @param queue
* @param type
* @param numActiveUsers in the queue at present.
* @param expectedOccupiedSlots
* @param expectedOccupiedSlotsPercent
- * @return
+ * @param incrMapIndex
+ * @param incrReduceIndex
*/
- private void checkOccupiedSlots(String queue,
- TaskType type, int numActiveUsers,
- int expectedOccupiedSlots, float expectedOccupiedSlotsPercent) {
+ private void checkOccupiedSlots(
+ String queue,
+ TaskType type, int numActiveUsers,
+ int expectedOccupiedSlots, float expectedOccupiedSlotsPercent,int incrMapIndex
+ ,int incrReduceIndex
+ ) {
scheduler.updateQSIInfoForTests();
QueueManager queueManager = scheduler.taskTrackerManager.getQueueManager();
String schedulingInfo =
@@ -2725,9 +2951,9 @@ public class TestCapacityScheduler exten
String[] infoStrings = schedulingInfo.split("\n");
int index = -1;
if (type.equals(TaskType.MAP)) {
- index = 7;
+ index = 7+ incrMapIndex;
} else if (type.equals(TaskType.REDUCE)) {
- index = (numActiveUsers == 0 ? 12 : 13 + numActiveUsers);
+ index = (numActiveUsers == 0 ? 12 : 13 + numActiveUsers)+incrReduceIndex;
}
LOG.info(infoStrings[index]);
assertEquals(String.format("Used capacity: %d (%.1f%% of Capacity)",
@@ -2735,6 +2961,24 @@ public class TestCapacityScheduler exten
infoStrings[index]);
}
+ /**
+ *
+ * @param queue
+ * @param type
+ * @param numActiveUsers
+ * @param expectedOccupiedSlots
+ * @param expectedOccupiedSlotsPercent
+ */
+ private void checkOccupiedSlots(
+ String queue,
+ TaskType type, int numActiveUsers,
+ int expectedOccupiedSlots, float expectedOccupiedSlotsPercent
+ ) {
+ checkOccupiedSlots(
+ queue, type, numActiveUsers, expectedOccupiedSlots,
+ expectedOccupiedSlotsPercent,0,0);
+ }
+
private void checkQueuesOrder(String[] expectedOrder, String[] observedOrder) {
assertTrue("Observed and expected queues are not of same length.",
expectedOrder.length == observedOrder.length);
Modified: hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/capacity_scheduler.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/capacity_scheduler.xml?rev=1076951&r1=1076950&r2=1076951&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/capacity_scheduler.xml (original)
+++ hadoop/common/branches/branch-0.20-security-patches/src/docs/src/documentation/content/xdocs/capacity_scheduler.xml Fri Mar 4 03:25:21 2011
@@ -204,6 +204,38 @@
users, no user can use more than 25% of the queue's resources. A
value of 100 implies no user limits are imposed.</td>
</tr>
+ <tr><td>mapred.capacity-scheduler.queue.<queue-name>.max.map.slots</td>
+ <td>
+ This value is the maximum map slots that can be used in a
+ queue at any point of time. So for example assuming above config value
+ is 100 , not more than 100 tasks would be in the queue at any point of
+ time, assuming each task takes one slot.
+
+ Default value of -1 would disable this capping feature
+
+ Typically the queue capacity should be equal to this limit.
+ If queue capacity is more than this limit, excess capacity will be
+ used by the other queues. If queue capacity is less than the above
+ limit , then the limit would be the queue capacity - as in the current
+ implementation
+ </td>
+ </tr>
+ <tr><td>mapred.capacity-scheduler.queue.<queue-name>.max.reduce.slots</td>
+ <td>
+ This value is the maximum reduce slots that can be used in a
+ queue at any point of time. So for example assuming above config value
+ is 100 , not more than 100 reduce tasks would be in the queue at any point of
+ time, assuming each task takes one slot.
+
+ Default value of -1 would disable this capping feature
+
+ Typically the queue capacity should be equal to this limit.
+ If queue capacity is more than this limit, excess capacity will be
+ used by the other queues. If queue capacity is less than the above
+ limit , then the limit would be the queue capacity - as in the current
+ implementation
+ </td>
+ </tr>
</table>
</section>