You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2011/09/12 01:57:38 UTC
svn commit: r1169585 [3/5] - in
/hadoop/common/branches/branch-0.20-security: ./ conf/
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
src/contrib/fairscheduler/ src/co...
Added: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java?rev=1169585&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java Sun Sep 11 23:57:37 2011
@@ -0,0 +1,185 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.hadoop.mapred.FairScheduler.JobInfo;
+import org.apache.hadoop.mapreduce.TaskType;
+
+/**
+ * A {@link Schedulable} representing one task type (map or reduce) of a
+ * single job in the fair scheduler. It tracks the job's demand for slots of
+ * that type and hands out new tasks of that type via {@link #assignTask}.
+ */
+public class JobSchedulable extends Schedulable {
+ private FairScheduler scheduler;
+ private JobInProgress job;
+ // Which task type (MAP or REDUCE) this schedulable is responsible for.
+ private TaskType taskType;
+ // Number of slots this job currently wants; recomputed by updateDemand().
+ private int demand = 0;
+
+ public JobSchedulable(FairScheduler scheduler, JobInProgress job,
+ TaskType taskType) {
+ this.scheduler = scheduler;
+ this.job = job;
+ this.taskType = taskType;
+
+ initMetrics();
+ }
+
+ @Override
+ public TaskType getTaskType() {
+ return taskType;
+ }
+
+ @Override
+ public String getName() {
+ // The schedulable is named after its job ID.
+ return job.getJobID().toString();
+ }
+
+ public JobInProgress getJob() {
+ return job;
+ }
+
+ /**
+ * Recompute this job's demand for slots of our task type. Demand is left
+ * at zero unless the job is runnable (see isRunnable()).
+ */
+ @Override
+ public void updateDemand() {
+ demand = 0;
+ if (isRunnable()) {
+ // For reduces, make sure enough maps are done that reduces can launch
+ if (taskType == TaskType.REDUCE && !job.scheduleReduces())
+ return;
+ // Add up demand from each TaskInProgress; each TIP can either
+ // - have no attempts running, in which case it demands 1 slot
+ // - have N attempts running, in which case it demands N slots, and may
+ // potentially demand one more slot if it needs to be speculated
+ TaskInProgress[] tips = (taskType == TaskType.MAP ?
+ job.getTasks(TaskType.MAP) : job.getTasks(TaskType.REDUCE));
+ boolean speculationEnabled = (taskType == TaskType.MAP ?
+ job.getMapSpeculativeExecution() : job.getReduceSpeculativeExecution());
+ double avgProgress = (taskType == TaskType.MAP ?
+ job.getStatus().mapProgress() : job.getStatus().reduceProgress());
+ long time = scheduler.getClock().getTime();
+ for (TaskInProgress tip: tips) {
+ if (!tip.isComplete()) {
+ if (tip.isRunning()) {
+ // Count active tasks and any speculative task we want to launch
+ demand += tip.getActiveTasks().size();
+ if (speculationEnabled && tip.hasSpeculativeTask(time, avgProgress))
+ demand += 1;
+ } else {
+ // Need to launch 1 task
+ demand += 1;
+ }
+ }
+ }
+ }
+ }
+
+ // A job is runnable if the scheduler has info for it, that info marks it
+ // runnable, and the job's run state is RUNNING.
+ private boolean isRunnable() {
+ JobInfo info = scheduler.getJobInfo(job);
+ int runState = job.getStatus().getRunState();
+ return (info != null && info.runnable && runState == JobStatus.RUNNING);
+ }
+
+ @Override
+ public int getDemand() {
+ return demand;
+ }
+
+ // Jobs are leaf schedulables, so there is no share to push down.
+ @Override
+ public void redistributeShare() {}
+
+ @Override
+ public JobPriority getPriority() {
+ return job.getPriority();
+ }
+
+ @Override
+ public int getRunningTasks() {
+ // A job that has not been initialized yet runs no tasks.
+ if (!job.inited()) {
+ return 0;
+ }
+ return taskType == TaskType.MAP ? job.runningMaps() : job.runningReduces();
+ }
+
+ @Override
+ public long getStartTime() {
+ return job.startTime;
+ }
+
+ @Override
+ public double getWeight() {
+ // Weight is computed by the scheduler (it may apply weight adjusters).
+ return scheduler.getJobWeight(job, taskType);
+ }
+
+ @Override
+ public int getMinShare() {
+ // Individual jobs have no configured minimum share; always 0.
+ return 0;
+ }
+
+ /**
+ * Obtain a new task of our type for the given tracker, or return null if
+ * none can be launched. Consults the load manager first; for map tasks,
+ * limits data-locality to the level currently allowed by the scheduler.
+ */
+ @Override
+ public Task assignTask(TaskTrackerStatus tts, long currentTime,
+ Collection<JobInProgress> visited) throws IOException {
+ if (isRunnable()) {
+ visited.add(job);
+ TaskTrackerManager ttm = scheduler.taskTrackerManager;
+ ClusterStatus clusterStatus = ttm.getClusterStatus();
+ int numTaskTrackers = clusterStatus.getTaskTrackers();
+
+ // check with the load manager whether it is safe to
+ // launch this task on this taskTracker.
+ LoadManager loadMgr = scheduler.getLoadManager();
+ if (!loadMgr.canLaunchTask(tts, job, taskType)) {
+ return null;
+ }
+ if (taskType == TaskType.MAP) {
+ // Respect the current locality-wait level: prefer node-local, then
+ // rack-local, then any host.
+ LocalityLevel localityLevel = scheduler.getAllowedLocalityLevel(
+ job, currentTime);
+ scheduler.getEventLog().log(
+ "ALLOWED_LOC_LEVEL", job.getJobID(), localityLevel);
+ switch (localityLevel) {
+ case NODE:
+ return job.obtainNewNodeLocalMapTask(tts, numTaskTrackers,
+ ttm.getNumberOfUniqueHosts());
+ case RACK:
+ return job.obtainNewNodeOrRackLocalMapTask(tts, numTaskTrackers,
+ ttm.getNumberOfUniqueHosts());
+ default:
+ return job.obtainNewMapTask(tts, numTaskTrackers,
+ ttm.getNumberOfUniqueHosts());
+ }
+ } else {
+ // Reduces have no locality preference.
+ return job.obtainNewReduceTask(tts, numTaskTrackers,
+ ttm.getNumberOfUniqueHosts());
+ }
+ } else {
+ return null;
+ }
+ }
+
+
+ @Override
+ protected String getMetricsContextName() {
+ return "jobs";
+ }
+
+ @Override
+ void updateMetrics() {
+ // metrics is expected to be initialized by initMetrics() (constructor).
+ assert metrics != null;
+
+ super.setMetricValues(metrics);
+ metrics.update();
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java?rev=1169585&r1=1169584&r2=1169585&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java Sun Sep 11 23:57:37 2011
@@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskType;
/**
* A pluggable object that manages the load on each {@link TaskTracker}, telling
@@ -30,6 +31,7 @@ import org.apache.hadoop.conf.Configurat
public abstract class LoadManager implements Configurable {
protected Configuration conf;
protected TaskTrackerManager taskTrackerManager;
+ protected FairSchedulerEventLog schedulingLog;
public Configuration getConf() {
return conf;
@@ -43,6 +45,10 @@ public abstract class LoadManager implem
TaskTrackerManager taskTrackerManager) {
this.taskTrackerManager = taskTrackerManager;
}
+
+ /**
+ * Supply the fair scheduler's event log; stored in the protected
+ * schedulingLog field for use by LoadManager subclasses.
+ */
+ public void setEventLog(FairSchedulerEventLog schedulingLog) {
+ this.schedulingLog = schedulingLog;
+ }
/**
* Lifecycle method to allow the LoadManager to start any work in separate
@@ -61,6 +67,8 @@ public abstract class LoadManager implem
/**
* Can a given {@link TaskTracker} run another map task?
+ * This method may check whether the specified tracker has
+ * enough resources to run another map task.
* @param tracker The machine we wish to run a new map on
* @param totalRunnableMaps Set of running jobs in the cluster
* @param totalMapSlots The total number of map slots in the cluster
@@ -71,6 +79,8 @@ public abstract class LoadManager implem
/**
* Can a given {@link TaskTracker} run another reduce task?
+ * This method may check whether the specified tracker has
+ * enough resources to run another reduce task.
* @param tracker The machine we wish to run a new map on
* @param totalRunnableReduces Set of running jobs in the cluster
* @param totalReduceSlots The total number of reduce slots in the cluster
@@ -78,4 +88,16 @@ public abstract class LoadManager implem
*/
public abstract boolean canAssignReduce(TaskTrackerStatus tracker,
int totalRunnableReduces, int totalReduceSlots);
+
+ /**
+ * Can a given {@link TaskTracker} run another new task from a given job?
+ * This method is provided for use by LoadManagers that take into
+ * account jobs' individual resource needs when placing tasks.
+ * @param tracker The machine on which we wish to run a new task
+ * @param job The job from which we want to run a task on this machine
+ * @param type The type of task that we want to run
+ * @return true if this task can be launched on <code>tracker</code>
+ */
+ public abstract boolean canLaunchTask(TaskTrackerStatus tracker,
+ JobInProgress job, TaskType type);
}
Added: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LocalityLevel.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LocalityLevel.java?rev=1169585&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LocalityLevel.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LocalityLevel.java Sun Sep 11 23:57:37 2011
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+/**
+ * Represents the level of data-locality at which a job in the fair scheduler
+ * is allowed to launch tasks. By default, jobs are not allowed to launch
+ * non-data-local tasks until they have waited a small number of seconds to
+ * find a slot on a node that they have data on. If a job has waited this
+ * long, it is allowed to launch rack-local tasks as well (on nodes that may
+ * not have the task's input data, but share a rack with a node that does).
+ * Finally, after a further wait, jobs are allowed to launch tasks anywhere
+ * in the cluster.
+ *
+ * This enum defines three levels - NODE, RACK and ANY (for allowing tasks
+ * to be launched on any node). A map task's level can be obtained from
+ * its job through {@link #fromTask(JobInProgress, Task, TaskTrackerStatus)}. In
+ * addition, for any locality level, it is possible to get a "level cap" to pass
+ * to {@link JobInProgress#obtainNewMapTask(TaskTrackerStatus, int, int, int)}
+ * to ensure that only tasks at this level or lower are launched, through
+ * the {@link #toCacheLevelCap()} method.
+ */
+public enum LocalityLevel {
+ NODE, RACK, ANY;
+
+ /**
+ * Determine the locality level of a launched map task relative to the
+ * given tracker, based on the job's locality level for the task's
+ * TaskInProgress: 0 = node-local, 1 = rack-local, anything else = ANY.
+ */
+ public static LocalityLevel fromTask(JobInProgress job, Task mapTask,
+ TaskTrackerStatus tracker) {
+ TaskID tipID = mapTask.getTaskID().getTaskID();
+ TaskInProgress tip = job.getTaskInProgress(tipID);
+ switch (job.getLocalityLevel(tip, tracker)) {
+ case 0: return LocalityLevel.NODE;
+ case 1: return LocalityLevel.RACK;
+ default: return LocalityLevel.ANY;
+ }
+ }
+
+ /**
+ * Obtain a JobInProgress cache level cap to pass to
+ * {@link JobInProgress#obtainNewMapTask(TaskTrackerStatus, int, int, int)}
+ * to ensure that only tasks of this locality level and lower are launched.
+ */
+ public int toCacheLevelCap() {
+ switch(this) {
+ case NODE: return 1;
+ case RACK: return 2;
+ default: return Integer.MAX_VALUE;
+ }
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/NewJobWeightBooster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/NewJobWeightBooster.java?rev=1169585&r1=1169584&r2=1169585&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/NewJobWeightBooster.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/NewJobWeightBooster.java Sun Sep 11 23:57:37 2011
@@ -20,6 +20,7 @@ package org.apache.hadoop.mapred;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
+import org.apache.hadoop.mapreduce.TaskType;
/**
* A {@link WeightAdjuster} implementation that gives a weight boost to new jobs
Modified: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java?rev=1169585&r1=1169584&r2=1169585&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java Sun Sep 11 23:57:37 2011
@@ -21,6 +21,9 @@ package org.apache.hadoop.mapred;
import java.util.ArrayList;
import java.util.Collection;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.metrics.MetricsContext;
+
/**
* A schedulable pool of jobs.
*/
@@ -33,9 +36,17 @@ public class Pool {
/** Jobs in this specific pool; does not include children pools' jobs. */
private Collection<JobInProgress> jobs = new ArrayList<JobInProgress>();
+
+ /** Scheduling mode for jobs inside the pool (fair or FIFO) */
+ private SchedulingMode schedulingMode;
- public Pool(String name) {
+ private PoolSchedulable mapSchedulable;
+ private PoolSchedulable reduceSchedulable;
+
+ public Pool(FairScheduler scheduler, String name) {
this.name = name;
+ mapSchedulable = new PoolSchedulable(scheduler, this, TaskType.MAP);
+ reduceSchedulable = new PoolSchedulable(scheduler, this, TaskType.REDUCE);
}
public Collection<JobInProgress> getJobs() {
@@ -44,17 +55,46 @@ public class Pool {
public void addJob(JobInProgress job) {
jobs.add(job);
+ mapSchedulable.addJob(job);
+ reduceSchedulable.addJob(job);
}
public void removeJob(JobInProgress job) {
jobs.remove(job);
+ mapSchedulable.removeJob(job);
+ reduceSchedulable.removeJob(job);
}
public String getName() {
return name;
}
+ public SchedulingMode getSchedulingMode() {
+ return schedulingMode;
+ }
+
+ public void setSchedulingMode(SchedulingMode schedulingMode) {
+ this.schedulingMode = schedulingMode;
+ }
+
public boolean isDefaultPool() {
return Pool.DEFAULT_POOL_NAME.equals(name);
}
+
+ public PoolSchedulable getMapSchedulable() {
+ return mapSchedulable;
+ }
+
+ public PoolSchedulable getReduceSchedulable() {
+ return reduceSchedulable;
+ }
+
+ /** Get this pool's map or reduce schedulable, chosen by task type. */
+ public PoolSchedulable getSchedulable(TaskType type) {
+ return type == TaskType.MAP ? mapSchedulable : reduceSchedulable;
+ }
+
+ // Update metrics for both the map-side and reduce-side schedulables.
+ public void updateMetrics() {
+ mapSchedulable.updateMetrics();
+ reduceSchedulable.updateMetrics();
+ }
}
Modified: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java?rev=1169585&r1=1169584&r2=1169585&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java Sun Sep 11 23:57:37 2011
@@ -20,6 +20,8 @@ package org.apache.hadoop.mapred;
import java.io.File;
import java.io.IOException;
+import java.net.URL;
+import java.net.URLConnection;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
@@ -34,6 +36,8 @@ import javax.xml.parsers.ParserConfigura
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.metrics.MetricsContext;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
@@ -42,7 +46,8 @@ import org.w3c.dom.Text;
import org.xml.sax.SAXException;
/**
- * Maintains a hierarchy of pools.
+ * Maintains a list of pools as well as scheduling parameters for each pool,
+ * such as guaranteed share allocations, from the fair scheduler config file.
*/
public class PoolManager {
public static final Log LOG = LogFactory.getLog(
@@ -56,11 +61,19 @@ public class PoolManager {
* (this is done to prevent loading a file that hasn't been fully written).
*/
public static final long ALLOC_RELOAD_WAIT = 5 * 1000;
+
+ public static final String EXPLICIT_POOL_PROPERTY = "mapred.fairscheduler.pool";
+
+ private final FairScheduler scheduler;
// Map and reduce minimum allocations for each pool
private Map<String, Integer> mapAllocs = new HashMap<String, Integer>();
private Map<String, Integer> reduceAllocs = new HashMap<String, Integer>();
+ // If set, cap number of map and reduce tasks in a pool
+ private Map<String, Integer> poolMaxMaps = new HashMap<String, Integer>();
+ private Map<String, Integer> poolMaxReduces = new HashMap<String, Integer>();
+
// Sharing weights for each pool
private Map<String, Double> poolWeights = new HashMap<String, Double>();
@@ -69,10 +82,31 @@ public class PoolManager {
private Map<String, Integer> poolMaxJobs = new HashMap<String, Integer>();
private Map<String, Integer> userMaxJobs = new HashMap<String, Integer>();
private int userMaxJobsDefault = Integer.MAX_VALUE;
+ private int poolMaxJobsDefault = Integer.MAX_VALUE;
- private String allocFile; // Path to XML file containing allocations
+ // Min share preemption timeout for each pool in seconds. If a job in the pool
+ // waits this long without receiving its guaranteed share, it is allowed to
+ // preempt other jobs' tasks.
+ private Map<String, Long> minSharePreemptionTimeouts =
+ new HashMap<String, Long>();
+
+ // Default min share preemption timeout for pools where it is not set
+ // explicitly.
+ private long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
+
+ // Preemption timeout for jobs below fair share in seconds. If a job remains
+ // below half its fair share for this long, it is allowed to preempt tasks.
+ private long fairSharePreemptionTimeout = Long.MAX_VALUE;
+
+ SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR;
+
+ private Object allocFile; // Path to XML file containing allocations. This
+ // is either a URL to specify a classpath resource
+ // (if the fair-scheduler.xml on the classpath is
+ // used) or a String to specify an absolute path (if
+ // mapred.fairscheduler.allocation.file is used).
private String poolNameProperty; // Jobconf property to use for determining a
- // job's pool name (default: mapred.job.queue.name)
+ // job's pool name (default: user.name)
private Map<String, Pool> pools = new HashMap<String, Pool>();
@@ -80,14 +114,25 @@ public class PoolManager {
private long lastSuccessfulReload; // Last time we successfully reloaded pools
private boolean lastReloadAttemptFailed = false;
- public PoolManager(Configuration conf) throws IOException, SAXException,
+ public PoolManager(FairScheduler scheduler) {
+ this.scheduler = scheduler;
+ }
+
+ public void initialize() throws IOException, SAXException,
AllocationConfigurationException, ParserConfigurationException {
+ Configuration conf = scheduler.getConf();
this.poolNameProperty = conf.get(
"mapred.fairscheduler.poolnameproperty", "user.name");
this.allocFile = conf.get("mapred.fairscheduler.allocation.file");
if (allocFile == null) {
- LOG.warn("No mapred.fairscheduler.allocation.file given in jobconf - " +
- "the fair scheduler will not use any queues.");
+ // No allocation file specified in jobconf. Use the default allocation
+ // file, fair-scheduler.xml, looking for it on the classpath.
+ allocFile = new Configuration().getResource("fair-scheduler.xml");
+ if (allocFile == null) {
+ LOG.error("The fair scheduler allocation file fair-scheduler.xml was "
+ + "not found on the classpath, and no other config file is given "
+ + "through mapred.fairscheduler.allocation.file.");
+ }
}
reloadAllocs();
lastSuccessfulReload = System.currentTimeMillis();
@@ -102,11 +147,19 @@ public class PoolManager {
public synchronized Pool getPool(String name) {
Pool pool = pools.get(name);
if (pool == null) {
- pool = new Pool(name);
+ pool = new Pool(scheduler, name);
+ pool.setSchedulingMode(defaultSchedulingMode);
pools.put(name, pool);
}
return pool;
}
+
+ /**
+ * Get the pool that a given job is in.
+ */
+ public Pool getPool(JobInProgress job) {
+ return getPool(getPoolName(job));
+ }
/**
* Reload allocations file if it hasn't been loaded in a while
@@ -115,9 +168,20 @@ public class PoolManager {
long time = System.currentTimeMillis();
if (time > lastReloadAttempt + ALLOC_RELOAD_INTERVAL) {
lastReloadAttempt = time;
+ if (null == allocFile) {
+ return;
+ }
try {
- File file = new File(allocFile);
- long lastModified = file.lastModified();
+ // Get last modified time of alloc file depending whether it's a String
+ // (for a path name) or an URL (for a classloader resource)
+ long lastModified;
+ if (allocFile instanceof String) {
+ File file = new File((String) allocFile);
+ lastModified = file.lastModified();
+ } else { // allocFile is an URL
+ URLConnection conn = ((URL) allocFile).openConnection();
+ lastModified = conn.getLastModified();
+ }
if (lastModified > lastSuccessfulReload &&
time > lastModified + ALLOC_RELOAD_WAIT) {
reloadAllocs();
@@ -131,7 +195,7 @@ public class PoolManager {
// We log the error only on the first failure so we don't fill up the
// JobTracker's log with these messages.
if (!lastReloadAttemptFailed) {
- LOG.error("Failed to reload allocations file - " +
+ LOG.error("Failed to reload fair scheduler config file - " +
"will use existing allocations.", e);
}
lastReloadAttemptFailed = true;
@@ -165,8 +229,16 @@ public class PoolManager {
Map<String, Integer> reduceAllocs = new HashMap<String, Integer>();
Map<String, Integer> poolMaxJobs = new HashMap<String, Integer>();
Map<String, Integer> userMaxJobs = new HashMap<String, Integer>();
+ Map<String, Integer> poolMaxMaps = new HashMap<String, Integer>();
+ Map<String, Integer> poolMaxReduces = new HashMap<String, Integer>();
Map<String, Double> poolWeights = new HashMap<String, Double>();
+ Map<String, SchedulingMode> poolModes = new HashMap<String, SchedulingMode>();
+ Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
int userMaxJobsDefault = Integer.MAX_VALUE;
+ int poolMaxJobsDefault = Integer.MAX_VALUE;
+ long fairSharePreemptionTimeout = Long.MAX_VALUE;
+ long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
+ SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR;
// Remember all pool names so we can display them on web UI, etc.
List<String> poolNamesInAllocFile = new ArrayList<String>();
@@ -176,11 +248,16 @@ public class PoolManager {
DocumentBuilderFactory.newInstance();
docBuilderFactory.setIgnoringComments(true);
DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
- Document doc = builder.parse(new File(allocFile));
+ Document doc;
+ if (allocFile instanceof String) {
+ doc = builder.parse(new File((String) allocFile));
+ } else {
+ doc = builder.parse(allocFile.toString());
+ }
Element root = doc.getDocumentElement();
if (!"allocations".equals(root.getTagName()))
- throw new AllocationConfigurationException("Bad allocations file: " +
- "top-level element not <allocations>");
+ throw new AllocationConfigurationException("Bad fair scheduler config " +
+ "file: top-level element not <allocations>");
NodeList elements = root.getChildNodes();
for (int i = 0; i < elements.getLength(); i++) {
Node node = elements.item(i);
@@ -204,6 +281,14 @@ public class PoolManager {
String text = ((Text)field.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
reduceAllocs.put(poolName, val);
+ } else if ("maxMaps".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ int val = Integer.parseInt(text);
+ poolMaxMaps.put(poolName, val);
+ } else if ("maxReduces".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ int val = Integer.parseInt(text);
+ poolMaxReduces.put(poolName, val);
} else if ("maxRunningJobs".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
@@ -212,8 +297,25 @@ public class PoolManager {
String text = ((Text)field.getFirstChild()).getData().trim();
double val = Double.parseDouble(text);
poolWeights.put(poolName, val);
+ } else if ("minSharePreemptionTimeout".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ long val = Long.parseLong(text) * 1000L;
+ minSharePreemptionTimeouts.put(poolName, val);
+ } else if ("schedulingMode".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ poolModes.put(poolName, parseSchedulingMode(text));
}
}
+ if (poolMaxMaps.containsKey(poolName) && mapAllocs.containsKey(poolName)
+ && poolMaxMaps.get(poolName) < mapAllocs.get(poolName)) {
+ LOG.warn(String.format("Pool %s has max maps %d less than min maps %d",
+ poolName, poolMaxMaps.get(poolName), mapAllocs.get(poolName)));
+ }
+ if(poolMaxReduces.containsKey(poolName) && reduceAllocs.containsKey(poolName)
+ && poolMaxReduces.get(poolName) < reduceAllocs.get(poolName)) {
+ LOG.warn(String.format("Pool %s has max reduces %d less than min reduces %d",
+ poolName, poolMaxReduces.get(poolName), reduceAllocs.get(poolName)));
+ }
} else if ("user".equals(element.getTagName())) {
String userName = element.getAttribute("name");
NodeList fields = element.getChildNodes();
@@ -232,6 +334,21 @@ public class PoolManager {
String text = ((Text)element.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
userMaxJobsDefault = val;
+ } else if ("poolMaxJobsDefault".equals(element.getTagName())) {
+ String text = ((Text)element.getFirstChild()).getData().trim();
+ int val = Integer.parseInt(text);
+ poolMaxJobsDefault = val;
+ } else if ("fairSharePreemptionTimeout".equals(element.getTagName())) {
+ String text = ((Text)element.getFirstChild()).getData().trim();
+ long val = Long.parseLong(text) * 1000L;
+ fairSharePreemptionTimeout = val;
+ } else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) {
+ String text = ((Text)element.getFirstChild()).getData().trim();
+ long val = Long.parseLong(text) * 1000L;
+ defaultMinSharePreemptionTimeout = val;
+ } else if ("defaultPoolSchedulingMode".equals(element.getTagName())) {
+ String text = ((Text)element.getFirstChild()).getData().trim();
+ defaultSchedulingMode = parseSchedulingMode(text);
} else {
LOG.warn("Bad element in allocations file: " + element.getTagName());
}
@@ -242,17 +359,61 @@ public class PoolManager {
synchronized(this) {
this.mapAllocs = mapAllocs;
this.reduceAllocs = reduceAllocs;
+ this.poolMaxMaps = poolMaxMaps;
+ this.poolMaxReduces = poolMaxReduces;
this.poolMaxJobs = poolMaxJobs;
this.userMaxJobs = userMaxJobs;
- this.userMaxJobsDefault = userMaxJobsDefault;
this.poolWeights = poolWeights;
+ this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
+ this.userMaxJobsDefault = userMaxJobsDefault;
+ this.poolMaxJobsDefault = poolMaxJobsDefault;
+ this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
+ this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout;
+ this.defaultSchedulingMode = defaultSchedulingMode;
for (String name: poolNamesInAllocFile) {
- getPool(name);
+ Pool pool = getPool(name);
+ if (poolModes.containsKey(name)) {
+ pool.setSchedulingMode(poolModes.get(name));
+ } else {
+ pool.setSchedulingMode(defaultSchedulingMode);
+ }
}
}
}
/**
+ * Does the pool have incompatible max and min allocation set.
+ *
+ * @param type
+ * {@link TaskType#MAP} or {@link TaskType#REDUCE}
+ * @param pool
+ * the pool name
+ * @return true if the max is less than the min
+ */
+ boolean invertedMinMax(TaskType type, String pool) {
+ // Pick the max-slot cap table and min-slot (allocation) table for the type.
+ Map<String, Integer> max = TaskType.MAP == type ? poolMaxMaps : poolMaxReduces;
+ Map<String, Integer> min = TaskType.MAP == type ? mapAllocs : reduceAllocs;
+ // Inverted only when both a max and a min are configured and max < min.
+ if (max.containsKey(pool) && min.containsKey(pool)
+ && max.get(pool) < min.get(pool)) {
+ return true;
+ }
+ return false;
+ }
+
+ /**
+ * Parse a scheduling-mode string from the allocation file. Accepts "fair"
+ * or "fifo" in any capitalization; anything else is a configuration error.
+ */
+ private SchedulingMode parseSchedulingMode(String text)
+ throws AllocationConfigurationException {
+ text = text.toLowerCase();
+ if (text.equals("fair")) {
+ return SchedulingMode.FAIR;
+ } else if (text.equals("fifo")) {
+ return SchedulingMode.FIFO;
+ } else {
+ throw new AllocationConfigurationException(
+ "Unknown scheduling mode : " + text + "; expected 'fifo' or 'fair'");
+ }
+ }
+
+ /**
* Get the allocation for a particular pool
*/
public int getAllocation(String pool, TaskType taskType) {
@@ -261,7 +422,20 @@ public class PoolManager {
Integer alloc = allocationMap.get(pool);
return (alloc == null ? 0 : alloc);
}
-
+
+ /**
+ * Get the maximum map or reduce slots for the given pool.
+ * @return the cap set on this pool, or Integer.MAX_VALUE if not set.
+ */
+ int getMaxSlots(String poolName, TaskType taskType) {
+ Map<String, Integer> maxMap = (taskType == TaskType.MAP ? poolMaxMaps : poolMaxReduces);
+ if (maxMap.containsKey(poolName)) {
+ return maxMap.get(poolName);
+ } else {
+ // No cap configured for this pool: effectively unlimited.
+ return Integer.MAX_VALUE;
+ }
+ }
+
/**
* Add a job in the appropriate pool
*/
@@ -281,7 +455,7 @@ public class PoolManager {
*/
public synchronized void setPool(JobInProgress job, String pool) {
removeJob(job);
- job.getJobConf().set(poolNameProperty, pool);
+ job.getJobConf().set(EXPLICIT_POOL_PROPERTY, pool);
addJob(job);
}
@@ -293,13 +467,16 @@ public class PoolManager {
}
/**
- * Get the pool name for a JobInProgress from its configuration. This uses
- * the "project" property in the jobconf by default, or the property set with
- * "mapred.fairscheduler.poolnameproperty".
+ * Get the pool name for a JobInProgress from its configuration. This uses
+ * the value of mapred.fairscheduler.pool if specified, otherwise the value
+ * of the property named in mapred.fairscheduler.poolnameproperty if that is
+ * specified. Otherwise if neither is specified it uses the "user.name" property
+ * in the jobconf by default.
*/
public String getPoolName(JobInProgress job) {
- JobConf conf = job.getJobConf();
- return conf.get(poolNameProperty, Pool.DEFAULT_POOL_NAME).trim();
+ Configuration conf = job.getJobConf();
+ return conf.get(EXPLICIT_POOL_PROPERTY,
+ conf.get(poolNameProperty, Pool.DEFAULT_POOL_NAME)).trim();
}
/**
@@ -327,7 +504,7 @@ public class PoolManager {
if (poolMaxJobs.containsKey(pool)) {
return poolMaxJobs.get(pool);
} else {
- return Integer.MAX_VALUE;
+ return poolMaxJobsDefault;
}
}
@@ -338,4 +515,32 @@ public class PoolManager {
return 1.0;
}
}
+
+ /**
+ * Get a pool's min share preemption timeout, in milliseconds. This is the
+ * time after which jobs in the pool may kill other pools' tasks if they
+ * are below their min share.
+ */
+ public long getMinSharePreemptionTimeout(String pool) {
+ if (minSharePreemptionTimeouts.containsKey(pool)) {
+ return minSharePreemptionTimeouts.get(pool);
+ } else {
+ return defaultMinSharePreemptionTimeout;
+ }
+ }
+
+  /**
+   * Get the fair share preemption timeout, in milliseconds. This is the time
+   * after which any job may kill other jobs' tasks if it is below half
+   * its fair share.
+   */
+  public long getFairSharePreemptionTimeout() {
+    return fairSharePreemptionTimeout;
+  }
+
+  /** Push updated metrics records for every configured pool. */
+  synchronized void updateMetrics() {
+    for (Pool pool : pools.values()) {
+      pool.updateMetrics();
+    }
+  }
}
Added: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java?rev=1169585&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java Sun Sep 11 23:57:37 2011
@@ -0,0 +1,221 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.FairScheduler.JobInfo;
+import org.apache.hadoop.mapreduce.TaskType;
+
+public class PoolSchedulable extends Schedulable {
+ public static final Log LOG = LogFactory.getLog(
+ PoolSchedulable.class.getName());
+
+ private FairScheduler scheduler;
+ private Pool pool;
+ private TaskType taskType;
+ private PoolManager poolMgr;
+ private List<JobSchedulable> jobScheds = new LinkedList<JobSchedulable>();
+ private int demand = 0;
+
+ // Variables used for preemption
+ long lastTimeAtMinShare;
+ long lastTimeAtHalfFairShare;
+
+ public PoolSchedulable(FairScheduler scheduler, Pool pool, TaskType type) {
+ this.scheduler = scheduler;
+ this.pool = pool;
+ this.taskType = type;
+ this.poolMgr = scheduler.getPoolManager();
+ long currentTime = scheduler.getClock().getTime();
+ this.lastTimeAtMinShare = currentTime;
+ this.lastTimeAtHalfFairShare = currentTime;
+
+ initMetrics();
+ }
+
+ public void addJob(JobInProgress job) {
+ JobInfo info = scheduler.getJobInfo(job);
+ jobScheds.add(taskType == TaskType.MAP ?
+ info.mapSchedulable : info.reduceSchedulable);
+ }
+
+ public void removeJob(JobInProgress job) {
+ for (Iterator<JobSchedulable> it = jobScheds.iterator(); it.hasNext();) {
+ JobSchedulable jobSched = it.next();
+ if (jobSched.getJob() == job) {
+ it.remove();
+ break;
+ }
+ }
+ }
+
+ /**
+ * Update demand by asking jobs in the pool to update
+ */
+ @Override
+ public void updateDemand() {
+ demand = 0;
+ for (JobSchedulable sched: jobScheds) {
+ sched.updateDemand();
+ demand += sched.getDemand();
+ }
+ // if demand exceeds the cap for this pool, limit to the max
+ int maxTasks = poolMgr.getMaxSlots(pool.getName(), taskType);
+ if(demand > maxTasks) {
+ demand = maxTasks;
+ }
+ }
+
+ /**
+ * Distribute the pool's fair share among its jobs
+ */
+ @Override
+ public void redistributeShare() {
+ if (pool.getSchedulingMode() == SchedulingMode.FAIR) {
+ SchedulingAlgorithms.computeFairShares(jobScheds, getFairShare());
+ } else {
+ for (JobSchedulable sched: jobScheds) {
+ sched.setFairShare(0);
+ }
+ }
+ }
+
+ @Override
+ public int getDemand() {
+ return demand;
+ }
+
+ @Override
+ public int getMinShare() {
+ return poolMgr.getAllocation(pool.getName(), taskType);
+ }
+
+ @Override
+ public double getWeight() {
+ return poolMgr.getPoolWeight(pool.getName());
+ }
+
+ @Override
+ public JobPriority getPriority() {
+ return JobPriority.NORMAL;
+ }
+
+ @Override
+ public int getRunningTasks() {
+ int ans = 0;
+ for (JobSchedulable sched: jobScheds) {
+ ans += sched.getRunningTasks();
+ }
+ return ans;
+ }
+
+ @Override
+ public long getStartTime() {
+ return 0;
+ }
+
+ @Override
+ public Task assignTask(TaskTrackerStatus tts, long currentTime,
+ Collection<JobInProgress> visited) throws IOException {
+ int runningTasks = getRunningTasks();
+ if (runningTasks >= poolMgr.getMaxSlots(pool.getName(), taskType)) {
+ return null;
+ }
+ SchedulingMode mode = pool.getSchedulingMode();
+ Comparator<Schedulable> comparator;
+ if (mode == SchedulingMode.FIFO) {
+ comparator = new SchedulingAlgorithms.FifoComparator();
+ } else if (mode == SchedulingMode.FAIR) {
+ comparator = new SchedulingAlgorithms.FairShareComparator();
+ } else {
+ throw new RuntimeException("Unsupported pool scheduling mode " + mode);
+ }
+ Collections.sort(jobScheds, comparator);
+ for (JobSchedulable sched: jobScheds) {
+ Task task = sched.assignTask(tts, currentTime, visited);
+ if (task != null)
+ return task;
+ }
+ return null;
+ }
+
+ @Override
+ public String getName() {
+ return pool.getName();
+ }
+
+ Pool getPool() {
+ return pool;
+ }
+
+ @Override
+ public TaskType getTaskType() {
+ return taskType;
+ }
+
+ public Collection<JobSchedulable> getJobSchedulables() {
+ return jobScheds;
+ }
+
+ public long getLastTimeAtMinShare() {
+ return lastTimeAtMinShare;
+ }
+
+ public void setLastTimeAtMinShare(long lastTimeAtMinShare) {
+ this.lastTimeAtMinShare = lastTimeAtMinShare;
+ }
+
+ public long getLastTimeAtHalfFairShare() {
+ return lastTimeAtHalfFairShare;
+ }
+
+ public void setLastTimeAtHalfFairShare(long lastTimeAtHalfFairShare) {
+ this.lastTimeAtHalfFairShare = lastTimeAtHalfFairShare;
+ }
+
+ protected String getMetricsContextName() {
+ return "pools";
+ }
+
+ @Override
+ public void updateMetrics() {
+ super.setMetricValues(metrics);
+
+ if (scheduler.isPreemptionEnabled()) {
+ // These won't be set if preemption is off
+ long lastCheck = scheduler.getLastPreemptionUpdateTime();
+ metrics.setMetric("millisSinceAtMinShare", lastCheck - lastTimeAtMinShare);
+ metrics.setMetric("millisSinceAtHalfFairShare", lastCheck - lastTimeAtHalfFairShare);
+ }
+ metrics.update();
+
+ for (JobSchedulable job : jobScheds) {
+ job.updateMetrics();
+ }
+ }
+}
Added: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java?rev=1169585&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java Sun Sep 11 23:57:37 2011
@@ -0,0 +1,171 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+
+/**
+ * A Schedulable represents an entity that can launch tasks, such as a job
+ * or a pool. It provides a common interface so that algorithms such as fair
+ * sharing can be applied both within a pool and across pools. There are
+ * currently two types of Schedulables: JobSchedulables, which represent a
+ * single job, and PoolSchedulables, which allocate among jobs in their pool.
+ *
+ * Separate sets of Schedulables are used for maps and reduces. Each pool has
+ * both a mapSchedulable and a reduceSchedulable, and so does each job.
+ *
+ * A Schedulable is responsible for three roles:
+ * 1) It can launch tasks through assignTask().
+ * 2) It provides information about the job/pool to the scheduler, including:
+ * - Demand (maximum number of tasks required)
+ * - Number of currently running tasks
+ * - Minimum share (for pools)
+ * - Job/pool weight (for fair sharing)
+ * - Start time and priority (for FIFO)
+ * 3) It can be assigned a fair share, for use with fair scheduling.
+ *
+ * Schedulable also contains two methods for performing scheduling computations:
+ * - updateDemand() is called periodically to compute the demand of the various
+ * jobs and pools, which may be expensive (e.g. jobs must iterate through all
+ * their tasks to count failed tasks, tasks that can be speculated, etc).
+ * - redistributeShare() is called after demands are updated and a Schedulable's
+ * fair share has been set by its parent to let it distribute its share among
+ * the other Schedulables within it (e.g. for pools that want to perform fair
+ * sharing among their jobs).
+ */
+abstract class Schedulable {
+  /** Fair share assigned to this Schedulable */
+  private double fairShare = 0;
+  // Metrics record this Schedulable publishes to; null until initMetrics()
+  // runs and again after cleanupMetrics().
+  protected MetricsRecord metrics;
+
+  /**
+   * Name of job/pool, used for debugging as well as for breaking ties in
+   * scheduling order deterministically.
+   */
+  public abstract String getName();
+
+  /**
+   * @return the type of tasks that this pool schedules
+   */
+  public abstract TaskType getTaskType();
+
+  /**
+   * Maximum number of tasks required by this Schedulable. This is defined as
+   * number of currently running tasks + number of unlaunched tasks (tasks that
+   * are either not yet launched or need to be speculated).
+   */
+  public abstract int getDemand();
+
+  /** Number of tasks the schedulable is currently running. */
+  public abstract int getRunningTasks();
+
+  /** Minimum share slots assigned to the schedulable. */
+  public abstract int getMinShare();
+
+  /** Job/pool weight in fair sharing. */
+  public abstract double getWeight();
+
+  /** Job priority for jobs in FIFO pools; meaningless for PoolSchedulables. */
+  public abstract JobPriority getPriority();
+
+  /** Start time for jobs in FIFO pools; meaningless for PoolSchedulables. */
+  public abstract long getStartTime();
+
+  /** Refresh the Schedulable's demand and those of its children if any. */
+  public abstract void updateDemand();
+
+  /**
+   * Distribute the fair share assigned to this Schedulable among its
+   * children (used in pools where the internal scheduler is fair sharing).
+   */
+  public abstract void redistributeShare();
+
+  /**
+   * Obtain a task for a given TaskTracker, or null if the Schedulable has
+   * no tasks to launch at this moment or does not wish to launch a task on
+   * this TaskTracker (e.g. is waiting for a TaskTracker with local data).
+   * In addition, if a job is skipped during this search because it is waiting
+   * for a TaskTracker with local data, this method is expected to add it to
+   * the <tt>visited</tt> collection passed in, so that the scheduler can
+   * properly mark it as skipped during this heartbeat. Please see
+   * {@link FairScheduler#getAllowedLocalityLevel(JobInProgress, long)}
+   * for details of delay scheduling (waiting for trackers with local data).
+   *
+   * @param tts TaskTracker that the task will be launched on
+   * @param currentTime Cached time (to prevent excessive calls to gettimeofday)
+   * @param visited A Collection to which this method must add all jobs that
+   *        were considered during the search for a job to assign.
+   * @return Task to launch, or null if Schedulable cannot currently launch one.
+   * @throws IOException Possible if obtainNew(Map|Reduce)Task throws exception.
+   */
+  public abstract Task assignTask(TaskTrackerStatus tts, long currentTime,
+      Collection<JobInProgress> visited) throws IOException;
+
+  /** Assign a fair share to this Schedulable. */
+  public void setFairShare(double fairShare) {
+    this.fairShare = fairShare;
+  }
+
+  /** Get the fair share assigned to this Schedulable. */
+  public double getFairShare() {
+    return fairShare;
+  }
+
+  /** Return the name of the metrics context for this schedulable */
+  protected abstract String getMetricsContextName();
+
+  /**
+   * Set up metrics context
+   */
+  protected void initMetrics() {
+    MetricsContext metricsContext = MetricsUtil.getContext("fairscheduler");
+    this.metrics = MetricsUtil.createRecord(metricsContext,
+        getMetricsContextName());
+    metrics.setTag("name", getName());
+    metrics.setTag("taskType", getTaskType().toString());
+  }
+
+  /** Remove this Schedulable's metrics record. Safe to call more than once. */
+  void cleanupMetrics() {
+    // Guard against a second invocation: metrics is nulled out below, and
+    // calling remove() on it again would throw a NullPointerException.
+    if (metrics != null) {
+      metrics.remove();
+      metrics = null;
+    }
+  }
+
+  /** Copy this Schedulable's current scheduling stats into a metrics record. */
+  protected void setMetricValues(MetricsRecord metrics) {
+    metrics.setMetric("fairShare", (float)getFairShare());
+    metrics.setMetric("minShare", getMinShare());
+    metrics.setMetric("demand", getDemand());
+    metrics.setMetric("weight", (float)getWeight());
+    metrics.setMetric("runningTasks", getRunningTasks());
+  }
+
+  /** Publish this Schedulable's metrics (and its children's, if any). */
+  abstract void updateMetrics();
+
+  /** Convenient toString implementation for debugging. */
+  @Override
+  public String toString() {
+    return String.format("[%s, demand=%d, running=%d, share=%.1f, w=%.1f]",
+        getName(), getDemand(), getRunningTasks(), fairShare, getWeight());
+  }
+}
Added: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingAlgorithms.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingAlgorithms.java?rev=1169585&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingAlgorithms.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingAlgorithms.java Sun Sep 11 23:57:37 2011
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.Collection;
+import java.util.Comparator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Utility class containing scheduling algorithms used in the fair scheduler.
+ */
+class SchedulingAlgorithms {
+  public static final Log LOG = LogFactory.getLog(
+      SchedulingAlgorithms.class.getName());
+
+  /** Static utility class; not meant to be instantiated. */
+  private SchedulingAlgorithms() {
+  }
+
+  /**
+   * Overflow-safe three-way comparison of two longs. Used instead of
+   * (int) Math.signum(a - b): the subtraction can overflow for widely
+   * separated values and silently invert the comparison result.
+   */
+  private static int compareLongs(long a, long b) {
+    return (a < b) ? -1 : ((a == b) ? 0 : 1);
+  }
+
+  /**
+   * Compare Schedulables in order of priority and then submission time, as in
+   * the default FIFO scheduler in Hadoop.
+   */
+  public static class FifoComparator implements Comparator<Schedulable> {
+    @Override
+    public int compare(Schedulable s1, Schedulable s2) {
+      int res = s1.getPriority().compareTo(s2.getPriority());
+      if (res == 0) {
+        res = compareLongs(s1.getStartTime(), s2.getStartTime());
+      }
+      if (res == 0) {
+        // In the rare case where jobs were submitted at the exact same time,
+        // compare them by name (which will be the JobID) to get a deterministic
+        // ordering, so we don't alternately launch tasks from different jobs.
+        res = s1.getName().compareTo(s2.getName());
+      }
+      return res;
+    }
+  }
+
+  /**
+   * Compare Schedulables via weighted fair sharing. In addition, Schedulables
+   * below their min share get priority over those whose min share is met.
+   *
+   * Schedulables below their min share are compared by how far below it they
+   * are as a ratio. For example, if job A has 8 out of a min share of 10 tasks
+   * and job B has 50 out of a min share of 100, then job B is scheduled next,
+   * because B is at 50% of its min share and A is at 80% of its min share.
+   *
+   * Schedulables above their min share are compared by (runningTasks / weight).
+   * If all weights are equal, slots are given to the job with the fewest tasks;
+   * otherwise, jobs with more weight get proportionally more slots.
+   */
+  public static class FairShareComparator implements Comparator<Schedulable> {
+    @Override
+    public int compare(Schedulable s1, Schedulable s2) {
+      double minShareRatio1, minShareRatio2;
+      double tasksToWeightRatio1, tasksToWeightRatio2;
+      // Clamp min share by demand: a schedulable can never usefully hold more
+      // slots than it has tasks to run.
+      int minShare1 = Math.min(s1.getMinShare(), s1.getDemand());
+      int minShare2 = Math.min(s2.getMinShare(), s2.getDemand());
+      boolean s1Needy = s1.getRunningTasks() < minShare1;
+      boolean s2Needy = s2.getRunningTasks() < minShare2;
+      minShareRatio1 = s1.getRunningTasks() / Math.max(minShare1, 1.0);
+      minShareRatio2 = s2.getRunningTasks() / Math.max(minShare2, 1.0);
+      tasksToWeightRatio1 = s1.getRunningTasks() / s1.getWeight();
+      tasksToWeightRatio2 = s2.getRunningTasks() / s2.getWeight();
+      int res = 0;
+      if (s1Needy && !s2Needy)
+        res = -1;
+      else if (s2Needy && !s1Needy)
+        res = 1;
+      else if (s1Needy && s2Needy)
+        res = (int) Math.signum(minShareRatio1 - minShareRatio2);
+      else // Neither schedulable is needy
+        res = (int) Math.signum(tasksToWeightRatio1 - tasksToWeightRatio2);
+      if (res == 0) {
+        // Jobs are tied in fairness ratio. Break the tie by submit time and job
+        // name to get a deterministic ordering, which is useful for unit tests.
+        res = compareLongs(s1.getStartTime(), s2.getStartTime());
+        if (res == 0)
+          res = s1.getName().compareTo(s2.getName());
+      }
+      return res;
+    }
+  }
+
+  /**
+   * Number of iterations for the binary search in computeFairShares. This is
+   * equivalent to the number of bits of precision in the output. 25 iterations
+   * gives precision better than 0.1 slots in clusters with one million slots.
+   */
+  private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25;
+
+  /**
+   * Upper bound on the number of doublings performed while searching for
+   * rMax in computeFairShares. Guards against an infinite loop in the
+   * degenerate case where no ratio can ever satisfy the demand (e.g. every
+   * Schedulable has weight 0 and min share 0 but nonzero demand); 64
+   * doublings reach ~1.8e19, far beyond any meaningful slot ratio.
+   */
+  private static final int COMPUTE_FAIR_SHARES_MAX_DOUBLINGS = 64;
+
+  /**
+   * Given a set of Schedulables and a number of slots, compute their weighted
+   * fair shares. The min shares and demands of the Schedulables are assumed to
+   * be set beforehand. We compute the fairest possible allocation of shares
+   * to the Schedulables that respects their min shares and demands.
+   *
+   * To understand what this method does, we must first define what weighted
+   * fair sharing means in the presence of minimum shares and demands. If there
+   * were no minimum shares and every Schedulable had an infinite demand (i.e.
+   * could launch infinitely many tasks), then weighted fair sharing would be
+   * achieved if the ratio of slotsAssigned / weight was equal for each
+   * Schedulable and all slots were assigned. Minimum shares and demands add
+   * two further twists:
+   * - Some Schedulables may not have enough tasks to fill all their share.
+   * - Some Schedulables may have a min share higher than their assigned share.
+   *
+   * To deal with these possibilities, we define an assignment of slots as
+   * being fair if there exists a ratio R such that:
+   * - Schedulables S where S.demand < R * S.weight are assigned share S.demand
+   * - Schedulables S where S.minShare > R * S.weight are given share S.minShare
+   * - All other Schedulables S are assigned share R * S.weight
+   * - The sum of all the shares is totalSlots.
+   *
+   * We call R the weight-to-slots ratio because it converts a Schedulable's
+   * weight to the number of slots it is assigned.
+   *
+   * We compute a fair allocation by finding a suitable weight-to-slot ratio R.
+   * To do this, we use binary search. Given a ratio R, we compute the number
+   * of slots that would be used in total with this ratio (the sum of the shares
+   * computed using the conditions above). If this number of slots is less than
+   * totalSlots, then R is too small and more slots could be assigned. If the
+   * number of slots is more than totalSlots, then R is too large.
+   *
+   * We begin the binary search with a lower bound on R of 0 (which means that
+   * all Schedulables are only given their minShare) and an upper bound computed
+   * to be large enough that too many slots are given (by doubling R until we
+   * either use more than totalSlots slots or we fulfill all jobs' demands).
+   * The helper method slotsUsedWithWeightToSlotRatio computes the total number
+   * of slots used with a given value of R.
+   *
+   * The running time of this algorithm is linear in the number of Schedulables,
+   * because slotsUsedWithWeightToSlotRatio is linear-time and the number of
+   * iterations of binary search is a constant (dependent on desired precision).
+   */
+  public static void computeFairShares(
+      Collection<? extends Schedulable> schedulables, double totalSlots) {
+    // Find an upper bound on R that we can use in our binary search. We start
+    // at R = 1 and double it until we have either used totalSlots slots or we
+    // have met all Schedulables' demands (if total demand < totalSlots).
+    double totalDemand = 0;
+    for (Schedulable sched: schedulables) {
+      totalDemand += sched.getDemand();
+    }
+    double cap = Math.min(totalDemand, totalSlots);
+    double rMax = 1.0;
+    int doublings = 0;
+    while (slotsUsedWithWeightToSlotRatio(rMax, schedulables) < cap
+        && doublings < COMPUTE_FAIR_SHARES_MAX_DOUBLINGS) {
+      rMax *= 2.0;
+      doublings++;
+    }
+    // Perform the binary search for up to COMPUTE_FAIR_SHARES_ITERATIONS steps
+    double left = 0;
+    double right = rMax;
+    for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
+      double mid = (left + right) / 2.0;
+      if (slotsUsedWithWeightToSlotRatio(mid, schedulables) < cap) {
+        left = mid;
+      } else {
+        right = mid;
+      }
+    }
+    // Set the fair shares based on the value of R we've converged to
+    for (Schedulable sched: schedulables) {
+      sched.setFairShare(computeShare(sched, right));
+    }
+  }
+
+  /**
+   * Compute the number of slots that would be used given a weight-to-slot
+   * ratio w2sRatio, for use in the computeFairShares algorithm as described
+   * in #{@link SchedulingAlgorithms#computeFairShares(Collection, double)}.
+   */
+  private static double slotsUsedWithWeightToSlotRatio(double w2sRatio,
+      Collection<? extends Schedulable> schedulables) {
+    double slotsTaken = 0;
+    for (Schedulable sched: schedulables) {
+      double share = computeShare(sched, w2sRatio);
+      slotsTaken += share;
+    }
+    return slotsTaken;
+  }
+
+  /**
+   * Compute the number of slots assigned to a Schedulable given a particular
+   * weight-to-slot ratio w2sRatio, for use in computeFairShares as described
+   * in #{@link SchedulingAlgorithms#computeFairShares(Collection, double)}.
+   */
+  private static double computeShare(Schedulable sched, double w2sRatio) {
+    double share = sched.getWeight() * w2sRatio;
+    share = Math.max(share, sched.getMinShare());
+    share = Math.min(share, sched.getDemand());
+    return share;
+  }
+}
Added: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingMode.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingMode.java?rev=1169585&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingMode.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingMode.java Sun Sep 11 23:57:37 2011
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+/**
+ * Internal scheduling modes for pools.
+ */
+public enum SchedulingMode {
+  FAIR, // divide the pool's share among its jobs by weighted fair sharing
+  FIFO  // serve jobs by priority, then submission time
+}
Modified: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskSelector.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskSelector.java?rev=1169585&r1=1169584&r2=1169585&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskSelector.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskSelector.java Sun Sep 11 23:57:37 2011
@@ -22,6 +22,7 @@ import java.io.IOException;
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
/**
* A pluggable object for selecting tasks to run from a {@link JobInProgress} on
@@ -86,7 +87,7 @@ public abstract class TaskSelector imple
* @throws IOException
*/
public abstract Task obtainNewMapTask(TaskTrackerStatus taskTracker,
- JobInProgress job) throws IOException;
+ JobInProgress job, int localityLevel) throws IOException;
/**
* Choose a reduce task to run from the given job on the given TaskTracker.
Modified: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/WeightAdjuster.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/WeightAdjuster.java?rev=1169585&r1=1169584&r2=1169585&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/WeightAdjuster.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/WeightAdjuster.java Sun Sep 11 23:57:37 2011
@@ -19,6 +19,7 @@
package org.apache.hadoop.mapred;
import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.mapreduce.TaskType;
/**
* A pluggable object for altering the weights of jobs in the fair scheduler,
Added: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java?rev=1169585&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java Sun Sep 11 23:57:37 2011
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.hadoop.mapreduce.TaskType;
+
+/**
+ * Dummy implementation of Schedulable for unit testing.
+ */
+public class FakeSchedulable extends Schedulable {
+  // Fixed values returned verbatim by the corresponding getters below.
+  private int demand;
+  private int runningTasks;
+  private int minShare;
+  private double weight;
+  private JobPriority priority;
+  private long startTime;
+
+  /** All-defaults schedulable: zero demand, weight 1, NORMAL priority. */
+  public FakeSchedulable() {
+    this(0, 0, 1, 0, 0, JobPriority.NORMAL, 0);
+  }
+
+  /** Schedulable with the given demand and default everything else. */
+  public FakeSchedulable(int demand) {
+    this(demand, 0, 1, 0, 0, JobPriority.NORMAL, 0);
+  }
+
+  /** Schedulable with the given demand and min share. */
+  public FakeSchedulable(int demand, int minShare) {
+    this(demand, minShare, 1, 0, 0, JobPriority.NORMAL, 0);
+  }
+
+  /** Schedulable with the given demand, min share and weight. */
+  public FakeSchedulable(int demand, int minShare, double weight) {
+    this(demand, minShare, weight, 0, 0, JobPriority.NORMAL, 0);
+  }
+
+  /**
+   * Fully-specified constructor; all convenience constructors delegate here.
+   * Note it deliberately skips initMetrics(), so no metrics record is
+   * created for fakes.
+   */
+  public FakeSchedulable(int demand, int minShare, double weight, int fairShare,
+      int runningTasks, JobPriority priority, long startTime) {
+    this.demand = demand;
+    this.minShare = minShare;
+    this.weight = weight;
+    setFairShare(fairShare);
+    this.runningTasks = runningTasks;
+    this.priority = priority;
+    this.startTime = startTime;
+  }
+
+  /** Never launches tasks; always returns null. */
+  @Override
+  public Task assignTask(TaskTrackerStatus tts, long currentTime,
+      Collection<JobInProgress> visited) throws IOException {
+    return null;
+  }
+
+  @Override
+  public int getDemand() {
+    return demand;
+  }
+
+  /** Unique-ish name derived from the instance hash, for tie-breaking. */
+  @Override
+  public String getName() {
+    return "FakeSchedulable" + this.hashCode();
+  }
+
+  @Override
+  public JobPriority getPriority() {
+    return priority;
+  }
+
+  @Override
+  public int getRunningTasks() {
+    return runningTasks;
+  }
+
+  @Override
+  public long getStartTime() {
+    return startTime;
+  }
+
+  @Override
+  public double getWeight() {
+    return weight;
+  }
+
+  @Override
+  public int getMinShare() {
+    return minShare;
+  }
+
+  @Override
+  public void redistributeShare() {}
+
+  @Override
+  public void updateDemand() {}
+
+  @Override
+  public TaskType getTaskType() {
+    return TaskType.MAP;
+  }
+
+  @Override
+  protected String getMetricsContextName() {
+    return "fake";
+  }
+
+  @Override
+  void updateMetrics() {
+  }
+}
Added: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestCapBasedLoadManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestCapBasedLoadManager.java?rev=1169585&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestCapBasedLoadManager.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestCapBasedLoadManager.java Sun Sep 11 23:57:37 2011
@@ -0,0 +1,150 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapred.TaskStatus.State;
+
+import junit.framework.TestCase;
+
+/**
+ * Exercise the canAssignMap and canAssignReduce methods in
+ * CapBasedLoadManager.
+ */
+public class TestCapBasedLoadManager extends TestCase {
+
+ /**
+ * Returns a running MapTaskStatus.
+ */
+ private TaskStatus getRunningMapTaskStatus() {
+ TaskStatus ts = new MapTaskStatus();
+ ts.setRunState(State.RUNNING);
+ return ts;
+ }
+
+ /**
+ * Returns a running ReduceTaskStatus.
+ */
+ private TaskStatus getRunningReduceTaskStatus() {
+ TaskStatus ts = new ReduceTaskStatus();
+ ts.setRunState(State.RUNNING);
+ return ts;
+ }
+
+ /**
+ * Returns a TaskTrackerStatus with the specified statistics.
+ * @param mapCap The capacity of map tasks
+ * @param reduceCap The capacity of reduce tasks
+ * @param runningMap The number of running map tasks
+ * @param runningReduce The number of running reduce tasks
+ */
+ private TaskTrackerStatus getTaskTrackerStatus(int mapCap, int reduceCap,
+ int runningMap, int runningReduce) {
+ // One RUNNING status per requested running task, maps first.
+ List<TaskStatus> ts = new ArrayList<TaskStatus>();
+ for (int i = 0; i < runningMap; i++) {
+ ts.add(getRunningMapTaskStatus());
+ }
+ for (int i = 0; i < runningReduce; i++) {
+ ts.add(getRunningReduceTaskStatus());
+ }
+ TaskTrackerStatus tracker = new TaskTrackerStatus("tracker",
+ "tracker_host", 1234, ts, 0, mapCap, reduceCap);
+ return tracker;
+ }
+
+ /**
+ * A single test of canAssignMap.
+ */
+ private void oneTestCanAssignMap(float maxDiff, int mapCap, int runningMap,
+ int totalMapSlots, int totalRunnableMap, boolean expected) {
+
+ // Configure the manager with the given "load.max.diff" value.
+ CapBasedLoadManager manager = new CapBasedLoadManager();
+ Configuration conf = new Configuration();
+ conf.setFloat("mapred.fairscheduler.load.max.diff", maxDiff);
+ manager.setConf(conf);
+
+ // Reduce side is pinned (capacity 1, 1 running); only the map side varies.
+ TaskTrackerStatus ts = getTaskTrackerStatus(mapCap, 1, runningMap, 1);
+
+ assertEquals( "When maxDiff=" + maxDiff + ", with totalRunnableMap="
+ + totalRunnableMap + " and totalMapSlots=" + totalMapSlots
+ + ", a tracker with runningMap=" + runningMap + " and mapCap="
+ + mapCap + " should " + (expected ? "" : "not ")
+ + "be able to take more Maps.",
+ expected,
+ manager.canAssignMap(ts, totalRunnableMap, totalMapSlots)
+ );
+ }
+
+
+ /**
+ * Test canAssignMap method.
+ */
+ public void testCanAssignMap() {
+ oneTestCanAssignMap(0.0f, 5, 0, 50, 1, true);
+ oneTestCanAssignMap(0.0f, 5, 1, 50, 10, false);
+ oneTestCanAssignMap(0.2f, 5, 1, 50, 10, true);
+ oneTestCanAssignMap(0.0f, 5, 1, 50, 11, true);
+ oneTestCanAssignMap(0.0f, 5, 2, 50, 11, false);
+ oneTestCanAssignMap(0.3f, 5, 2, 50, 6, true);
+ oneTestCanAssignMap(1.0f, 5, 5, 50, 50, false);
+ }
+
+
+ /**
+ * A single test of canAssignReduce.
+ */
+ // NOTE(review): parameter "ReduceCap" breaks lowerCamelCase; should be
+ // renamed to "reduceCap" in a follow-up change.
+ private void oneTestCanAssignReduce(float maxDiff, int ReduceCap,
+ int runningReduce, int totalReduceSlots, int totalRunnableReduce,
+ boolean expected) {
+
+ // Configure the manager with the given "load.max.diff" value.
+ CapBasedLoadManager manager = new CapBasedLoadManager();
+ Configuration conf = new Configuration();
+ conf.setFloat("mapred.fairscheduler.load.max.diff", maxDiff);
+ manager.setConf(conf);
+
+ // Map side is pinned (capacity 1, 1 running); only the reduce side varies.
+ TaskTrackerStatus ts = getTaskTrackerStatus(1, ReduceCap, 1,
+ runningReduce);
+
+ assertEquals( "When maxDiff=" + maxDiff + ", with totalRunnableReduce="
+ + totalRunnableReduce + " and totalReduceSlots=" + totalReduceSlots
+ + ", a tracker with runningReduce=" + runningReduce
+ + " and ReduceCap=" + ReduceCap + " should "
+ + (expected ? "" : "not ") + "be able to take more Reduces.",
+ expected,
+ manager.canAssignReduce(ts, totalRunnableReduce, totalReduceSlots)
+ );
+ }
+
+ /**
+ * Test canAssignReduce method.
+ */
+ public void testCanAssignReduce() {
+ oneTestCanAssignReduce(0.0f, 5, 0, 50, 1, true);
+ oneTestCanAssignReduce(0.0f, 5, 1, 50, 10, false);
+ oneTestCanAssignReduce(0.2f, 5, 1, 50, 10, true);
+ oneTestCanAssignReduce(0.0f, 5, 1, 50, 11, true);
+ oneTestCanAssignReduce(0.0f, 5, 2, 50, 11, false);
+ oneTestCanAssignReduce(0.3f, 5, 2, 50, 6, true);
+ oneTestCanAssignReduce(1.0f, 5, 5, 50, 50, false);
+ }
+
+}
Added: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestComputeFairShares.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestComputeFairShares.java?rev=1169585&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestComputeFairShares.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestComputeFairShares.java Sun Sep 11 23:57:37 2011
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+/**
+ * Exercise the computeFairShares method in SchedulingAlgorithms.
+ */
+public class TestComputeFairShares extends TestCase {
+ private List<Schedulable> scheds; // populated by each test case
+
+ // Each test starts with a fresh, empty list of fake schedulables.
+ @Override
+ protected void setUp() throws Exception {
+ scheds = new ArrayList<Schedulable>();
+ }
+
+ /**
+ * Basic test - pools with different demands that are all higher than their
+ * fair share (of 10 slots) should each get their fair share.
+ */
+ public void testEqualSharing() {
+ scheds.add(new FakeSchedulable(100));
+ scheds.add(new FakeSchedulable(50));
+ scheds.add(new FakeSchedulable(30));
+ scheds.add(new FakeSchedulable(20));
+ SchedulingAlgorithms.computeFairShares(scheds, 40);
+ verifyShares(10, 10, 10, 10);
+ }
+
+ /**
+ * In this test, pool 4 has a smaller demand than the 40 / 4 = 10 slots that
+ * it would be assigned with equal sharing. It should only get the 3 slots
+ * it demands. The other pools must then split the remaining 37 slots, but
+ * pool 3, with 11 slots demanded, is now below its share of 37/3 ~= 12.3,
+ * so it only gets 11 slots. Pools 1 and 2 split the rest and get 13 each.
+ */
+ public void testLowDemands() {
+ scheds.add(new FakeSchedulable(100));
+ scheds.add(new FakeSchedulable(50));
+ scheds.add(new FakeSchedulable(11));
+ scheds.add(new FakeSchedulable(3));
+ SchedulingAlgorithms.computeFairShares(scheds, 40);
+ verifyShares(13, 13, 11, 3);
+ }
+
+ /**
+ * In this test, some pools have minimum shares set. Pool 1 has a min share
+ * of 20 so it gets 20 slots. Pool 2 also has a min share of 20, but its
+ * demand is only 10 so it can only get 10 slots. The remaining pools have
+ * 10 slots to split between them. Pool 4 gets 3 slots because its demand is
+ * only 3, and pool 3 gets the remaining 7 slots. Pool 4 also had a min share
+ * of 2 slots but this should not affect the outcome.
+ */
+ public void testMinShares() {
+ scheds.add(new FakeSchedulable(100, 20));
+ scheds.add(new FakeSchedulable(10, 20));
+ scheds.add(new FakeSchedulable(10, 0));
+ scheds.add(new FakeSchedulable(3, 2));
+ SchedulingAlgorithms.computeFairShares(scheds, 40);
+ verifyShares(20, 10, 7, 3);
+ }
+
+ /**
+ * Basic test for weighted shares with no minimum shares and no low demands.
+ * Each pool should get slots in proportion to its weight.
+ */
+ public void testWeightedSharing() {
+ scheds.add(new FakeSchedulable(100, 0, 2.0));
+ scheds.add(new FakeSchedulable(50, 0, 1.0));
+ scheds.add(new FakeSchedulable(30, 0, 1.0));
+ scheds.add(new FakeSchedulable(20, 0, 0.5));
+ SchedulingAlgorithms.computeFairShares(scheds, 45);
+ verifyShares(20, 10, 10, 5);
+ }
+
+ /**
+ * Weighted sharing test where pools 1 and 2 are now given lower demands than
+ * above. Pool 1 stops at 10 slots, leaving 35. If the remaining pools split
+ * this into a 1:1:0.5 ratio, they would get 14:14:7 slots respectively, but
+ * pool 2's demand is only 11, so it only gets 11. The remaining 2 pools split
+ * the 24 slots left into a 1:0.5 ratio, getting 16 and 8 slots respectively.
+ */
+ public void testWeightedSharingWithLowDemands() {
+ scheds.add(new FakeSchedulable(10, 0, 2.0));
+ scheds.add(new FakeSchedulable(11, 0, 1.0));
+ scheds.add(new FakeSchedulable(30, 0, 1.0));
+ scheds.add(new FakeSchedulable(20, 0, 0.5));
+ SchedulingAlgorithms.computeFairShares(scheds, 45);
+ verifyShares(10, 11, 16, 8);
+ }
+
+ /**
+ * Weighted fair sharing test with min shares. As in the min share test above,
+ * pool 1 has a min share greater than its demand so it only gets its demand.
+ * Pool 4 has a min share of 15 even though its weight is very small, so it
+ * gets 15 slots. The remaining pools share the remaining 20 slots equally,
+ * getting 10 each. Pool 3's min share of 5 slots doesn't affect this.
+ */
+ public void testWeightedSharingWithMinShares() {
+ scheds.add(new FakeSchedulable(10, 20, 2.0));
+ scheds.add(new FakeSchedulable(11, 0, 1.0));
+ scheds.add(new FakeSchedulable(30, 5, 1.0));
+ scheds.add(new FakeSchedulable(20, 15, 0.5));
+ SchedulingAlgorithms.computeFairShares(scheds, 45);
+ verifyShares(10, 10, 10, 15);
+ }
+
+ /**
+ * Test that shares are computed accurately even when there are many more
+ * frameworks than available slots.
+ */
+ public void testSmallShares() {
+ scheds.add(new FakeSchedulable(10));
+ scheds.add(new FakeSchedulable(5));
+ scheds.add(new FakeSchedulable(3));
+ scheds.add(new FakeSchedulable(2));
+ SchedulingAlgorithms.computeFairShares(scheds, 1);
+ verifyShares(0.25, 0.25, 0.25, 0.25);
+ }
+
+ /**
+ * Test that shares are computed accurately even when the number of slots is
+ * very large.
+ */
+ public void testLargeShares() {
+ int million = 1000 * 1000;
+ scheds.add(new FakeSchedulable(100 * million));
+ scheds.add(new FakeSchedulable(50 * million));
+ scheds.add(new FakeSchedulable(30 * million));
+ scheds.add(new FakeSchedulable(20 * million));
+ SchedulingAlgorithms.computeFairShares(scheds, 40 * million);
+ verifyShares(10 * million, 10 * million, 10 * million, 10 * million);
+ }
+
+ /**
+ * Test that having a pool with 0 demand doesn't confuse the algorithm.
+ */
+ public void testZeroDemand() {
+ scheds.add(new FakeSchedulable(100));
+ scheds.add(new FakeSchedulable(50));
+ scheds.add(new FakeSchedulable(30));
+ scheds.add(new FakeSchedulable(0));
+ SchedulingAlgorithms.computeFairShares(scheds, 30);
+ verifyShares(10, 10, 10, 0);
+ }
+
+ /**
+ * Test that being called on an empty list doesn't confuse the algorithm.
+ */
+ public void testEmptyList() {
+ SchedulingAlgorithms.computeFairShares(scheds, 40);
+ verifyShares();
+ }
+
+ /**
+ * Check that a given list of shares have been assigned to this.scheds.
+ */
+ private void verifyShares(double... shares) {
+ // NOTE(review): JUnit convention is assertEquals(expected, actual);
+ // the expected value here should arguably be shares.length.
+ assertEquals(scheds.size(), shares.length);
+ for (int i = 0; i < shares.length; i++) {
+ // Shares are doubles, so compare with a 0.01 tolerance.
+ assertEquals(shares[i], scheds.get(i).getFairShare(), 0.01);
+ }
+ }
+}