You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by yh...@apache.org on 2009/05/05 09:34:23 UTC
svn commit: r771609 [1/2] - in /hadoop/core/branches/branch-0.20: ./ conf/
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
Author: yhemanth
Date: Tue May 5 07:34:23 2009
New Revision: 771609
URL: http://svn.apache.org/viewvc?rev=771609&view=rev
Log:
Merge -r 771606:771607 from trunk to branch 0.20 to fix HADOOP-5726.
Modified:
hadoop/core/branches/branch-0.20/ (props changed)
hadoop/core/branches/branch-0.20/CHANGES.txt (contents, props changed)
hadoop/core/branches/branch-0.20/conf/capacity-scheduler.xml.template
hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacityScheduler.java
hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java
Propchange: hadoop/core/branches/branch-0.20/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue May 5 07:34:23 2009
@@ -1,2 +1,2 @@
/hadoop/core/branches/branch-0.19:713112
-/hadoop/core/trunk:727001,727117,727191,727212,727217,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,736426,738328,738697,740077,740157,741703,741762,743745,743816,743892,744894,745180,746010,746206,746227,746233,746274,746338,746902-746903,746925,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752609,752834,752836,752913,752932,753112-753113,753346,754645,754847,754927,755035,755226,755348,755370,755418,755426,755790,755905,755938,755960,755986,755998,756352,757448,757624,757849,758156,759398,759932,760502,760783,761046,761482,761632,762216,762879,763107,763502,764967,765016,765809,765951
+/hadoop/core/trunk:727001,727117,727191,727212,727217,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,736426,738328,738697,740077,740157,741703,741762,743745,743816,743892,744894,745180,746010,746206,746227,746233,746274,746338,746902-746903,746925,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752609,752834,752836,752913,752932,753112-753113,753346,754645,754847,754927,755035,755226,755348,755370,755418,755426,755790,755905,755938,755960,755986,755998,756352,757448,757624,757849,758156,759398,759932,760502,760783,761046,761482,761632,762216,762879,763107,763502,764967,765016,765809,765951,771607
Modified: hadoop/core/branches/branch-0.20/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/CHANGES.txt?rev=771609&r1=771608&r2=771609&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/CHANGES.txt (original)
+++ hadoop/core/branches/branch-0.20/CHANGES.txt Tue May 5 07:34:23 2009
@@ -4,6 +4,9 @@
INCOMPATIBLE CHANGES
+ HADOOP-5726. Remove pre-emption from capacity scheduler code base.
+ (Rahul Kumar Singh via yhemanth)
+
NEW FEATURES
IMPROVEMENTS
Propchange: hadoop/core/branches/branch-0.20/CHANGES.txt
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Tue May 5 07:34:23 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.18/CHANGES.txt:727226
/hadoop/core/branches/branch-0.19/CHANGES.txt:713112
-/hadoop/core/trunk/CHANGES.txt:727001,727117,727191,727212,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,735082,736426,738602,738697,739416,740077,740157,741703,741762,743296,743745,743816,743892,744894,745180,745268,746010,746193,746206,746227,746233,746274,746902-746903,746925,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752514,752555,752590,752609,752834,752836,752913,752932,753112-753113,753346,754645,754847,754927,755035,755226,755348,755370,755418,755426,755790,755905,755938,755986,755998,756352,757448,757624,757849,758156,759398,759932,760502,760783,761046,761482,761632,762216,762879,763107,763502,764967,765016,765809,765951
+/hadoop/core/trunk/CHANGES.txt:727001,727117,727191,727212,727228,727255,727869,728187,729052,729987,732385,732572,732613,732777,732838,732869,733887,734870,734916,735082,736426,738602,738697,739416,740077,740157,741703,741762,743296,743745,743816,743892,744894,745180,745268,746010,746193,746206,746227,746233,746274,746902-746903,746925,746944,746968,746970,747279,747289,747802,748084,748090,748783,749262,749318,749863,750533,752073,752514,752555,752590,752609,752834,752836,752913,752932,753112-753113,753346,754645,754847,754927,755035,755226,755348,755370,755418,755426,755790,755905,755938,755986,755998,756352,757448,757624,757849,758156,759398,759932,760502,760783,761046,761482,761632,762216,762879,763107,763502,764967,765016,765809,765951,771607
Modified: hadoop/core/branches/branch-0.20/conf/capacity-scheduler.xml.template
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/conf/capacity-scheduler.xml.template?rev=771609&r1=771608&r2=771609&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/conf/capacity-scheduler.xml.template (original)
+++ hadoop/core/branches/branch-0.20/conf/capacity-scheduler.xml.template Tue May 5 07:34:23 2009
@@ -8,22 +8,14 @@
<configuration>
<property>
- <name>mapred.capacity-scheduler.queue.default.guaranteed-capacity</name>
+ <name>mapred.capacity-scheduler.queue.default.capacity</name>
<value>100</value>
<description>Percentage of the number of slots in the cluster that are
- guaranteed to be available for jobs in this queue.
+ to be available for jobs in this queue.
</description>
</property>
<property>
- <name>mapred.capacity-scheduler.queue.default.reclaim-time-limit</name>
- <value>300</value>
- <description>The amount of time, in seconds, before which
- resources distributed to other queues will be reclaimed.
- </description>
- </property>
-
- <property>
<name>mapred.capacity-scheduler.queue.default.supports-priority</name>
<value>false</value>
<description>If true, priorities of jobs will be taken into
@@ -54,29 +46,10 @@
</description>
</property>
-
- <property>
- <name>mapred.capacity-scheduler.reclaimCapacity.interval</name>
- <value>5</value>
- <description>The time interval, in seconds, between which the scheduler
- periodically determines whether capacity needs to be reclaimed for
- any queue.
- </description>
- </property>
-
<!-- The default configuration settings for the capacity task scheduler -->
<!-- The default values would be applied to all the queues which don't have -->
<!-- the appropriate property for the particular queue -->
<property>
- <name>mapred.capacity-scheduler.default-reclaim-time-limit</name>
- <value>300</value>
- <description>The amount of time, in seconds, before which
- resources distributed to other queues will be reclaimed by default
- in a job queue.
- </description>
- </property>
-
- <property>
<name>mapred.capacity-scheduler.default-supports-priority</name>
<value>false</value>
<description>If true, priorities of jobs will be taken into
Modified: hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java?rev=771609&r1=771608&r2=771609&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java (original)
+++ hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacitySchedulerConf.java Tue May 5 07:34:23 2009
@@ -34,8 +34,6 @@
/** Default file name from which the resource manager configuration is read. */
public static final String SCHEDULER_CONF_FILE = "capacity-scheduler.xml";
- private int defaultReclaimTime;
-
private int defaultUlimitMinimum;
private boolean defaultSupportPriority;
@@ -117,8 +115,6 @@
* which is used by the Capacity Scheduler.
*/
private void initializeDefaults() {
- defaultReclaimTime = rmConf.getInt(
- "mapred.capacity-scheduler.default-reclaim-time-limit",300);
defaultUlimitMinimum = rmConf.getInt(
"mapred.capacity-scheduler.default-minimum-user-limit-percent", 100);
defaultSupportPriority = rmConf.getBoolean(
@@ -129,33 +125,33 @@
}
/**
- * Get the guaranteed percentage of the cluster for the specified queue.
+ * Get the percentage of the cluster for the specified queue.
*
- * This method defaults to configured default Guaranteed Capacity if
+ * This method defaults to configured default Capacity if
* no value is specified in the configuration for this queue.
* If the configured capacity is negative value or greater than 100 an
* {@link IllegalArgumentException} is thrown.
*
- * If default Guaranteed capacity is not configured for a queue, then
+ * If default capacity is not configured for a queue, then
* system allocates capacity based on what is free at the time of
* capacity scheduler start
*
*
* @param queue name of the queue
- * @return guaranteed percent of the cluster for the queue.
+ * @return percent of the cluster for the queue.
*/
- public float getGuaranteedCapacity(String queue) {
- //Check done in order to return default GC which can be negative
- //In case of both GC and default GC not configured.
+ public float getCapacity(String queue) {
+ //Check done in order to return default capacity which can be negative
+ //In case of both capacity and default capacity not configured.
//Last check is if the configuration is specified and is marked as
//negative we throw exception
String raw = rmConf.getRaw(toFullPropertyName(queue,
- "guaranteed-capacity"));
+ "capacity"));
if(raw == null) {
return -1;
}
float result = rmConf.getFloat(toFullPropertyName(queue,
- "guaranteed-capacity"),
+ "capacity"),
-1);
if (result < 0.0 || result > 100.0) {
throw new IllegalArgumentException("Illegal capacity for queue " + queue +
@@ -165,53 +161,13 @@
}
/**
- * Sets the Guaranteed capacity of the given queue.
- *
- * @param queue name of the queue
- * @param gc guaranteed percent of the cluster for the queue.
- */
- public void setGuaranteedCapacity(String queue,float gc) {
- rmConf.setFloat(toFullPropertyName(queue, "guaranteed-capacity"),gc);
- }
-
-
- /**
- * Get the amount of time before which redistributed resources must be
- * reclaimed for the specified queue.
- *
- * The resource manager distributes spare capacity from a free queue
- * to ones which are in need for more resources. However, if a job
- * submitted to the first queue requires back the resources, they must
- * be reclaimed within the specified configuration time limit.
- *
- * This method defaults to configured default reclaim time limit if
- * no value is specified in the configuration for this queue.
- *
- * Throws an {@link IllegalArgumentException} when invalid value is
- * configured.
+ * Sets the capacity of the given queue.
*
* @param queue name of the queue
- * @return reclaim time limit for this queue.
- */
- public int getReclaimTimeLimit(String queue) {
- int reclaimTimeLimit = rmConf.getInt(toFullPropertyName(queue, "reclaim-time-limit"),
- defaultReclaimTime);
- if(reclaimTimeLimit <= 0) {
- throw new IllegalArgumentException("Invalid reclaim time limit : "
- + reclaimTimeLimit + " for queue : " + queue);
- }
- return reclaimTimeLimit;
- }
-
- /**
- * Set the amount of time before which redistributed resources must be
- * reclaimed for the specified queue.
- * @param queue Name of the queue
- * @param value Amount of time before which the redistributed resources
- * must be retained.
+ * @param gc percent of the cluster for the queue.
*/
- public void setReclaimTimeLimit(String queue, int value) {
- rmConf.setInt(toFullPropertyName(queue, "reclaim-time-limit"), value);
+ public void setCapacity(String queue,float gc) {
+ rmConf.setFloat(toFullPropertyName(queue, "capacity"),gc);
}
/**
@@ -435,30 +391,4 @@
public void setDefaultPercentOfPmemInVmem(float value) {
rmConf.setFloat(DEFAULT_PERCENTAGE_OF_PMEM_IN_VMEM_PROPERTY, value);
}
-
- /**
- * Gets the reclaim capacity thread interval.
- *
- * @return reclaim capacity interval
- */
-
- public long getReclaimCapacityInterval() {
- long reclaimCapacityInterval =
- rmConf.getLong("mapred.capacity-scheduler.reclaimCapacity.interval", 5);
-
- if(reclaimCapacityInterval <= 0) {
- throw new IllegalArgumentException("Invalid reclaim capacity " +
- "interval, should be greater than zero");
- }
- return reclaimCapacityInterval;
- }
- /**
- * Sets the reclaim capacity thread interval.
- *
- * @param value
- */
- public void setReclaimCapacityInterval(long value) {
- rmConf.setLong("mapred.capacity-scheduler.reclaimCapacity.interval",
- value);
- }
}
Modified: hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java?rev=771609&r1=771608&r2=771609&view=diff
==============================================================================
--- hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java (original)
+++ hadoop/core/branches/branch-0.20/src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/CapacityTaskScheduler.java Tue May 5 07:34:23 2009
@@ -42,15 +42,12 @@
* and provides a HOD-less way to share large clusters. This scheduler
* provides the following features:
* * support for queues, where a job is submitted to a queue.
- * * Queues are guaranteed a fraction of the capacity of the grid (their
- * 'guaranteed capacity') in the sense that a certain capacity of resources
+ * * Queues are assigned a fraction of the capacity of the grid (their
+ * 'capacity') in the sense that a certain capacity of resources
* will be at their disposal. All jobs submitted to the queues of an Org
- * will have access to the capacity guaranteed to the Org.
- * * Free resources can be allocated to any queue beyond its guaranteed
- * capacity. These excess allocated resources can be reclaimed and made
- * available to another queue in order to meet its capacity guarantee.
- * * The scheduler guarantees that excess resources taken from a queue will
- * be restored to it within N minutes of its need for them.
+ * will have access to the capacity allocated to the Org.
+ * * Free resources can be allocated to any queue beyond its
+ * capacity.
* * Queues optionally support job priorities (disabled by default).
* * Within a queue, jobs with higher priority will have access to the
* queue's resources before jobs with lower priority. However, once a job
@@ -62,42 +59,11 @@
*/
class CapacityTaskScheduler extends TaskScheduler {
- /**
- * For keeping track of reclaimed capacity.
- * Whenever slots need to be reclaimed, we create one of these objects.
- * As the queue gets slots, the amount to reclaim gets decremented. if
- * we haven't reclaimed enough within a certain time, we need to kill
- * tasks. This object 'expires' either if all resources are reclaimed
- * before the deadline, or the deadline passes .
- */
- private static class ReclaimedResource {
- // how much resource to reclaim
- public int originalAmount;
- // how much is to be reclaimed currently
- public int currentAmount;
- // the time, in millisecs, when this object expires.
- // This time is equal to the time when the object was created, plus
- // the reclaim-time SLA for the queue.
- public long whenToExpire;
- // we also keep track of when to kill tasks, in millisecs. This is a
- // fraction of 'whenToExpire', but we store it here so we don't
- // recompute it every time.
- public long whenToKill;
-
- public ReclaimedResource(int amount, long expiryTime,
- long whenToKill) {
- this.originalAmount = amount;
- this.currentAmount = amount;
- this.whenToExpire = expiryTime;
- this.whenToKill = whenToKill;
- }
- }
-
/***********************************************************************
* Keeping track of scheduling information for queues
*
* We need to maintain scheduling information relevant to a queue (its
- * name, guaranteed capacity, etc), along with information specific to
+ * name, capacity, etc), along with information specific to
* each kind of task, Map or Reduce (num of running tasks, pending
* tasks etc).
*
@@ -115,58 +81,18 @@
* the actual gc, which depends on how many slots are available
* in the cluster at any given time.
*/
- int guaranteedCapacity = 0;
+ int capacity = 0;
// number of running tasks
int numRunningTasks = 0;
- // number of pending tasks
- int numPendingTasks = 0;
/** for each user, we need to keep track of number of running tasks */
Map<String, Integer> numRunningTasksByUser =
new HashMap<String, Integer>();
/**
- * We need to keep track of resources to reclaim.
- * Whenever a queue is under capacity and has tasks pending, we offer it
- * an SLA that gives it free slots equal to or greater than the gap in
- * its capacity, within a period of time (reclaimTime).
- * To do this, we periodically check if queues need to reclaim capacity.
- * If they do, we create a ResourceReclaim object. We also periodically
- * check if a queue has received enough free slots within, say, 80% of
- * its reclaimTime. If not, we kill enough tasks to make up the
- * difference.
- * We keep two queues of ResourceReclaim objects. when an object is
- * created, it is placed in one queue. Once we kill tasks to recover
- * resources for that object, it is placed in an expiry queue. we need
- * to do this to prevent creating spurious ResourceReclaim objects. We
- * keep a count of total resources that are being reclaimed. This count
- * is decremented when an object expires.
- */
-
- /**
- * the list of resources to reclaim. This list is always sorted so that
- * resources that need to be reclaimed sooner occur earlier in the list.
- */
- LinkedList<ReclaimedResource> reclaimList =
- new LinkedList<ReclaimedResource>();
- /**
- * the list of resources to expire. This list is always sorted so that
- * resources that need to be expired sooner occur earlier in the list.
- */
- LinkedList<ReclaimedResource> reclaimExpireList =
- new LinkedList<ReclaimedResource>();
- /**
- * sum of all resources that are being reclaimed.
- * We keep this to prevent unnecessary ReclaimResource objects from being
- * created.
- */
- int numReclaimedResources = 0;
-
- /**
* reset the variables associated with tasks
*/
void resetTaskVars() {
numRunningTasks = 0;
- numPendingTasks = 0;
for (String s: numRunningTasksByUser.keySet()) {
numRunningTasksByUser.put(s, 0);
}
@@ -176,11 +102,11 @@
* return information about the tasks
*/
public String toString(){
- float runningTasksAsPercent = guaranteedCapacity!= 0 ?
- ((float)numRunningTasks * 100/guaranteedCapacity):0;
+ float runningTasksAsPercent = capacity!= 0 ?
+ ((float)numRunningTasks * 100/capacity):0;
StringBuffer sb = new StringBuffer();
- sb.append("Guaranteed Capacity: " + guaranteedCapacity + "\n");
- sb.append(String.format("Running tasks: %.1f%% of Guaranteed Capacity\n",
+ sb.append("Capacity: " + capacity + "\n");
+ sb.append(String.format("Running tasks: %.1f%% of Capacity\n",
runningTasksAsPercent));
// include info on active users
if (numRunningTasks != 0) {
@@ -202,8 +128,8 @@
private static class QueueSchedulingInfo {
String queueName;
- /** guaranteed capacity(%) is set in the config */
- float guaranteedCapacityPercent = 0;
+ /** capacity(%) is set in the config */
+ float capacityPercent = 0;
/**
* to handle user limits, we need to know how many users have jobs in
@@ -215,13 +141,6 @@
int ulMin;
/**
- * reclaim time limit (in msec). This time represents the SLA we offer
- * a queue - a queue gets back any lost capacity withing this period
- * of time.
- */
- long reclaimTime;
-
- /**
* We keep track of the JobQueuesManager only for reporting purposes
* (in toString()).
*/
@@ -234,11 +153,10 @@
TaskSchedulingInfo reduceTSI;
public QueueSchedulingInfo(String queueName, float gcPercent,
- int ulMin, long reclaimTime, JobQueuesManager jobQueuesManager) {
+ int ulMin, JobQueuesManager jobQueuesManager) {
this.queueName = new String(queueName);
- this.guaranteedCapacityPercent = gcPercent;
+ this.capacityPercent = gcPercent;
this.ulMin = ulMin;
- this.reclaimTime = reclaimTime;
this.jobQueuesManager = jobQueuesManager;
this.mapTSI = new TaskSchedulingInfo();
this.reduceTSI = new TaskSchedulingInfo();
@@ -253,12 +171,10 @@
StringBuffer sb = new StringBuffer();
sb.append("Queue configuration\n");
//sb.append("Name: " + queueName + "\n");
- sb.append("Guaranteed Capacity Percentage: ");
- sb.append(guaranteedCapacityPercent);
+ sb.append("Capacity Percentage: ");
+ sb.append(capacityPercent);
sb.append("%\n");
sb.append(String.format("User Limit: %d%s\n",ulMin, "%"));
- sb.append(String.format("Reclaim Time limit: %s\n",
- StringUtils.formatTime(reclaimTime)));
sb.append(String.format("Priority Supported: %s\n",
(jobQueuesManager.doesQueueSupportPriorities(queueName))?
"YES":"NO"));
@@ -300,9 +216,8 @@
public String toString(){
// note that we do not call updateQSIObjects() here for performance
// reasons. This means that the data we print out may be slightly
- // stale. This data is updated whenever assignTasks() is called, or
- // whenever the reclaim capacity thread runs, which should be fairly
- // often. If neither of these happen, the data gets stale. If we see
+ // stale. This data is updated whenever assignTasks() is called.
+ // If this doesn't happen, the data gets stale. If we see
// this often, we may need to detect this situation and call
// updateQSIObjects(), or just call it each time.
return scheduler.getDisplayInfo(queueName);
@@ -372,14 +287,11 @@
abstract Task obtainNewTask(TaskTrackerStatus taskTracker,
JobInProgress job) throws IOException;
abstract int getPendingTasks(JobInProgress job);
- abstract int killTasksFromJob(JobInProgress job, int tasksToKill);
abstract TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi);
/**
* List of QSIs for assigning tasks.
- * This list is ordered such that queues that need to reclaim capacity
- * sooner, come before queues that don't. For queues that don't, they're
- * ordered by a ratio of (# of running tasks)/Guaranteed capacity, which
+ * Queues are ordered by a ratio of (# of running tasks)/capacity, which
* indicates how much 'free space' the queue has, or how much it is over
* capacity. This ordered list is iterated over, when assigning tasks.
*/
@@ -396,34 +308,15 @@
public int compare(QueueSchedulingInfo q1, QueueSchedulingInfo q2) {
TaskSchedulingInfo t1 = getTSI(q1);
TaskSchedulingInfo t2 = getTSI(q2);
- // if one queue needs to reclaim something and the other one doesn't,
- // the former is first
- if ((0 == t1.reclaimList.size()) && (0 != t2.reclaimList.size())) {
- return 1;
- }
- else if ((0 != t1.reclaimList.size()) && (0 == t2.reclaimList.size())){
- return -1;
- }
- else if ((0 == t1.reclaimList.size()) && (0 == t2.reclaimList.size())){
- // neither needs to reclaim.
- // look at how much capacity they've filled. Treat a queue with gc=0
- // equivalent to a queue running at capacity
- double r1 = (0 == t1.guaranteedCapacity)? 1.0f:
- (double)t1.numRunningTasks/(double)t1.guaranteedCapacity;
- double r2 = (0 == t2.guaranteedCapacity)? 1.0f:
- (double)t2.numRunningTasks/(double)t2.guaranteedCapacity;
- if (r1<r2) return -1;
- else if (r1>r2) return 1;
- else return 0;
- }
- else {
- // both have to reclaim. Look at which one needs to reclaim earlier
- long tm1 = t1.reclaimList.get(0).whenToKill;
- long tm2 = t2.reclaimList.get(0).whenToKill;
- if (tm1<tm2) return -1;
- else if (tm1>tm2) return 1;
- else return 0;
- }
+ // look at how much capacity they've filled. Treat a queue with gc=0
+ // equivalent to a queue running at capacity
+ double r1 = (0 == t1.capacity)? 1.0f:
+ (double)t1.numRunningTasks/(double)t1.capacity;
+ double r2 = (0 == t2.capacity)? 1.0f:
+ (double)t2.numRunningTasks/(double)t2.capacity;
+ if (r1<r2) return -1;
+ else if (r1>r2) return 1;
+ else return 0;
}
}
// subclass for map and reduce comparators
@@ -454,197 +347,19 @@
Collections.sort(qsiForAssigningTasks, queueComparator);
}
- /**
- * Periodically, we walk through our queues to do the following:
- * a. Check if a queue needs to reclaim any resources within a period
- * of time (because it's running below capacity and more tasks are
- * waiting)
- * b. Check if a queue hasn't received enough of the resources it needed
- * to be reclaimed and thus tasks need to be killed.
- * The caller is responsible for ensuring that the QSI objects and the
- * collections are up-to-date.
- *
- * Make sure that we do not make any calls to scheduler.taskTrackerManager
- * as this can result in a deadlock (see HADOOP-4977).
- */
- private synchronized void reclaimCapacity(int nextHeartbeatInterval) {
- int tasksToKill = 0;
-
- QueueSchedulingInfo lastQsi =
- qsiForAssigningTasks.get(qsiForAssigningTasks.size()-1);
- TaskSchedulingInfo lastTsi = getTSI(lastQsi);
- long currentTime = scheduler.clock.getTime();
- for (QueueSchedulingInfo qsi: qsiForAssigningTasks) {
- TaskSchedulingInfo tsi = getTSI(qsi);
- if (tsi.guaranteedCapacity <= 0) {
- // no capacity, hence nothing can be reclaimed.
- continue;
- }
- // is there any resource that needs to be reclaimed?
- if ((!tsi.reclaimList.isEmpty()) &&
- (tsi.reclaimList.getFirst().whenToKill <
- currentTime + CapacityTaskScheduler.RECLAIM_CAPACITY_INTERVAL)) {
- // make a note of how many tasks to kill to claim resources
- tasksToKill += tsi.reclaimList.getFirst().currentAmount;
- // move this to expiry list
- ReclaimedResource r = tsi.reclaimList.remove();
- tsi.reclaimExpireList.add(r);
- }
- // is there any resource that needs to be expired?
- if ((!tsi.reclaimExpireList.isEmpty()) &&
- (tsi.reclaimExpireList.getFirst().whenToExpire <= currentTime)) {
- ReclaimedResource r = tsi.reclaimExpireList.remove();
- tsi.numReclaimedResources -= r.originalAmount;
- }
- // do we need to reclaim a resource later?
- // if no queue is over capacity, there's nothing to reclaim
- if (lastTsi.numRunningTasks <= lastTsi.guaranteedCapacity) {
- continue;
- }
- if (tsi.numRunningTasks < tsi.guaranteedCapacity) {
- // usedCap is how much capacity is currently accounted for
- int usedCap = tsi.numRunningTasks + tsi.numReclaimedResources;
- // see if we have remaining capacity and if we have enough pending
- // tasks to use up remaining capacity
- if ((usedCap < tsi.guaranteedCapacity) &&
- ((tsi.numPendingTasks - tsi.numReclaimedResources)>0)) {
- // create a request for resources to be reclaimed
- int amt = Math.min((tsi.guaranteedCapacity-usedCap),
- (tsi.numPendingTasks - tsi.numReclaimedResources));
- // create a resource object that needs to be reclaimed some time
- // in the future
- long whenToKill = qsi.reclaimTime -
- (CapacityTaskScheduler.HEARTBEATS_LEFT_BEFORE_KILLING *
- nextHeartbeatInterval);
- if (whenToKill < 0) whenToKill = 0;
- tsi.reclaimList.add(new ReclaimedResource(amt,
- currentTime + qsi.reclaimTime,
- currentTime + whenToKill));
- tsi.numReclaimedResources += amt;
- LOG.debug("Queue " + qsi.queueName + " needs to reclaim " +
- amt + " resources");
- }
- }
- }
- // kill tasks to reclaim capacity
- if (0 != tasksToKill) {
- killTasks(tasksToKill);
- }
- }
-
- // kill 'tasksToKill' tasks
- private void killTasks(int tasksToKill)
- {
- /*
- * There are a number of fair ways in which one can figure out how
- * many tasks to kill from which queue, so that the total number of
- * tasks killed is equal to 'tasksToKill'.
- * Maybe the best way is to keep a global ordering of running tasks
- * and kill the ones that ran last, irrespective of what queue or
- * job they belong to.
- * What we do here is look at how many tasks is each queue running
- * over capacity, and use that as a weight to decide how many tasks
- * to kill from that queue.
- */
-
- // first, find out all queues over capacity
- int loc;
- for (loc=0; loc<qsiForAssigningTasks.size(); loc++) {
- QueueSchedulingInfo qsi = qsiForAssigningTasks.get(loc);
- if (getTSI(qsi).numRunningTasks > getTSI(qsi).guaranteedCapacity) {
- // all queues from here onwards are running over cap
- break;
- }
- }
- // if some queue needs to reclaim cap, there must be at least one queue
- // over cap. But check, just in case.
- if (loc == qsiForAssigningTasks.size()) {
- LOG.warn("In Capacity scheduler, we need to kill " + tasksToKill +
- " tasks but there is no queue over capacity.");
- return;
- }
- // calculate how many total tasks are over cap
- int tasksOverCap = 0;
- for (int i=loc; i<qsiForAssigningTasks.size(); i++) {
- QueueSchedulingInfo qsi = qsiForAssigningTasks.get(i);
- tasksOverCap +=
- (getTSI(qsi).numRunningTasks - getTSI(qsi).guaranteedCapacity);
- }
- // now kill tasks from each queue
- for (int i=loc; i<qsiForAssigningTasks.size(); i++) {
- QueueSchedulingInfo qsi = qsiForAssigningTasks.get(i);
- killTasksFromQueue(qsi, (int)Math.round(
- ((double)(getTSI(qsi).numRunningTasks -
- getTSI(qsi).guaranteedCapacity))*
- tasksToKill/(double)tasksOverCap));
- }
- }
-
- // kill 'tasksToKill' tasks from queue represented by qsi
- private void killTasksFromQueue(QueueSchedulingInfo qsi, int tasksToKill) {
- // we start killing as many tasks as possible from the jobs that started
- // last. This way, we let long-running jobs complete faster.
- int tasksKilled = 0;
- JobInProgress jobs[] = scheduler.jobQueuesManager.
- getRunningJobQueue(qsi.queueName).toArray(new JobInProgress[0]);
- for (int i=jobs.length-1; i>=0; i--) {
- if (jobs[i].getStatus().getRunState() != JobStatus.RUNNING) {
- continue;
- }
- tasksKilled += killTasksFromJob(jobs[i], tasksToKill-tasksKilled);
- if (tasksKilled >= tasksToKill) break;
- }
- }
-
- // return the TaskAttemptID of the running task, if any, that has made
- // the least progress.
- TaskAttemptID getRunningTaskWithLeastProgress(TaskInProgress tip) {
- double leastProgress = 1;
- TaskAttemptID tID = null;
- for (Iterator<TaskAttemptID> it =
- tip.getActiveTasks().keySet().iterator(); it.hasNext();) {
- TaskAttemptID taskid = it.next();
- TaskStatus status = tip.getTaskStatus(taskid);
- if (status.getRunState() == TaskStatus.State.RUNNING) {
- if (status.getProgress() < leastProgress) {
- leastProgress = status.getProgress();
- tID = taskid;
- }
- }
- }
- return tID;
- }
-
- // called when a task is allocated to queue represented by qsi.
- // update our info about reclaimed resources
- private synchronized void updateReclaimedResources(QueueSchedulingInfo qsi) {
- TaskSchedulingInfo tsi = getTSI(qsi);
- // if we needed to reclaim resources, we have reclaimed one
- if (tsi.reclaimList.isEmpty()) {
- return;
- }
- ReclaimedResource res = tsi.reclaimList.getFirst();
- res.currentAmount--;
- if (0 == res.currentAmount) {
- // move this resource to the expiry list
- ReclaimedResource r = tsi.reclaimList.remove();
- tsi.reclaimExpireList.add(r);
- }
- }
-
private synchronized void updateCollectionOfQSIs() {
Collections.sort(qsiForAssigningTasks, queueComparator);
}
private boolean isUserOverLimit(String user, QueueSchedulingInfo qsi) {
- // what is our current capacity? It's GC if we're running below GC.
- // If we're running over GC, then its #running plus 1 (which is the
+ // what is our current capacity? It's the configured capacity if we're running below it.
+ // If we're running over capacity, then it's #running plus 1 (which is the
// extra slot we're getting).
int currentCapacity;
TaskSchedulingInfo tsi = getTSI(qsi);
- if (tsi.numRunningTasks < tsi.guaranteedCapacity) {
- currentCapacity = tsi.guaranteedCapacity;
+ if (tsi.numRunningTasks < tsi.capacity) {
+ currentCapacity = tsi.capacity;
}
else {
currentCapacity = tsi.numRunningTasks+1;
@@ -760,7 +475,7 @@
for (QueueSchedulingInfo qsi : qsiForAssigningTasks) {
// we may have queues with capacity=0. We shouldn't look at jobs from
// these queues
- if (0 == getTSI(qsi).guaranteedCapacity) {
+ if (0 == getTSI(qsi).capacity) {
continue;
}
TaskLookupResult tlr = getTaskFromQueue(taskTracker, qsi);
@@ -772,8 +487,6 @@
// if we find a task, return
if (lookUpStatus == TaskLookupResult.LookUpStatus.TASK_FOUND) {
- // we have a task. Update reclaimed resource info
- updateReclaimedResources(qsi);
return tlr;
}
// if there was a memory mismatch, return
@@ -795,8 +508,8 @@
Collection<JobInProgress> runJobs =
scheduler.jobQueuesManager.getRunningJobQueue(qsi.queueName);
s.append(" Queue '" + qsi.queueName + "'(" + this.type + "): run=" +
- tsi.numRunningTasks + ", gc=" + tsi.guaranteedCapacity +
- ", wait=" + tsi.numPendingTasks + ", run jobs="+ runJobs.size() +
+ tsi.numRunningTasks + ", gc=" + tsi.capacity
+ + ", run jobs="+ runJobs.size() +
"*** ");
}
LOG.debug(s);
@@ -830,55 +543,7 @@
int getPendingTasks(JobInProgress job) {
return job.pendingMaps();
}
- int killTasksFromJob(JobInProgress job, int tasksToKill) {
- /*
- * We'd like to kill tasks that ran the last, or that have made the
- * least progress.
- * Ideally, each job would have a list of tasks, sorted by start
- * time or progress. That's a lot of state to keep, however.
- * For now, we do something a little different. We first try and kill
- * non-local tasks, as these can be run anywhere. For each TIP, we
- * kill the task that has made the least progress, if the TIP has
- * more than one active task.
- * We then look at tasks in runningMapCache.
- */
- int tasksKilled = 0;
-
- /*
- * For non-local running maps, we 'cheat' a bit. We know that the set
- * of non-local running maps has an insertion order such that tasks
- * that ran last are at the end. So we iterate through the set in
- * reverse. This is OK because even if the implementation changes,
- * we're still using generic set iteration and are no worse of.
- */
- TaskInProgress[] tips =
- job.getNonLocalRunningMaps().toArray(new TaskInProgress[0]);
- for (int i=tips.length-1; i>=0; i--) {
- // pick the tast attempt that has progressed least
- TaskAttemptID tid = getRunningTaskWithLeastProgress(tips[i]);
- if (null != tid) {
- if (tips[i].killTask(tid, false)) {
- if (++tasksKilled >= tasksToKill) {
- return tasksKilled;
- }
- }
- }
- }
- // now look at other running tasks
- for (Set<TaskInProgress> s: job.getRunningMapCache().values()) {
- for (TaskInProgress tip: s) {
- TaskAttemptID tid = getRunningTaskWithLeastProgress(tip);
- if (null != tid) {
- if (tip.killTask(tid, false)) {
- if (++tasksKilled >= tasksToKill) {
- return tasksKilled;
- }
- }
- }
- }
- }
- return tasksKilled;
- }
+
TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
return qsi.mapTSI;
}
@@ -911,30 +576,7 @@
int getPendingTasks(JobInProgress job) {
return job.pendingReduces();
}
- int killTasksFromJob(JobInProgress job, int tasksToKill) {
- /*
- * For reduces, we 'cheat' a bit. We know that the set
- * of running reduces has an insertion order such that tasks
- * that ran last are at the end. So we iterate through the set in
- * reverse. This is OK because even if the implementation changes,
- * we're still using generic set iteration and are no worse of.
- */
- int tasksKilled = 0;
- TaskInProgress[] tips =
- job.getRunningReduces().toArray(new TaskInProgress[0]);
- for (int i=tips.length-1; i>=0; i--) {
- // pick the tast attempt that has progressed least
- TaskAttemptID tid = getRunningTaskWithLeastProgress(tips[i]);
- if (null != tid) {
- if (tips[i].killTask(tid, false)) {
- if (++tasksKilled >= tasksToKill) {
- return tasksKilled;
- }
- }
- }
- }
- return tasksKilled;
- }
+
TaskSchedulingInfo getTSI(QueueSchedulingInfo qsi) {
return qsi.reduceTSI;
}
@@ -953,12 +595,6 @@
/** name of the default queue. */
static final String DEFAULT_QUEUE_NAME = "default";
- /** how often does redistribution thread run (in msecs)*/
- private static long RECLAIM_CAPACITY_INTERVAL;
- /** we start killing tasks to reclaim capacity when we have so many
- * heartbeats left. */
- private static final int HEARTBEATS_LEFT_BEFORE_KILLING = 3;
-
static final Log LOG = LogFactory.getLog(CapacityTaskScheduler.class);
protected JobQueuesManager jobQueuesManager;
protected CapacitySchedulerConf schedConf;
@@ -966,33 +602,6 @@
private boolean started = false;
/**
- * Used to distribute/reclaim excess capacity among queues
- */
- class ReclaimCapacity implements Runnable {
- public ReclaimCapacity() {
- }
- public void run() {
- while (true) {
- try {
- Thread.sleep(RECLAIM_CAPACITY_INTERVAL);
- if (stopReclaim) {
- break;
- }
- reclaimCapacity();
- } catch (InterruptedException t) {
- break;
- } catch (Throwable t) {
- LOG.error("Error in redistributing capacity:\n" +
- StringUtils.stringifyException(t));
- }
- }
- }
- }
- private Thread reclaimCapacityThread = null;
- /** variable to indicate that thread should stop */
- private boolean stopReclaim = false;
-
- /**
* A clock class - can be mocked out for testing.
*/
static class Clock {
@@ -1067,9 +676,6 @@
initializeMemoryRelatedConf();
- RECLAIM_CAPACITY_INTERVAL = schedConf.getReclaimCapacityInterval();
- RECLAIM_CAPACITY_INTERVAL *= 1000;
-
// read queue info from config file
QueueManager queueManager = taskTrackerManager.getQueueManager();
Set<String> queues = queueManager.getQueues();
@@ -1078,20 +684,19 @@
throw new IllegalStateException("System has no queue configured");
}
- Set<String> queuesWithoutConfiguredGC = new HashSet<String>();
+ Set<String> queuesWithoutConfiguredCapacity = new HashSet<String>();
float totalCapacity = 0.0f;
for (String queueName: queues) {
- float gc = schedConf.getGuaranteedCapacity(queueName);
+ float gc = schedConf.getCapacity(queueName);
if(gc == -1.0) {
- queuesWithoutConfiguredGC.add(queueName);
+ queuesWithoutConfiguredCapacity.add(queueName);
}else {
totalCapacity += gc;
}
int ulMin = schedConf.getMinimumUserLimitPercent(queueName);
- long reclaimTimeLimit = schedConf.getReclaimTimeLimit(queueName) * 1000;
// create our QSI and add to our hashmap
QueueSchedulingInfo qsi = new QueueSchedulingInfo(queueName, gc,
- ulMin, reclaimTimeLimit, jobQueuesManager);
+ ulMin, jobQueuesManager);
queueInfoMap.put(queueName, qsi);
// create the queues of job objects
@@ -1105,11 +710,11 @@
}
float remainingQuantityToAllocate = 100 - totalCapacity;
float quantityToAllocate =
- remainingQuantityToAllocate/queuesWithoutConfiguredGC.size();
- for(String queue: queuesWithoutConfiguredGC) {
+ remainingQuantityToAllocate/queuesWithoutConfiguredCapacity.size();
+ for(String queue: queuesWithoutConfiguredCapacity) {
QueueSchedulingInfo qsi = queueInfoMap.get(queue);
- qsi.guaranteedCapacityPercent = quantityToAllocate;
- schedConf.setGuaranteedCapacity(queue, quantityToAllocate);
+ qsi.capacityPercent = quantityToAllocate;
+ schedConf.setCapacity(queue, quantityToAllocate);
}
// check if there's a queue with the default name. If not, we quit.
@@ -1137,19 +742,9 @@
initializationPoller.setDaemon(true);
initializationPoller.start();
- // start thread for redistributing capacity if we have more than
- // one queue
- if (queueInfoMap.size() > 1) {
- this.reclaimCapacityThread =
- new Thread(new ReclaimCapacity(),"reclaimCapacity");
- this.reclaimCapacityThread.start();
- }
- else {
- LOG.info("Only one queue present. Reclaim capacity thread not started.");
- }
-
started = true;
- LOG.info("Capacity scheduler initialized " + queues.size() + " queues"); }
+ LOG.info("Capacity scheduler initialized " + queues.size() + " queues");
+ }
/** mostly for testing purposes */
void setInitializationPoller(JobInitializationPoller p) {
@@ -1163,8 +758,6 @@
taskTrackerManager.removeJobInProgressListener(
jobQueuesManager);
}
- // tell the reclaim thread to stop
- stopReclaim = true;
started = false;
initializationPoller.terminate();
super.terminate();
@@ -1176,27 +769,6 @@
}
/**
- * Reclaim capacity for both map & reduce tasks.
- * Do not make this synchronized, since we call taskTrackerManager
- * (see HADOOP-4977).
- */
- void reclaimCapacity() {
- // get the cluster capacity
- ClusterStatus c = taskTrackerManager.getClusterStatus();
- int mapClusterCapacity = c.getMaxMapTasks();
- int reduceClusterCapacity = c.getMaxReduceTasks();
- int nextHeartbeatInterval = taskTrackerManager.getNextHeartbeatInterval();
- // update the QSI objects
- updateQSIObjects(mapClusterCapacity, reduceClusterCapacity);
- // update the qsi collections, since we depend on their ordering
- mapScheduler.updateCollectionOfQSIs();
- reduceScheduler.updateCollectionOfQSIs();
- // now, reclaim
- mapScheduler.reclaimCapacity(nextHeartbeatInterval);
- reduceScheduler.reclaimCapacity(nextHeartbeatInterval);
- }
-
- /**
* provided for the test classes
* lets you update the QSI objects and sorted collections
*/
@@ -1216,31 +788,27 @@
* to make scheduling decisions. For example, we don't need an exact count
* of numRunningTasks. Once we count up to the grid capacity, any
* number beyond that will make no difference.
- *
- * The pending task count is only required in reclaim capacity. So
- * if the computation becomes expensive, we can add a boolean to
- * denote if pending task computation is required or not.
- *
+ *
**/
- private synchronized void updateQSIObjects(int mapClusterCapacity,
+ private synchronized void updateQSIObjects(int mapClusterCapacity,
int reduceClusterCapacity) {
- // if # of slots have changed since last time, update.
+ // if # of slots have changed since last time, update.
// First, compute whether the total number of TT slots have changed
for (QueueSchedulingInfo qsi: queueInfoMap.values()) {
- // compute new GCs, if TT slots have changed
+ // compute new capacities, if TT slots have changed
if (mapClusterCapacity != prevMapClusterCapacity) {
- qsi.mapTSI.guaranteedCapacity =
- (int)(qsi.guaranteedCapacityPercent*mapClusterCapacity/100);
+ qsi.mapTSI.capacity =
+ (int)(qsi.capacityPercent*mapClusterCapacity/100);
}
if (reduceClusterCapacity != prevReduceClusterCapacity) {
- qsi.reduceTSI.guaranteedCapacity =
- (int)(qsi.guaranteedCapacityPercent*reduceClusterCapacity/100);
+ qsi.reduceTSI.capacity =
+ (int)(qsi.capacityPercent*reduceClusterCapacity/100);
}
// reset running/pending tasks, tasks per user
qsi.mapTSI.resetTaskVars();
qsi.reduceTSI.resetTaskVars();
// update stats on running jobs
- for (JobInProgress j:
+ for (JobInProgress j:
jobQueuesManager.getRunningJobQueue(qsi.queueName)) {
if (j.getStatus().getRunState() != JobStatus.RUNNING) {
continue;
@@ -1249,62 +817,36 @@
int runningReduces = j.runningReduces();
qsi.mapTSI.numRunningTasks += runningMaps;
qsi.reduceTSI.numRunningTasks += runningReduces;
- Integer i =
+ Integer i =
qsi.mapTSI.numRunningTasksByUser.get(j.getProfile().getUser());
- qsi.mapTSI.numRunningTasksByUser.put(j.getProfile().getUser(),
+ qsi.mapTSI.numRunningTasksByUser.put(j.getProfile().getUser(),
i+runningMaps);
i = qsi.reduceTSI.numRunningTasksByUser.get(j.getProfile().getUser());
- qsi.reduceTSI.numRunningTasksByUser.put(j.getProfile().getUser(),
+ qsi.reduceTSI.numRunningTasksByUser.put(j.getProfile().getUser(),
i+runningReduces);
- qsi.mapTSI.numPendingTasks += j.pendingMaps();
- qsi.reduceTSI.numPendingTasks += j.pendingReduces();
LOG.debug("updateQSI: job " + j.getJobID().toString() + ": run(m) = " +
- j.runningMaps() + ", run(r) = " + j.runningReduces() +
- ", finished(m) = " + j.finishedMaps() + ", finished(r)= " +
- j.finishedReduces() + ", failed(m) = " + j.failedMapTasks +
- ", failed(r) = " + j.failedReduceTasks + ", spec(m) = " +
- j.speculativeMapTasks + ", spec(r) = " + j.speculativeReduceTasks
- + ", total(m) = " + j.numMapTasks + ", total(r) = " +
+ j.runningMaps() + ", run(r) = " + j.runningReduces() +
+ ", finished(m) = " + j.finishedMaps() + ", finished(r)= " +
+ j.finishedReduces() + ", failed(m) = " + j.failedMapTasks +
+ ", failed(r) = " + j.failedReduceTasks + ", spec(m) = " +
+ j.speculativeMapTasks + ", spec(r) = " + j.speculativeReduceTasks
+ + ", total(m) = " + j.numMapTasks + ", total(r) = " +
j.numReduceTasks);
- /*
+ /*
* it's fine walking down the entire list of running jobs - there
* probably will not be many, plus, we may need to go through the
* list to compute numRunningTasksByUser. If this is expensive, we
* can keep a list of running jobs per user. Then we only need to
* consider the first few jobs per user.
- */
- }
-
- //update stats on waiting jobs
- for(JobInProgress j: jobQueuesManager.getWaitingJobs(qsi.queueName)) {
- // pending tasks
- if ((qsi.mapTSI.numPendingTasks > mapClusterCapacity) &&
- (qsi.reduceTSI.numPendingTasks > reduceClusterCapacity)) {
- // that's plenty. no need for more computation
- break;
- }
- /*
- * Consider only the waiting jobs in the job queue. Job queue can
- * contain:
- * 1. Jobs which are in running state but not scheduled
- * (these would also be present in running queue), the pending
- * task count of these jobs is computed when scheduler walks
- * through running job queue.
- * 2. Jobs which are killed by user, but waiting job initialization
- * poller to walk through the job queue to clean up killed jobs.
*/
- if (j.getStatus().getRunState() == JobStatus.PREP) {
- qsi.mapTSI.numPendingTasks += j.pendingMaps();
- qsi.reduceTSI.numPendingTasks += j.pendingReduces();
- }
}
}
-
+
prevMapClusterCapacity = mapClusterCapacity;
prevReduceClusterCapacity = reduceClusterCapacity;
}
- /*
+ /*
* The grand plan for assigning a task.
* First, decide whether a Map or Reduce task should be given to a TT
* (if the TT can accept either).