You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by yh...@apache.org on 2009/10/21 18:19:33 UTC
svn commit: r828081 - in /hadoop/mapreduce/trunk: ./ conf/
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
src/docs/src/documentation/content/xdocs/
Author: yhemanth
Date: Wed Oct 21 16:19:32 2009
New Revision: 828081
URL: http://svn.apache.org/viewvc?rev=828081&view=rev
Log:
MAPREDUCE-1105. Remove max limit configuration in capacity scheduler in favor of max capacity percentage thus allowing the limit to go over queue capacity. Contributed by Rahul Kumar Singh.
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/conf/capacity-scheduler.xml.template
hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/AbstractQueue.java
hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ContainerQueue.java
hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueHierarchyBuilder.java
hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueSchedulingContext.java
hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/TaskSchedulingContext.java
hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java
hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java
hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestRefreshOfQueues.java
hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/capacity_scheduler.xml
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=828081&r1=828080&r2=828081&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Wed Oct 21 16:19:32 2009
@@ -802,3 +802,7 @@
MAPREDUCE-1086. Setup Hadoop logging environment for tasks to point to
task related parameters. (Ravi Gummadi via yhemanth)
+ MAPREDUCE-1105. Remove max limit configuration in capacity scheduler in
+ favor of max capacity percentage thus allowing the limit to go over
+ queue capacity. (Rahul Kumar Singh via yhemanth)
+
Modified: hadoop/mapreduce/trunk/conf/capacity-scheduler.xml.template
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/conf/capacity-scheduler.xml.template?rev=828081&r1=828080&r2=828081&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/conf/capacity-scheduler.xml.template (original)
+++ hadoop/mapreduce/trunk/conf/capacity-scheduler.xml.template Wed Oct 21 16:19:32 2009
@@ -69,44 +69,6 @@
</description>
</property>
-<property>
- <name>mapred.capacity-scheduler.queue.default.max.map.slots</name>
- <value>-1</value>
- <description>
- This value is the maximum map slots that can be used in a
- queue at any point of time. So for example assuming above config value
- is 100 , not more than 100 tasks would be in the queue at any point of
- time, assuming each task takes one slot.
-
- Default value of -1 would disable this capping feature
-
- Typically the queue capacity should be equal to this limit.
- If queue capacity is more than this limit, excess capacity will be
- used by the other queues. If queue capacity is less than the above
- limit , then the limit would be the queue capacity - as in the current
- implementation
- </description>
-</property>
-
-<property>
- <name>mapred.capacity-scheduler.queue.default.max.reduce.slots</name>
- <value>-1</value>
- <description>
- This value is the maximum reduce slots that can be used in a
- queue at any point of time. So for example assuming above config value
- is 100 , not more than 100 reduce tasks would be in the queue at any point
- of time, assuming each task takes one slot.
-
- Default value of -1 would disable this capping feature
-
- Typically the queue capacity should be equal to this limit.
- If queue capacity is more than this limit, excess capacity will be
- used by the other queues. If queue capacity is less than the above
- limit , then the limit would be the queue capacity - as in the current
- implementation
- </description>
-</property>
-
<!-- The default configuration settings for the capacity task scheduler -->
<!-- The default values would be applied to all the queues which don't have -->
<!-- the appropriate property for the particular queue -->
Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/AbstractQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/AbstractQueue.java?rev=828081&r1=828080&r2=828081&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/AbstractQueue.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/AbstractQueue.java Wed Oct 21 16:19:32 2009
@@ -241,15 +241,8 @@
sourceContext.setNumJobsByUser(qsc.getNumJobsByUser());
sourceContext.setNumOfWaitingJobs(qsc.getNumOfWaitingJobs());
- // Task limits are already read from the configuration. Cache them and set
- // them in the old hierarchy along with the map/reduce TSCs.
- int maxMapTaskLimit = sourceContext.getMapTSC().getMaxTaskLimit();
- int maxReduceTaskLimit = sourceContext.getReduceTSC().getMaxTaskLimit();
sourceContext.setMapTSC(qsc.getMapTSC());
sourceContext.setReduceTSC(qsc.getReduceTSC());
- sourceContext.getMapTSC().setMaxTaskLimit(maxMapTaskLimit);
- sourceContext.getReduceTSC().setMaxTaskLimit(maxReduceTaskLimit);
-
setQueueSchedulingContext(sourceContext);
if (LOG.isDebugEnabled()) {
Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java?rev=828081&r1=828080&r2=828081&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java Wed Oct 21 16:19:32 2009
@@ -106,17 +106,6 @@
* cannot use the capacity of its parent queue.
*/
static final String MAX_CAPACITY_PROPERTY ="maximum-capacity";
- /**
- * Configuration that provides the maximum cap for the map task in a queue
- * at any given point of time.
- */
- static final String MAX_MAP_CAP_PROPERTY = "max.map.slots";
-
- /**
- * Configuration that provides the maximum cap for the reduce task in a queue
- * at any given point of time.
- */
- static final String MAX_REDUCE_CAP_PROPERTY = "max.reduce.slots";
/**
* The constant which defines the default initialization thread
@@ -223,7 +212,8 @@
}
/**
- * Get maximum percentage stretch for a queue.
+ * Return the maximum percentage of the cluster capacity that can be
+ * used by the given queue
* This percentage defines a limit beyond which a
* sub-queue cannot use the capacity of its parent queue.
* This provides a means to limit how much excess capacity a
@@ -232,8 +222,8 @@
* The maximum-capacity-stretch of a queue can only be
* greater than or equal to its minimum capacity.
*
- * @param queue
- * @return
+ * @param queue name of the queue
+ * @return maximum capacity percent of cluster for the queue
*/
public float getMaxCapacity(String queue) {
String raw = getProperty(queue, MAX_CAPACITY_PROPERTY);
@@ -367,28 +357,6 @@
return maxWorkerThreads;
}
- /**
- * get the max map slots cap
- * @param queue
- * @return
- */
- public int getMaxMapCap(String queue) {
- String raw = getProperty(queue, MAX_MAP_CAP_PROPERTY);
- return getInt(raw,-1);
- }
-
-
- /**
- * get the max reduce slots cap
- * @param queue
- * @return
- */
- public int getMaxReduceCap(String queue) {
- String raw = getProperty(queue, MAX_REDUCE_CAP_PROPERTY);
- return getInt(raw,-1);
- }
-
-
public Configuration getCSConf() {
return rmConf;
}
Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=828081&r1=828080&r2=828081&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Wed Oct 21 16:19:32 2009
@@ -20,7 +20,6 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
@@ -288,6 +287,12 @@
// only look at jobs that can be run. We ignore jobs that haven't
// initialized, or have completed but haven't been removed from the
// running queue.
+
+ //Check queue for maximum capacity .
+ if(areTasksInQueueOverMaxCapacity(qsi,j.getNumSlotsPerTask(type))) {
+ continue;
+ }
+
if (j.getStatus().getRunState() != JobStatus.RUNNING) {
continue;
}
@@ -357,6 +362,13 @@
if (j.getStatus().getRunState() != JobStatus.RUNNING) {
continue;
}
+
+ //Check for the maximum-capacity.
+ if(areTasksInQueueOverMaxCapacity(qsi,j.getNumSlotsPerTask(type))) {
+ continue;
+ }
+
+
if (scheduler.memoryMatcher.matchesMemoryRequirements(j, type,
taskTrackerStatus)) {
// We found a suitable job. Get task from it.
@@ -442,9 +454,12 @@
continue;
}
- if(this.areTasksInQueueOverLimit(qsc)) {
+ //This call is important for optimization purposes , if we
+ //have reached the limit already no need for traversing the queue.
+ if(this.areTasksInQueueOverMaxCapacity(qsc,1)) {
continue;
}
+
TaskLookupResult tlr = getTaskFromQueue(taskTracker, qsc);
TaskLookupResult.LookUpStatus lookUpStatus = tlr.getLookUpStatus();
@@ -469,37 +484,30 @@
/**
- * Check if the max task limit is set for this queue
- * if set , ignore this qsc if current num of occupied
- * slots of a TYPE in the queue is >= getMaxTaskCap() or
- * if they have reached there Max Capacity.
+ * Check if maximum-capacity is set for this queue.
+ * If set and greater than 0 ,
+ * check if numofslotsoccupied+numSlotsPerTask is greater than
+ * maximum-Capacity ,if yes , implies this queue is over limit.
+ *
+ * Incase noOfSlotsOccupied is less than maximum-capacity ,but ,
+ * numOfSlotsOccupied+noSlotsPerTask is more than maximum-capacity we still
+ * dont assign the task . This may lead to under utilization of very small
+ * set of slots. But this is ok ,as we strictly respect the maximum-capacity
* @param qsc
- * @return
+ * @param noOfSlotsPerTask
+ * @return true if queue is over maximum-capacity
*/
-
- private boolean areTasksInQueueOverLimit(
- QueueSchedulingContext qsc) {
+ private boolean areTasksInQueueOverMaxCapacity(
+ QueueSchedulingContext qsc,int noOfSlotsPerTask) {
TaskSchedulingContext tsi = getTSC(qsc);
- //check for maxTaskLimit
-
- if (tsi.getMaxTaskLimit() >= 0) {
- if (tsi.getNumSlotsOccupied() >= tsi.getCapacity()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- "Queue " + qsc.getQueueName() + " has reached its max " + type +
- " limit ");
- LOG.debug("Current running tasks " + tsi.getCapacity());
- }
- return true;
- }
- }
-
+ //check for maximum-capacity
if(tsi.getMaxCapacity() >= 0) {
- if(tsi.getNumSlotsOccupied() >= tsi.getMaxCapacity()) {
- if(LOG.isDebugEnabled()) {
+ if ((tsi.getNumSlotsOccupied() + noOfSlotsPerTask) >
+ tsi.getMaxCapacity()) {
+ if (LOG.isDebugEnabled()) {
LOG.debug(
- "Queue " + qsc.getQueueName() + " " +
- "has reached its max " + type + "Capacity" );
+ "Queue " + qsc.getQueueName() + " " + "has reached its max " +
+ type + "Capacity");
LOG.debug("Current running tasks " + tsi.getCapacity());
}
@@ -523,11 +531,11 @@
s.append(
String.format(
" Queue '%s'(%s): runningTasks=%d, "
- + "occupiedSlots=%d, capacity=%d, runJobs=%d maxTaskLimit=%d ",
+ + "occupiedSlots=%d, capacity=%d, runJobs=%d maximumCapacity=%d ",
qsi.getQueueName(),
this.type, tsi.getNumRunningTasks(),
tsi.getNumSlotsOccupied(), tsi.getCapacity(), (runJobs.size()),
- tsi.getMaxTaskLimit()));
+ tsi.getMaxCapacity()));
}
LOG.debug(s);
}
Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ContainerQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ContainerQueue.java?rev=828081&r1=828080&r2=828081&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ContainerQueue.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/ContainerQueue.java Wed Oct 21 16:19:32 2009
@@ -172,6 +172,15 @@
//We dont have to check for 100 - totalCapacity being -ve , as
//we already do it while loading.
for (AbstractQueue q : unConfiguredQueues) {
+ if(q.qsc.getMaxCapacityPercent() > 0) {
+ if (q.qsc.getMaxCapacityPercent() < capacityShare) {
+ throw new IllegalStateException(
+ " Capacity share (" + capacityShare + ")for unconfigured queue " +
+ q.getName() +
+ " is greater than its maximum-capacity percentage " +
+ q.qsc.getMaxCapacityPercent());
+ }
+ }
q.qsc.setCapacityPercent(capacityShare);
LOG.info("Capacity share for un configured queue " + q.getName() + "" +
" is " + capacityShare);
Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueHierarchyBuilder.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueHierarchyBuilder.java?rev=828081&r1=828080&r2=828081&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueHierarchyBuilder.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueHierarchyBuilder.java Wed Oct 21 16:19:32 2009
@@ -161,9 +161,8 @@
int ulMin = schedConf.getMinimumUserLimitPercent(queueName);
// create our QSC and add to our hashmap
QueueSchedulingContext qsi = new QueueSchedulingContext(
- queueName, capacity, stretchCapacity, ulMin,
- schedConf.getMaxMapCap(
- queueName), schedConf.getMaxReduceCap(queueName));
+ queueName, capacity, stretchCapacity, ulMin
+ );
qsi.setSupportsPriorities(
schedConf.isPrioritySupported(
queueName));
@@ -179,7 +178,7 @@
*/
static AbstractQueue createRootAbstractQueue() {
QueueSchedulingContext rootContext =
- new QueueSchedulingContext("", 100, -1, -1, -1, -1);
+ new QueueSchedulingContext("", 100, -1, -1);
AbstractQueue root = new ContainerQueue(null, rootContext);
return root;
}
Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueSchedulingContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueSchedulingContext.java?rev=828081&r1=828080&r2=828081&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueSchedulingContext.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/QueueSchedulingContext.java Wed Oct 21 16:19:32 2009
@@ -100,17 +100,14 @@
QueueSchedulingContext(
String queueName, float capacityPercent, float maxCapacityPercent,
- int ulMin,
- int mapCap, int reduceCap) {
- this.setQueueName(new String(queueName));
- this.setCapacityPercent(capacityPercent);
- this.setMaxCapacityPercent(maxCapacityPercent);
- this.setUlMin(ulMin);
- this.setMapTSC(new TaskSchedulingContext(TaskType.MAP));
- this.setReduceTSC(new TaskSchedulingContext(TaskType.REDUCE));
- this.getMapTSC().setMaxTaskLimit(mapCap);
- this.getReduceTSC().setMaxTaskLimit(reduceCap);
- }
+ int ulMin) {
+ this.setQueueName(new String(queueName));
+ this.setCapacityPercent(capacityPercent);
+ this.setMaxCapacityPercent(maxCapacityPercent);
+ this.setUlMin(ulMin);
+ this.setMapTSC(new TaskSchedulingContext(TaskType.MAP));
+ this.setReduceTSC(new TaskSchedulingContext(TaskType.REDUCE));
+ }
/**
* return information about the queue
Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/TaskSchedulingContext.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/TaskSchedulingContext.java?rev=828081&r1=828080&r2=828081&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/TaskSchedulingContext.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/TaskSchedulingContext.java Wed Oct 21 16:19:32 2009
@@ -42,9 +42,6 @@
public class TaskSchedulingContext {
private TaskType type;
- private static final String LIMIT_NORMALIZED_CAPACITY_STRING
- = "(Capacity is restricted to max limit of %d slots.\n" +
- "Remaining %d slots will be used by other queues.)\n";
/**
* the actual capacity, which depends on how many slots are available
* in the cluster at any given time.
@@ -60,15 +57,6 @@
private int maxCapacity = -1;
/**
- * max task limit
- * This value is the maximum slots that can be used in a
- * queue at any point of time. So for example assuming above config value
- * is 100 , not more than 100 tasks would be in the queue at any point of
- * time, assuming each task takes one slot.
- */
- private int maxTaskLimit = -1;
-
- /**
* for each user, we need to keep track of number of slots occupied by
* running tasks
*/
@@ -95,24 +83,11 @@
}
- int getMaxTaskLimit() {
- return maxTaskLimit;
- }
-
- void setMaxTaskLimit(int maxTaskCap) {
- this.maxTaskLimit = maxTaskCap;
- }
-
/**
- * This method checks for maxfinalLimit and
- * sends minimum of maxTaskLimit and capacity.
- *
+ * returns the capacity of queue as no of slots.
* @return
*/
int getCapacity() {
- if ((maxTaskLimit >= 0) && (maxTaskLimit < capacity)) {
- return maxTaskLimit;
- }
return capacity;
}
@@ -137,15 +112,8 @@
StringBuffer sb = new StringBuffer();
sb.append("Capacity: " + getCapacity() + " slots\n");
- //If maxTaskLimit is less than the capacity
- if (getMaxTaskLimit() >= 0 && getMaxTaskLimit() < getCapacity()) {
- sb.append(
- String.format(
- LIMIT_NORMALIZED_CAPACITY_STRING,
- getMaxTaskLimit(), (getCapacity() - getMaxTaskLimit())));
- }
- if (getMaxTaskLimit() >= 0) {
- sb.append(String.format("Maximum Slots Limit: %d\n", getMaxTaskLimit()));
+ if(getMaxCapacity() >= 0) {
+ sb.append("Maximum capacity: " + getMaxCapacity() +" slots\n");
}
sb.append(
String.format(
Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java?rev=828081&r1=828080&r2=828081&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/CapacityTestUtils.java Wed Oct 21 16:19:32 2009
@@ -229,7 +229,7 @@
taskTrackerManager.getTaskTracker(
taskTrackerName));
- if (tasks==null) {
+ if (tasks==null || tasks.isEmpty()) {
if (expectedTaskStrings.size() > 0) {
fail("Expected some tasks to be assigned, but got none.");
} else {
@@ -955,6 +955,8 @@
Properties p = new Properties();
p.setProperty(CapacitySchedulerConf.CAPACITY_PROPERTY,
String.valueOf(q.capacity));
+ p.setProperty(CapacitySchedulerConf.MAX_CAPACITY_PROPERTY,
+ String.valueOf(q.maxCapacity));
p.setProperty(CapacitySchedulerConf.SUPPORTS_PRIORITY_PROPERTY,
String.valueOf(q.supportsPrio));
p.setProperty(
@@ -986,6 +988,7 @@
static class FakeQueueInfo {
String queueName;
float capacity;
+ float maxCapacity = -1.0f;
boolean supportsPrio;
int ulMin;
Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java?rev=828081&r1=828080&r2=828081&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java Wed Oct 21 16:19:32 2009
@@ -84,107 +84,54 @@
}
/**
- * Test the max map limit.
- *
+ * Test max capacity
* @throws IOException
*/
- public void testMaxMapCap() throws IOException {
+ public void testMaxCapacity() throws IOException {
this.setUp(4, 1, 1);
taskTrackerManager.addQueues(new String[]{"default"});
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, false, 1));
+ queues.add(new FakeQueueInfo("default", 25.0f, false, 1));
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
-
- scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext().getMapTSC().setMaxTaskLimit(2);
- scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext().getReduceTSC().setMaxTaskLimit(-1);
-
+ scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext()
+ .setMaxCapacityPercent(50.0f);
//submit the Job
- FakeJobInProgress fjob1 =
- taskTrackerManager.submitJob(JobStatus.PREP, 3, 1, "default", "user");
+ FakeJobInProgress fjob1 = taskTrackerManager.submitJob(
+ JobStatus.PREP, 4, 4, "default", "user");
taskTrackerManager.initJob(fjob1);
+ HashMap<String, String> expectedStrings = new HashMap<String, String>();
- //1 map and 1 reduce assigned
- List<Task> task1 = scheduler.assignTasks(tracker("tt1"));
- //2 map are assigned reached the maxlimit
- List<Task> task2 = scheduler.assignTasks(tracker("tt2"));
-
- //task3 is null as maxlimit is reached.
- List<Task> task3 = scheduler.assignTasks(tracker("tt3"));
- assertNull(task3);
- //Now complete the task 1.
- // complete the job
- for(Task task: task1) {
- taskTrackerManager.finishTask(
- task.getTaskID().toString(),
- fjob1);
- }
- //We have completed the tt1 task which was a map task so we expect one map
- //task to be picked up
- checkAssignment(
- taskTrackerManager, scheduler, "tt4",
- "attempt_test_0001_m_000003_0 on tt4");
- }
-
- /**
- * Test max reduce limit
- *
- * @throws IOException
- */
- public void testMaxReduceCap() throws IOException {
- this.setUp(4, 1, 1);
- taskTrackerManager.addQueues(new String[]{"default"});
- ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, false, 1));
-
-
- taskTrackerManager.setFakeQueues(queues);
- scheduler.start();
- scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext().getMapTSC().setMaxTaskLimit(-1);
- scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext().getReduceTSC().setMaxTaskLimit(2);
-
-
- //submit the Job
- FakeJobInProgress fjob1 =
- taskTrackerManager.submitJob(JobStatus.PREP, 1, 3, "default", "user");
-
- taskTrackerManager.initJob(fjob1);
+ expectedStrings.put(MAP, "attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings.put(REDUCE, "attempt_test_0001_r_000001_0 on tt1");
+ List<Task> task1 = checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1", expectedStrings);
- //1 map and 1 reduce
- List<Task> task1 = scheduler.assignTasks(tracker("tt1"));
- // 1 reduce assigned
- List<Task> task2 = scheduler.assignTasks(tracker("tt2"));
+ expectedStrings.put(MAP, "attempt_test_0001_m_000002_0 on tt2");
+ expectedStrings.put(REDUCE, "attempt_test_0001_r_000002_0 on tt2");
+ List<Task> task2 = checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt2", expectedStrings);
- // No tasks should be assigned, as we have reached the max cap.
+ //we have already reached the limit
+ //this call would return null
List<Task> task3 = scheduler.assignTasks(tracker("tt3"));
assertNull(task3);
//Now complete the task 1 i.e map task.
- for(Task task: task1) {
- if (task.isMapTask()) {
+ for (Task task : task1) {
taskTrackerManager.finishTask(
- task.getTaskID().toString(),
- fjob1);
- }
+ task.getTaskID().toString(), fjob1);
}
-
- //Still no slots available for reduce hence no tasks
- //assigned
- assertNull(scheduler.assignTasks(tracker("tt1")));
- //Complete the reduce task
- taskTrackerManager.finishTask(
- task2.get(0).getTaskID().toString(), fjob1);
-
- //One reduce is done hence assign the new reduce.
- checkAssignment(
- taskTrackerManager, scheduler, "tt4",
- "attempt_test_0001_r_000003_0 on tt4");
+ expectedStrings.put(MAP, "attempt_test_0001_m_000003_0 on tt1");
+ expectedStrings.put(REDUCE, "attempt_test_0001_r_000003_0 on tt1");
+ task2 = checkMultipleTaskAssignment(
+ taskTrackerManager, scheduler, "tt1", expectedStrings);
}
// test job run-state change
@@ -486,6 +433,28 @@
assertEquals(18.75f, jqm.getJobQueue("qAZ4").qsc.getCapacityPercent());
}
+ public void testCapacityAllocFailureWithLowerMaxCapacity() throws Exception {
+ String[] qs = {"default", "qAZ1"};
+ taskTrackerManager.addQueues(qs);
+ ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
+ queues.add(new FakeQueueInfo("default", 25.0f, true, 25));
+ FakeQueueInfo qi = new FakeQueueInfo("qAZ1", -1.0f, true, 25);
+ qi.maxCapacity = 40.0f;
+ queues.add(qi);
+ taskTrackerManager.setFakeQueues(queues);
+ try {
+ scheduler.start();
+ fail("scheduler start should fail ");
+ }catch(IOException ise) {
+ Throwable e = ise.getCause();
+ assertTrue(e instanceof IllegalStateException);
+ assertEquals(
+ e.getMessage(),
+ " Capacity share (" + 75.0f + ")for unconfigured queue " + "qAZ1" +
+ " is greater than its maximum-capacity percentage " + 40.0f);
+ }
+ }
+
// Tests how capacity is computed and assignment of tasks done
// on the basis of the capacity.
public void testCapacityBasedAllocation() throws Exception {
@@ -589,21 +558,16 @@
}
/**
- * Creates a queue with max task limit of 2
- * submit 1 job in the queue which is high ram(2 slots) . As 2 slots are
- * given to high ram job and are reserved , no other tasks are accepted .
- *
+ * test the high memory blocking with max capacity.
* @throws IOException
*/
- public void testHighMemoryBlockingWithMaxLimit()
+ public void testHighMemoryBlockingWithMaxCapacity()
throws IOException {
-
- // 2 map and 1 reduce slots
- taskTrackerManager = new FakeTaskTrackerManager(2, 2, 1);
+ taskTrackerManager = new FakeTaskTrackerManager(2, 2, 2);
taskTrackerManager.addQueues(new String[]{"defaultXYZM"});
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("defaultXYZM", 100.0f, true, 25));
+ queues.add(new FakeQueueInfo("defaultXYZM", 25.0f, true, 50));
scheduler.setTaskTrackerManager(taskTrackerManager);
@@ -611,138 +575,137 @@
// Normal job in the cluster would be 1GB maps/reduces
scheduler.getConf().setLong(JTConfig.JT_MAX_MAPMEMORY_MB, 2 * 1024);
scheduler.getConf().setLong(MRConfig.MAPMEMORY_MB, 1 * 1024);
- scheduler.getConf().setLong(JTConfig.JT_MAX_REDUCEMEMORY_MB, 1 * 1024);
+ scheduler.getConf().setLong(JTConfig.JT_MAX_REDUCEMEMORY_MB, 2 * 1024);
scheduler.getConf().setLong(MRConfig.REDUCEMEMORY_MB, 1 * 1024);
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext()
- .getMapTSC().setMaxTaskLimit(2);
-
+ .setMaxCapacityPercent(50);
- // The situation : Submit 2 jobs with high memory map task
- //Set the max limit for queue to 2 ,
- // try submitting more map tasks to the queue , it should not happen
-
- LOG.debug(
- "Submit one high memory(2GB maps, 0MB reduces) job of "
- + "2 map tasks");
JobConf jConf = new JobConf(conf);
jConf.setMemoryForMapTask(2 * 1024);
- jConf.setMemoryForReduceTask(0);
+ jConf.setMemoryForReduceTask(1 * 1024);
jConf.setNumMapTasks(2);
- jConf.setNumReduceTasks(0);
+ jConf.setNumReduceTasks(1);
jConf.setQueueName("defaultXYZM");
jConf.setUser("u1");
FakeJobInProgress job1 = taskTrackerManager.submitJobAndInit(
JobStatus.PREP, jConf);
- LOG.debug(
- "Submit another regular memory(1GB vmem maps/reduces) job of "
- + "2 map/red tasks");
jConf = new JobConf(conf);
jConf.setMemoryForMapTask(1 * 1024);
- jConf.setMemoryForReduceTask(1 * 1024);
- jConf.setNumMapTasks(2);
+ jConf.setMemoryForReduceTask(2 * 1024);
+ jConf.setNumMapTasks(1);
jConf.setNumReduceTasks(2);
jConf.setQueueName("defaultXYZM");
jConf.setUser("u1");
FakeJobInProgress job2 = taskTrackerManager.submitJobAndInit(
JobStatus.PREP, jConf);
- // first, a map from j1 will run this is a high memory job so it would
- // occupy the 2 slots and it would try to assign the reduce task from
- //job2.
- Map<String, String> expectedStrings = new HashMap<String, String>();
- expectedStrings.put(MAP, "attempt_test_0001_m_000001_0 on tt1");
- expectedStrings.put(REDUCE, "attempt_test_0002_r_000001_0 on tt1");
- checkMultipleTaskAssignment(
- taskTrackerManager, scheduler, "tt1",
- expectedStrings);
+ //high ram map from job 1 and normal reduce task from job 1
+ HashMap<String,String> expectedStrings = new HashMap<String,String>();
+ expectedStrings.put(MAP,"attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings.put(REDUCE,"attempt_test_0001_r_000001_0 on tt1");
+
+ List<Task> tasks = checkMultipleTaskAssignment(taskTrackerManager,scheduler,
+ "tt1", expectedStrings);
- checkOccupiedSlots("defaultXYZM", TaskType.MAP, 1, 2, 100.0f, 1, 1);
+ checkOccupiedSlots("defaultXYZM", TaskType.MAP, 1, 2, 200.0f,1,0);
+ checkOccupiedSlots("defaultXYZM", TaskType.REDUCE, 1, 1, 100.0f,0,2);
checkMemReservedForTasksOnTT("tt1", 2 * 1024L, 1 * 1024L);
- //at this point , the scheduler tries to schedule another map from j2 for
- //another task tracker.
- // This should not happen as all the map slots are taken
- //by the first task itself.hence reduce task from the second job is given
- expectedStrings.clear();
- expectedStrings.put(REDUCE, "attempt_test_0002_r_000002_0 on tt2");
- checkMultipleTaskAssignment(
- taskTrackerManager, scheduler, "tt2",
- expectedStrings);
+ //we have reached the maximum limit for map, so no more map tasks.
+ //we have used 1 reduce already and 1 more reduce slot is left for the
+ //before we reach maxcapacity for reduces.
+ // But current 1 slot + 2 slots for high ram reduce would
+ //mean we are crossing the maxium capacity.hence nothing would be assigned
+ //in this call
+ assertNull(scheduler.assignTasks(tracker("tt2")));
+
+ //complete the high ram job on tt1.
+ for (Task task : tasks) {
+ taskTrackerManager.finishTask(
+ task.getTaskID().toString(),
+ job1);
+ }
+
+ expectedStrings.put(MAP,"attempt_test_0001_m_000002_0 on tt2");
+ expectedStrings.put(REDUCE,"attempt_test_0002_r_000001_0 on tt2");
+
+ tasks = checkMultipleTaskAssignment(taskTrackerManager,scheduler,
+ "tt2", expectedStrings);
+
+ checkOccupiedSlots("defaultXYZM", TaskType.MAP, 1, 2, 200.0f,1,0);
+ checkOccupiedSlots("defaultXYZM", TaskType.REDUCE, 1, 2, 200.0f,0,2);
+ checkMemReservedForTasksOnTT("tt2", 2 * 1024L, 2 * 1024L);
+
+ //complete the high ram job on tt1.
+ for (Task task : tasks) {
+ taskTrackerManager.finishTask(
+ task.getTaskID().toString(),
+ job2);
+ }
+
+
+ expectedStrings.put(MAP,"attempt_test_0002_m_000001_0 on tt2");
+ expectedStrings.put(REDUCE,"attempt_test_0002_r_000002_0 on tt2");
+
+ tasks = checkMultipleTaskAssignment(taskTrackerManager,scheduler,
+ "tt2", expectedStrings);
}
/**
* test if user limits automatically adjust to max map or reduce limit
*/
- public void testUserLimitsWithMaxLimits() throws Exception {
- setUp(4, 4, 4);
+ public void testUserLimitsWithMaxCapacity() throws Exception {
+ setUp(2, 2, 2);
// set up some queues
String[] qs = {"default"};
taskTrackerManager.addQueues(qs);
ArrayList<FakeQueueInfo> queues = new ArrayList<FakeQueueInfo>();
- queues.add(new FakeQueueInfo("default", 100.0f, true, 50));
+ queues.add(new FakeQueueInfo("default", 50.0f, true, 50));
taskTrackerManager.setFakeQueues(queues);
scheduler.start();
scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext()
- .getMapTSC().setMaxTaskLimit(2);
- scheduler.getRoot().getChildren().get(0).getQueueSchedulingContext()
- .getReduceTSC().setMaxTaskLimit(2);
-
+ .setMaxCapacityPercent(75);
// submit a job
FakeJobInProgress fjob1 =
- taskTrackerManager.submitJobAndInit(
- JobStatus.PREP, 10, 10, "default", "u1");
+ taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u1");
FakeJobInProgress fjob2 =
- taskTrackerManager.submitJobAndInit(
- JobStatus.PREP, 10, 10, "default", "u2");
+ taskTrackerManager.submitJobAndInit(JobStatus.PREP, 10, 10, "default", "u2");
- // for queue 'default', the capacity for maps is 2.
- // But the max map limit is 2
- // hence user should be getting not more than 1 as it is the 50%.
- //same with reduce
- Map<String, String> expectedStrings = new HashMap<String, String>();
- populateExpectedStrings(expectedStrings,
- "attempt_test_0001_m_000001_0 on tt1",
- "attempt_test_0001_r_000001_0 on tt1");
- List<Task> t1 = checkMultipleTaskAssignment(
- taskTrackerManager, scheduler, "tt1",
- expectedStrings);
+ // for queue 'default', maxCapacity for map and reduce is 3.
+ // initial user limit for 50% assuming there are 2 users/queue is.
+ // 1 map and 1 reduce.
+ // after max capacity it is 1.5 each.
- //Now we should get the task from the other job. As the
- //first user has reached his max map limit.
- //same with reduce
- populateExpectedStrings(expectedStrings,
- "attempt_test_0002_m_000001_0 on tt2",
- "attempt_test_0002_r_000001_0 on tt2");
- checkMultipleTaskAssignment(
- taskTrackerManager, scheduler, "tt2",
- expectedStrings);
+ //first job would be given 1 job each.
+ HashMap<String,String> expectedStrings = new HashMap<String,String>();
+ expectedStrings.put(MAP,"attempt_test_0001_m_000001_0 on tt1");
+ expectedStrings.put(REDUCE,"attempt_test_0001_r_000001_0 on tt1");
- //Now we are done with map and reduce limit ,
- // now if we ask for task we should
- // get null.
- List<Task> t3 = scheduler.assignTasks(tracker("tt3"));
- assertNull(t3);
+ List<Task> tasks = checkMultipleTaskAssignment(taskTrackerManager,scheduler,
+ "tt1", expectedStrings);
- //We completed 1 map and 1 reduce in here
- for (Task task : t1) {
- taskTrackerManager.finishTask(
- task.getTaskID().toString(),
- fjob1);
- }
- //again we would assign 1 map and 1 reduce
- populateExpectedStrings(expectedStrings,
- "attempt_test_0001_m_000002_0 on tt1",
- "attempt_test_0001_r_000002_0 on tt1");
- checkMultipleTaskAssignment(
- taskTrackerManager, scheduler, "tt1",
- expectedStrings);
+ //for user u1 we have reached the limit. that is 1 job.
+ //1 more map and reduce tasks.
+ expectedStrings.put(MAP,"attempt_test_0002_m_000001_0 on tt1");
+ expectedStrings.put(REDUCE,"attempt_test_0002_r_000001_0 on tt1");
+
+ tasks = checkMultipleTaskAssignment(taskTrackerManager,scheduler,
+ "tt1", expectedStrings);
+
+ expectedStrings.put(MAP,"attempt_test_0001_m_000002_0 on tt2");
+ expectedStrings.put(REDUCE,"attempt_test_0001_r_000002_0 on tt2");
+
+ tasks = checkMultipleTaskAssignment(taskTrackerManager,scheduler,
+ "tt2", expectedStrings);
+
+ assertNull(scheduler.assignTasks(tracker("tt2")));
}
// Utility method to construct a map of expected strings
@@ -2728,16 +2691,12 @@
* @param incrReduceIndex
*/
private void checkOccupiedSlots(
- String queue,
- TaskType type, int numActiveUsers,
- int expectedOccupiedSlots, float expectedOccupiedSlotsPercent,
- int incrMapIndex
- , int incrReduceIndex
- ) {
+ String queue, TaskType type, int numActiveUsers, int expectedOccupiedSlots,
+ float expectedOccupiedSlotsPercent, int incrMapIndex, int incrReduceIndex) {
scheduler.updateContextInfoForTests();
QueueManager queueManager = scheduler.taskTrackerManager.getQueueManager();
- String schedulingInfo =
- queueManager.getJobQueueInfo(queue).getSchedulingInfo();
+ String schedulingInfo = queueManager.getJobQueueInfo(queue)
+ .getSchedulingInfo();
String[] infoStrings = schedulingInfo.split("\n");
int index = -1;
if (type.equals(TaskType.MAP)) {
@@ -2749,9 +2708,8 @@
LOG.info(infoStrings[index]);
assertEquals(
String.format(
- "Used capacity: %d (%.1f%% of Capacity)",
- expectedOccupiedSlots, expectedOccupiedSlotsPercent),
- infoStrings[index]);
+ "Used capacity: %d (%.1f%% of Capacity)", expectedOccupiedSlots,
+ expectedOccupiedSlotsPercent), infoStrings[index]);
}
/**
Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java?rev=828081&r1=828080&r2=828081&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java Wed Oct 21 16:19:32 2009
@@ -149,9 +149,6 @@
prp.setProperty("maximum-capacity","20.5");
prp.setProperty("supports-priority","false");
prp.setProperty("minimum-user-limit-percent","23");
- prp.setProperty(CapacitySchedulerConf.MAX_MAP_CAP_PROPERTY,"43");
- prp.setProperty(CapacitySchedulerConf.MAX_REDUCE_CAP_PROPERTY,"43");
-
CapacitySchedulerConf conf = new CapacitySchedulerConf();
conf.setProperties("default",prp);
@@ -160,10 +157,6 @@
assertTrue(conf.getMaxCapacity("default") == 20.5f);
assertTrue(conf.isPrioritySupported("default") == false);
assertTrue(conf.getMinimumUserLimitPercent("default")==23);
- assertTrue(conf.getMaxMapCap("default") == 43);
- assertTrue(conf.getMaxReduceCap("default") == 43);
-
-
//check for inproper stuff
prp.setProperty("capacity","h");
Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java?rev=828081&r1=828080&r2=828081&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestContainerQueue.java Wed Oct 21 16:19:32 2009
@@ -71,22 +71,22 @@
// its children.
//level 1 children
QueueSchedulingContext a1 = new QueueSchedulingContext(
- "a", 25, -1, -1, -1, -1);
+ "a", 25, -1, -1);
QueueSchedulingContext a2 = new QueueSchedulingContext(
- "b", 25, -1, -1, -1, -1);
+ "b", 25, -1, -1);
AbstractQueue q = new ContainerQueue(rt, a1);
AbstractQueue ql = new ContainerQueue(rt, a2);
//level 2 children
QueueSchedulingContext a = new QueueSchedulingContext(
- "aa", 50, -1, -1, -1, -1);
+ "aa", 50, -1, -1);
QueueSchedulingContext b = new QueueSchedulingContext(
- "ab", 50, -1, -1, -1, -1);
+ "ab", 50, -1, -1);
QueueSchedulingContext c = new QueueSchedulingContext(
- "ac", 50, -1, -1, -1, -1);
+ "ac", 50, -1, -1);
QueueSchedulingContext d = new QueueSchedulingContext(
- "ad", 50, -1, -1, -1, -1);
+ "ad", 50, -1, -1);
AbstractQueue q1 = new JobQueue(q, a);
AbstractQueue q2 = new JobQueue(q, b);
@@ -126,11 +126,11 @@
AbstractQueue rt = QueueHierarchyBuilder.createRootAbstractQueue();
QueueSchedulingContext a1 = new QueueSchedulingContext(
- "R.a", 25, 50, -1, -1, -1);
+ "R.a", 25, 50, -1);
QueueSchedulingContext a2 = new QueueSchedulingContext(
- "R.b", 25, 30, -1, -1, -1);
+ "R.b", 25, 30, -1);
QueueSchedulingContext a3 = new QueueSchedulingContext(
- "R.c", 50, -1, -1, -1, -1);
+ "R.c", 50, -1, -1);
//Test for max capacity
@@ -162,8 +162,6 @@
taskTrackerManager, scheduler, "tt1",
expectedStrings);
- //Now the queue has already reached its max limit no further tasks should
- // be given.
expectedStrings.clear();
expectedStrings.put(
CapacityTestUtils.MAP,
@@ -187,20 +185,20 @@
//generate Queuecontext for the children
QueueSchedulingContext a1 = new QueueSchedulingContext(
- "a", 50, -1, -1, -1, -1);
+ "a", 50, -1, -1);
QueueSchedulingContext a2 = new QueueSchedulingContext(
- "b", -1, -1, -1, -1, -1);
+ "b", -1, -1, -1);
AbstractQueue rtChild1 = new ContainerQueue(rt, a1);
AbstractQueue rtChild2 = new ContainerQueue(rt, a2);
//Add further children to rtChild1.
QueueSchedulingContext b = new QueueSchedulingContext(
- "ab", 30, -1, -1, -1, -1);
+ "ab", 30, -1, -1);
QueueSchedulingContext c = new QueueSchedulingContext(
- "ac", -1, -1, -1, -1, -1);
+ "ac", -1, -1, -1);
QueueSchedulingContext d = new QueueSchedulingContext(
- "ad", 100, -1, -1, -1, -1);
+ "ad", 100, -1, -1);
AbstractQueue q0 = new JobQueue(rtChild1, b);
AbstractQueue q1 = new JobQueue(rtChild1, c);
@@ -239,9 +237,9 @@
//Firt level
QueueSchedulingContext sch =
- new QueueSchedulingContext("rt.sch", a, -1, -1, -1, -1);
+ new QueueSchedulingContext("rt.sch", a, -1, -1);
QueueSchedulingContext gta =
- new QueueSchedulingContext("rt.gta", b, -1, -1, -1, -1);
+ new QueueSchedulingContext("rt.gta", b, -1, -1);
AbstractQueue schq = new ContainerQueue(rt, sch);
@@ -253,9 +251,9 @@
//Create further children.
QueueSchedulingContext prod =
- new QueueSchedulingContext("rt.sch.prod", c, -1, -1, -1, -1);
+ new QueueSchedulingContext("rt.sch.prod", c, -1, -1);
QueueSchedulingContext misc =
- new QueueSchedulingContext("rt.sch.misc", d, -1, -1, -1, -1);
+ new QueueSchedulingContext("rt.sch.misc", d, -1, -1);
AbstractQueue prodq = new JobQueue(schq, prod);
AbstractQueue miscq = new JobQueue(schq, misc);
@@ -596,24 +594,4 @@
assertEquals(mapTSC.getNumSlotsOccupied(), expectedUsedSlots[i]);
}
}
-
- private void printOrderedQueueData(AbstractQueue rt) {
- //print data at all levels.
- List<AbstractQueue> aq = rt.getChildren();
- System.out.println();
- for (AbstractQueue a : aq) {
- System.out.println(
- " // " + a.getName() + "-> data " +
- a.getQueueSchedulingContext().getMapTSC().getCapacity() + " " +
- " " +
- a.getQueueSchedulingContext().getMapTSC().getNumSlotsOccupied());
- double f = ((double) a.getQueueSchedulingContext().getMapTSC()
- .getNumSlotsOccupied() /
- (double) a.getQueueSchedulingContext().getMapTSC().getCapacity());
- System.out.println(" // rating -> " + f);
- if (a.getChildren() != null) {
- printOrderedQueueData(a);
- }
- }
- }
}
Modified: hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestRefreshOfQueues.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestRefreshOfQueues.java?rev=828081&r1=828080&r2=828081&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestRefreshOfQueues.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestRefreshOfQueues.java Wed Oct 21 16:19:32 2009
@@ -104,10 +104,6 @@
props[i] = queues[i].getProperties();
props[i].setProperty(CapacitySchedulerConf.CAPACITY_PROPERTY,
String.valueOf(i + 10));
- props[i].setProperty(CapacitySchedulerConf.MAX_MAP_CAP_PROPERTY,
- String.valueOf(i + 20));
- props[i].setProperty(CapacitySchedulerConf.MAX_REDUCE_CAP_PROPERTY,
- String.valueOf(i + 25));
props[i].setProperty(CapacitySchedulerConf.MAX_CAPACITY_PROPERTY,
String.valueOf(i + 15));
props[i].setProperty(CapacitySchedulerConf.SUPPORTS_PRIORITY_PROPERTY,
@@ -136,8 +132,6 @@
allQueues.get(qName).getQueueSchedulingContext();
LOG.info("Context for queue " + qName + " is : " + qsc);
assertEquals(i + 10, qsc.getCapacityPercent(), 0);
- assertEquals(i + 20, qsc.getMapTSC().getMaxTaskLimit());
- assertEquals(i + 25, qsc.getReduceTSC().getMaxTaskLimit());
assertEquals(i + 15, qsc.getMaxCapacityPercent(), 0);
assertEquals(Boolean.valueOf(false),
Boolean.valueOf(qsc.supportsPriorities()));
@@ -149,10 +143,6 @@
props[i] = queues[i].getProperties();
props[i].setProperty(CapacitySchedulerConf.CAPACITY_PROPERTY,
String.valueOf(i + 20));
- props[i].setProperty(CapacitySchedulerConf.MAX_MAP_CAP_PROPERTY,
- String.valueOf(i + 30));
- props[i].setProperty(CapacitySchedulerConf.MAX_REDUCE_CAP_PROPERTY,
- String.valueOf(i + 35));
props[i].setProperty(CapacitySchedulerConf.MAX_CAPACITY_PROPERTY,
String.valueOf(i + 25));
props[i].setProperty(CapacitySchedulerConf.SUPPORTS_PRIORITY_PROPERTY,
@@ -183,8 +173,6 @@
assertEquals(qName, qsc.getQueueName());
LOG.info("Context for queue " + qName + " is : " + qsc);
assertEquals(i + 20, qsc.getCapacityPercent(), 0);
- assertEquals(i + 30, qsc.getMapTSC().getMaxTaskLimit());
- assertEquals(i + 35, qsc.getReduceTSC().getMaxTaskLimit());
assertEquals(i + 25, qsc.getMaxCapacityPercent(), 0);
assertEquals(Boolean.valueOf(false),
Boolean.valueOf(qsc.supportsPriorities()));
Modified: hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/capacity_scheduler.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/capacity_scheduler.xml?rev=828081&r1=828080&r2=828081&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/capacity_scheduler.xml (original)
+++ hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/capacity_scheduler.xml Wed Oct 21 16:19:32 2009
@@ -204,38 +204,6 @@
users, no user can use more than 25% of the queue's resources. A
value of 100 implies no user limits are imposed.</td>
</tr>
- <tr><td>mapred.capacity-scheduler.queue.<queue-<br/>name>.max.map.slots</td>
- <td>
- This value is the maximum max slots that can be used in a
- queue at any point of time. So for example assuming above config value
- is 100 , not more than 100 tasks would be in the queue at any point of
- time, assuming each task takes one slot.
-
- Default value of -1 would disable this capping feature
-
- Typically the queue capacity should be equal to this limit.
- If queue capacity is more than this limit, excess capacity will be
- used by the other queues. If queue capacity is less than the above
- limit , then the limit would be the queue capacity - as in the current
- implementation
- </td>
- </tr>
- <tr><td>mapred.capacity-scheduler.queue.<queue-<br/>name>.max.reduce.slots</td>
- <td>
- This value is the maximum reduce slots that can be used in a
- queue at any point of time. So for example assuming above config value
- is 100 , not more than 100 tasks would be in the queue at any point of
- time, assuming each task takes one slot.
-
- Default value of -1 would disable this capping feature
-
- Typically the queue capacity should be equal to this limit.
- If queue capacity is more than this limit, excess capacity will be
- used by the other queues. If queue capacity is less than the above
- limit , then the limit would be the queue capacity - as in the current
- implementation
- </td>
- </tr>
</table>
</section>