You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2009/08/14 18:32:05 UTC
svn commit: r804284 [2/4] - in /hadoop/mapreduce/trunk: ./
src/contrib/fairscheduler/designdoc/
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/
src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/
src/docs/src/documentation/content...
Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java?rev=804284&r1=804283&r2=804284&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java Fri Aug 14 16:32:04 2009
@@ -44,10 +44,8 @@
* Servlet for displaying fair scheduler information, installed at
* [job tracker URL]/scheduler when the {@link FairScheduler} is in use.
*
- * The main features are viewing each job's task count and fair share, ability
- * to change job priorities and pools from the UI, and ability to switch the
- * scheduler to FIFO mode without restarting the JobTracker if this is required
- * for any reason.
+ * The main features are viewing each job's task count and fair share,
+ * and admin controls to change job priorities and pools from the UI.
*
* There is also an "advanced" view for debugging that can be turned on by
* going to [job tracker URL]/scheduler?advanced.
@@ -82,12 +80,6 @@
// view page so that the user won't resubmit the data if they hit refresh.
boolean advancedView = request.getParameter("advanced") != null;
if (JSPUtil.privateActionsAllowed()
- && request.getParameter("setFifo") != null) {
- scheduler.setUseFifo(request.getParameter("setFifo").equals("true"));
- response.sendRedirect("/scheduler" + (advancedView ? "?advanced" : ""));
- return;
- }
- if (JSPUtil.privateActionsAllowed()
&& request.getParameter("setPool") != null) {
Collection<JobInProgress> runningJobs = jobTracker.getRunningJobs();
PoolManager poolMgr = null;
@@ -130,17 +122,14 @@
String hostname = StringUtils.simpleHostname(
jobTracker.getJobTrackerMachine());
out.print("<html><head>");
- out.printf("<title>%s Job Scheduler Admininstration</title>\n", hostname);
+ out.printf("<title>%s Fair Scheduler Admininstration</title>\n", hostname);
out.print("<link rel=\"stylesheet\" type=\"text/css\" " +
"href=\"/static/hadoop.css\">\n");
out.print("</head><body>\n");
out.printf("<h1><a href=\"/jobtracker.jsp\">%s</a> " +
- "Job Scheduler Administration</h1>\n", hostname);
+ "Fair Scheduler Administration</h1>\n", hostname);
showPools(out, advancedView);
showJobs(out, advancedView);
- if (JSPUtil.privateActionsAllowed()) {
- showAdminForm(out, advancedView);
- }
out.print("</body></html>\n");
out.close();
}
@@ -153,9 +142,13 @@
PoolManager poolManager = scheduler.getPoolManager();
out.print("<h2>Pools</h2>\n");
out.print("<table border=\"2\" cellpadding=\"5\" cellspacing=\"2\">\n");
- out.print("<tr><th>Pool</th><th>Running Jobs</th>" +
- "<th>Min Maps</th><th>Min Reduces</th>" +
- "<th>Running Maps</th><th>Running Reduces</th></tr>\n");
+ out.print("<tr><th rowspan=2>Pool</th>" +
+ "<th rowspan=2>Running Jobs</th>" +
+ "<th colspan=3>Map Tasks</th>" +
+ "<th colspan=3>Reduce Tasks</th>" +
+ "<th rowspan=2>Scheduling Mode</th></tr>\n<tr>" +
+ "<th>Min Share</th><th>Running</th><th>Fair Share</th>" +
+ "<th>Min Share</th><th>Running</th><th>Fair Share</th></tr>\n");
List<Pool> pools = new ArrayList<Pool>(poolManager.getPools());
Collections.sort(pools, new Comparator<Pool>() {
public int compare(Pool p1, Pool p2) {
@@ -166,24 +159,21 @@
else return p1.getName().compareTo(p2.getName());
}});
for (Pool pool: pools) {
- int runningMaps = 0;
- int runningReduces = 0;
- for (JobInProgress job: pool.getJobs()) {
- JobInfo info = scheduler.infos.get(job);
- if (info != null) {
- runningMaps += info.runningMaps;
- runningReduces += info.runningReduces;
- }
- }
- out.print("<tr>\n");
- out.printf("<td>%s</td>\n", pool.getName());
- out.printf("<td>%s</td>\n", pool.getJobs().size());
- out.printf("<td>%s</td>\n", poolManager.getAllocation(pool.getName(),
+ String name = pool.getName();
+ int runningMaps = pool.getMapSchedulable().getRunningTasks();
+ int runningReduces = pool.getReduceSchedulable().getRunningTasks();
+ out.print("<tr>");
+ out.printf("<td>%s</td>", name);
+ out.printf("<td>%d</td>", pool.getJobs().size());
+ out.printf("<td>%d</td>", poolManager.getAllocation(name,
TaskType.MAP));
- out.printf("<td>%s</td>\n", poolManager.getAllocation(pool.getName(),
+ out.printf("<td>%d</td>", runningMaps);
+ out.printf("<td>%.1f</td>", pool.getMapSchedulable().getFairShare());
+ out.printf("<td>%d</td>", poolManager.getAllocation(name,
TaskType.REDUCE));
- out.printf("<td>%s</td>\n", runningMaps);
- out.printf("<td>%s</td>\n", runningReduces);
+ out.printf("<td>%d</td>", runningReduces);
+ out.printf("<td>%.1f</td>", pool.getReduceSchedulable().getFairShare());
+ out.printf("<td>%s</td>", pool.getSchedulingMode());
out.print("</tr>\n");
}
out.print("</table>\n");
@@ -196,21 +186,21 @@
private void showJobs(PrintWriter out, boolean advancedView) {
out.print("<h2>Running Jobs</h2>\n");
out.print("<table border=\"2\" cellpadding=\"5\" cellspacing=\"2\">\n");
- int colsPerTaskType = advancedView ? 6 : 3;
+ int colsPerTaskType = advancedView ? 4 : 3;
out.printf("<tr><th rowspan=2>Submitted</th>" +
"<th rowspan=2>JobID</th>" +
"<th rowspan=2>User</th>" +
"<th rowspan=2>Name</th>" +
"<th rowspan=2>Pool</th>" +
"<th rowspan=2>Priority</th>" +
- "<th colspan=%d>Maps</th>" +
- "<th colspan=%d>Reduces</th>",
+ "<th colspan=%d>Map Tasks</th>" +
+ "<th colspan=%d>Reduce Tasks</th>",
colsPerTaskType, colsPerTaskType);
out.print("</tr><tr>\n");
out.print("<th>Finished</th><th>Running</th><th>Fair Share</th>" +
- (advancedView ? "<th>Weight</th><th>Deficit</th><th>minMaps</th>" : ""));
+ (advancedView ? "<th>Weight</th>" : ""));
out.print("<th>Finished</th><th>Running</th><th>Fair Share</th>" +
- (advancedView ? "<th>Weight</th><th>Deficit</th><th>minReduces</th>" : ""));
+ (advancedView ? "<th>Weight</th>" : ""));
out.print("</tr>\n");
Collection<JobInProgress> runningJobs = jobTracker.getRunningJobs();
synchronized (scheduler) {
@@ -218,7 +208,7 @@
JobProfile profile = job.getProfile();
JobInfo info = scheduler.infos.get(job);
if (info == null) { // Job finished, but let's show 0's for info
- info = new JobInfo(0);
+ info = new JobInfo(null, null);
}
out.print("<tr>\n");
out.printf("<td>%s</td>\n", DATE_FORMAT.format(
@@ -241,23 +231,24 @@
out.printf("<td>%s</td>\n", scheduler.getPoolManager().getPoolName(job));
out.printf("<td>%s</td>\n", job.getPriority().toString());
}
- out.printf("<td>%d / %d</td><td>%d</td><td>%8.1f</td>\n",
- job.finishedMaps(), job.desiredMaps(), info.runningMaps,
- info.mapFairShare);
+ Pool pool = scheduler.getPoolManager().getPool(job);
+ String mapShare = (pool.getSchedulingMode() == SchedulingMode.FAIR) ?
+ String.format("%.1f", info.mapSchedulable.getFairShare()) : "NA";
+ out.printf("<td>%d / %d</td><td>%d</td><td>%s</td>\n",
+ job.finishedMaps(), job.desiredMaps(),
+ info.mapSchedulable.getRunningTasks(),
+ mapShare);
if (advancedView) {
- out.printf("<td>%8.1f</td>\n", info.mapWeight);
- out.printf("<td>%s</td>\n", info.neededMaps > 0 ?
- (info.mapDeficit / 1000) + "s" : "--");
- out.printf("<td>%d</td>\n", info.minMaps);
+ out.printf("<td>%.1f</td>\n", info.mapSchedulable.getWeight());
}
- out.printf("<td>%d / %d</td><td>%d</td><td>%8.1f</td>\n",
- job.finishedReduces(), job.desiredReduces(), info.runningReduces,
- info.reduceFairShare);
+ String reduceShare = (pool.getSchedulingMode() == SchedulingMode.FAIR) ?
+ String.format("%.1f", info.reduceSchedulable.getFairShare()) : "NA";
+ out.printf("<td>%d / %d</td><td>%d</td><td>%s</td>\n",
+ job.finishedReduces(), job.desiredReduces(),
+ info.reduceSchedulable.getRunningTasks(),
+ reduceShare);
if (advancedView) {
- out.printf("<td>%8.1f</td>\n", info.reduceWeight);
- out.printf("<td>%s</td>\n", info.neededReduces > 0 ?
- (info.reduceDeficit / 1000) + "s" : "--");
- out.printf("<td>%d</td>\n", info.minReduces);
+ out.printf("<td>%.1f</td>\n", info.reduceSchedulable.getWeight());
}
out.print("</tr>\n");
}
@@ -287,24 +278,4 @@
html.append("</select>\n");
return html.toString();
}
-
- /**
- * Print the administration form at the bottom of the page, which currently
- * only includes the button for switching between FIFO and Fair Scheduling.
- */
- private void showAdminForm(PrintWriter out, boolean advancedView) {
- out.print("<h2>Scheduling Mode</h2>\n");
- String curMode = scheduler.getUseFifo() ? "FIFO" : "Fair Sharing";
- String otherMode = scheduler.getUseFifo() ? "Fair Sharing" : "FIFO";
- String advParam = advancedView ? "?advanced" : "";
- out.printf("<form method=\"post\" action=\"/scheduler%s\">\n", advParam);
- out.printf("<p>The scheduler is currently using <b>%s mode</b>. " +
- "<input type=\"submit\" value=\"Switch to %s mode.\" " +
- "onclick=\"return confirm('Are you sure you want to change " +
- "scheduling mode to %s?')\" />\n",
- curMode, otherMode, otherMode);
- out.printf("<input type=\"hidden\" name=\"setFifo\" value=\"%s\" />",
- !scheduler.getUseFifo());
- out.print("</form>\n");
- }
}
Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FifoJobComparator.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FifoJobComparator.java?rev=804284&r1=804283&r2=804284&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FifoJobComparator.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FifoJobComparator.java Fri Aug 14 16:32:04 2009
@@ -35,7 +35,8 @@
}
}
if (res == 0) {
- res = j1.hashCode() - j2.hashCode();
+ // If there is a tie, break it by job ID to get a deterministic order
+ res = j1.getJobID().compareTo(j2.getJobID());
}
return res;
}
Added: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java?rev=804284&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java Fri Aug 14 16:32:04 2009
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Collection;
+
+import org.apache.hadoop.mapred.FairScheduler.JobInfo;
+import org.apache.hadoop.mapreduce.TaskType;
+/** A {@link Schedulable} covering one task type (map or reduce) of a single {@link JobInProgress}. */
+public class JobSchedulable extends Schedulable {
+ private FairScheduler scheduler;
+ private JobInProgress job;
+ private TaskType taskType;
+ private int demand = 0; // reset and recomputed on every updateDemand() call
+ /** Stores the scheduler, the job, and the task type this schedulable answers for. */
+ public JobSchedulable(FairScheduler scheduler, JobInProgress job,
+ TaskType taskType) {
+ this.scheduler = scheduler;
+ this.job = job;
+ this.taskType = taskType;
+ }
+ /** Returns the job's ID rendered as a string. */
+ @Override
+ public String getName() {
+ return job.getJobID().toString();
+ }
+ /** Returns the job passed to the constructor. */
+ public JobInProgress getJob() {
+ return job;
+ }
+ /** Recomputes demand; it stays 0 when the job is not runnable or job.scheduleReduces() is false for a reduce schedulable. */
+ @Override
+ public void updateDemand() {
+ demand = 0;
+ if (isRunnable()) {
+ // For reduces, make sure enough maps are done that reduces can launch
+ if (taskType == TaskType.REDUCE && !job.scheduleReduces())
+ return;
+ // Add up demand from each TaskInProgress; each TIP can either
+ // - have no attempts running, in which case it demands 1 slot
+ // - have N attempts running, in which case it demands N slots, and may
+ // potentially demand one more slot if it needs to be speculated
+ TaskInProgress[] tips = (taskType == TaskType.MAP ?
+ job.getMapTasks() : job.getReduceTasks());
+ boolean speculationEnabled = (taskType == TaskType.MAP ?
+ job.hasSpeculativeMaps() : job.hasSpeculativeReduces());
+ long time = scheduler.getClock().getTime(); // read once and reused for every canBeSpeculated() check
+ for (TaskInProgress tip: tips) {
+ if (!tip.isComplete()) { // completed TIPs add nothing to demand
+ if (tip.isRunning()) {
+ // Count active tasks and any speculative task we want to launch
+ demand += tip.getActiveTasks().size();
+ if (speculationEnabled && tip.canBeSpeculated(time))
+ demand += 1;
+ } else {
+ // Need to launch 1 task
+ demand += 1;
+ }
+ }
+ }
+ }
+ }
+ /** True only if the scheduler has a JobInfo for the job, that info is marked runnable, and the run state is RUNNING. */
+ private boolean isRunnable() {
+ JobInfo info = scheduler.getJobInfo(job);
+ int runState = job.getStatus().getRunState();
+ return (info != null && info.runnable && runState == JobStatus.RUNNING);
+ }
+ /** Returns the value left by the most recent updateDemand() call (0 before the first call). */
+ @Override
+ public int getDemand() {
+ return demand;
+ }
+ /** No-op: the body is empty in this class. */
+ @Override
+ public void redistributeShare() {}
+ /** Returns the job's priority. */
+ @Override
+ public JobPriority getPriority() {
+ return job.getPriority();
+ }
+ /** Returns job.runningMaps() for a MAP schedulable, otherwise job.runningReduces(). */
+ @Override
+ public int getRunningTasks() {
+ return taskType == TaskType.MAP ? job.runningMaps() : job.runningReduces();
+ }
+ /** Returns the job's startTime field. */
+ @Override
+ public long getStartTime() {
+ return job.startTime;
+ }
+ /** Delegates to scheduler.getJobWeight(job, taskType). */
+ @Override
+ public double getWeight() {
+ return scheduler.getJobWeight(job, taskType);
+ }
+ /** Always 0. */
+ @Override
+ public int getMinShare() {
+ return 0;
+ }
+ /** If the job is runnable, adds it to visited and asks it for a new map or reduce task; otherwise returns null. */
+ @Override
+ public Task assignTask(TaskTrackerStatus tts, long currentTime,
+ Collection<JobInProgress> visited) throws IOException {
+ if (isRunnable()) {
+ visited.add(job); // added before the job is asked, so it is recorded whatever comes back
+ TaskTrackerManager ttm = scheduler.taskTrackerManager;
+ ClusterStatus clusterStatus = ttm.getClusterStatus();
+ int numTaskTrackers = clusterStatus.getTaskTrackers();
+ if (taskType == TaskType.MAP) {
+ LocalityLevel localityLevel = scheduler.getAllowedLocalityLevel(
+ job, currentTime);
+ scheduler.getEventLog().log(
+ "ALLOWED_LOC_LEVEL", job.getJobID(), localityLevel);
+ // obtainNewMapTask needs to be passed 1 + the desired locality level
+ return job.obtainNewMapTask(tts, numTaskTrackers,
+ ttm.getNumberOfUniqueHosts(), localityLevel.toCacheLevelCap());
+ } else {
+ return job.obtainNewReduceTask(tts, numTaskTrackers,
+ ttm.getNumberOfUniqueHosts());
+ }
+ } else {
+ return null;
+ }
+ }
+}
Added: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LocalityLevel.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LocalityLevel.java?rev=804284&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LocalityLevel.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LocalityLevel.java Fri Aug 14 16:32:04 2009
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+/**
+ * Represents the level of data-locality at which a job in the fair scheduler
+ * is allowed to launch tasks. By default, jobs are not allowed to launch
+ * non-data-local tasks until they have waited a small number of seconds to
+ * find a slot on a node that they have data on. If a job has waited this
+ * long, it is allowed to launch rack-local tasks as well (on nodes that may
+ * not have the task's input data, but share a rack with a node that does).
+ * Finally, after a further wait, jobs are allowed to launch tasks anywhere
+ * in the cluster.
+ *
+ * This enum defines three levels - NODE, RACK and ANY (for allowing tasks
+ * to be launched on any node). A map task's level can be obtained from
+ * its job through {@link #fromTask(JobInProgress, Task, TaskTrackerStatus)}. In
+ * addition, for any locality level, it is possible to get a "level cap" to pass
+ * to {@link JobInProgress#obtainNewMapTask(TaskTrackerStatus, int, int, int)}
+ * to ensure that only tasks at this level or lower are launched, through
+ * the {@link #toCacheLevelCap()} method.
+ */
+public enum LocalityLevel {
+ NODE, RACK, ANY;
+ /** Maps job.getLocalityLevel(tip, tracker) for the task's TaskInProgress: 0 is NODE, 1 is RACK, any other value is ANY. */
+ public static LocalityLevel fromTask(JobInProgress job, Task mapTask,
+ TaskTrackerStatus tracker) {
+ TaskID tipID = mapTask.getTaskID().getTaskID(); // ID used to look up the TaskInProgress below
+ TaskInProgress tip = job.getTaskInProgress(tipID);
+ switch (job.getLocalityLevel(tip, tracker)) {
+ case 0: return LocalityLevel.NODE;
+ case 1: return LocalityLevel.RACK;
+ default: return LocalityLevel.ANY;
+ }
+ }
+
+ /**
+ * Obtain a JobInProgress cache level cap to pass to
+ * {@link JobInProgress#obtainNewMapTask(TaskTrackerStatus, int, int, int)}
+ * to ensure that only tasks of this locality level and lower are launched.
+ */
+ public int toCacheLevelCap() {
+ switch(this) {
+ case NODE: return 1;
+ case RACK: return 2;
+ default: return Integer.MAX_VALUE; // ANY is the only remaining constant; it gets the largest int as its cap
+ }
+ }
+}
Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java?rev=804284&r1=804283&r2=804284&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java Fri Aug 14 16:32:04 2009
@@ -21,6 +21,8 @@
import java.util.ArrayList;
import java.util.Collection;
+import org.apache.hadoop.mapreduce.TaskType;
+
/**
* A schedulable pool of jobs.
*/
@@ -33,9 +35,17 @@
/** Jobs in this specific pool; does not include children pools' jobs. */
private Collection<JobInProgress> jobs = new ArrayList<JobInProgress>();
+
+ /** Scheduling mode for jobs inside the pool (fair or FIFO) */
+ private SchedulingMode schedulingMode;
- public Pool(String name) {
+ private PoolSchedulable mapSchedulable;
+ private PoolSchedulable reduceSchedulable;
+
+ public Pool(FairScheduler scheduler, String name) {
this.name = name;
+ mapSchedulable = new PoolSchedulable(scheduler, this, TaskType.MAP);
+ reduceSchedulable = new PoolSchedulable(scheduler, this, TaskType.REDUCE);
}
public Collection<JobInProgress> getJobs() {
@@ -44,17 +54,41 @@
public void addJob(JobInProgress job) {
jobs.add(job);
+ mapSchedulable.addJob(job);
+ reduceSchedulable.addJob(job);
}
public void removeJob(JobInProgress job) {
jobs.remove(job);
+ mapSchedulable.removeJob(job);
+ reduceSchedulable.removeJob(job);
}
public String getName() {
return name;
}
+ public SchedulingMode getSchedulingMode() {
+ return schedulingMode; // value last stored by setSchedulingMode(); the constructor shown in this diff does not assign it
+ }
+ /** Replaces the scheduling mode later returned by getSchedulingMode(). */
+ public void setSchedulingMode(SchedulingMode schedulingMode) {
+ this.schedulingMode = schedulingMode;
+ }
+
public boolean isDefaultPool() {
return Pool.DEFAULT_POOL_NAME.equals(name);
}
+ /** Returns the PoolSchedulable created in the constructor for TaskType.MAP. */
+ public PoolSchedulable getMapSchedulable() {
+ return mapSchedulable;
+ }
+ /** Returns the PoolSchedulable created in the constructor for TaskType.REDUCE. */
+ public PoolSchedulable getReduceSchedulable() {
+ return reduceSchedulable;
+ }
+ /** Returns the map schedulable for TaskType.MAP and the reduce schedulable for any other type. */
+ public PoolSchedulable getSchedulable(TaskType type) {
+ return type == TaskType.MAP ? mapSchedulable : reduceSchedulable;
+ }
}
Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java?rev=804284&r1=804283&r2=804284&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java Fri Aug 14 16:32:04 2009
@@ -60,6 +60,8 @@
* (this is done to prevent loading a file that hasn't been fully written).
*/
public static final long ALLOC_RELOAD_WAIT = 5 * 1000;
+
+ private final FairScheduler scheduler;
// Map and reduce minimum allocations for each pool
private Map<String, Integer> mapAllocs = new HashMap<String, Integer>();
@@ -89,6 +91,8 @@
// below half its fair share for this long, it is allowed to preempt tasks.
private long fairSharePreemptionTimeout = Long.MAX_VALUE;
+ SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR;
+
private Object allocFile; // Path to XML file containing allocations. This
// is either a URL to specify a classpath resource
// (if the fair-scheduler.xml on the classpath is
@@ -103,8 +107,13 @@
private long lastSuccessfulReload; // Last time we successfully reloaded pools
private boolean lastReloadAttemptFailed = false;
- public PoolManager(Configuration conf) throws IOException, SAXException,
+ public PoolManager(FairScheduler scheduler) {
+ this.scheduler = scheduler;
+ }
+
+ public void initialize() throws IOException, SAXException,
AllocationConfigurationException, ParserConfigurationException {
+ Configuration conf = scheduler.getConf();
this.poolNameProperty = conf.get(
"mapred.fairscheduler.poolnameproperty", "user.name");
this.allocFile = conf.get("mapred.fairscheduler.allocation.file");
@@ -131,11 +140,19 @@
public synchronized Pool getPool(String name) {
Pool pool = pools.get(name);
if (pool == null) {
- pool = new Pool(name);
+ pool = new Pool(scheduler, name);
+ pool.setSchedulingMode(defaultSchedulingMode);
pools.put(name, pool);
}
return pool;
}
+
+ /**
+ * Get the pool that a given job is in.
+ */
+ public Pool getPool(JobInProgress job) {
+ return getPool(getPoolName(job));
+ }
/**
* Reload allocations file if it hasn't been loaded in a while
@@ -203,11 +220,13 @@
Map<String, Integer> poolMaxJobs = new HashMap<String, Integer>();
Map<String, Integer> userMaxJobs = new HashMap<String, Integer>();
Map<String, Double> poolWeights = new HashMap<String, Double>();
+ Map<String, SchedulingMode> poolModes = new HashMap<String, SchedulingMode>();
Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
int userMaxJobsDefault = Integer.MAX_VALUE;
int poolMaxJobsDefault = Integer.MAX_VALUE;
long fairSharePreemptionTimeout = Long.MAX_VALUE;
long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
+ SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR;
// Remember all pool names so we can display them on web UI, etc.
List<String> poolNamesInAllocFile = new ArrayList<String>();
@@ -262,7 +281,10 @@
String text = ((Text)field.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L;
minSharePreemptionTimeouts.put(poolName, val);
- }
+ } else if ("schedulingMode".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ poolModes.put(poolName, parseSchedulingMode(text));
+ }
}
} else if ("user".equals(element.getTagName())) {
String userName = element.getAttribute("name");
@@ -294,6 +316,9 @@
String text = ((Text)element.getFirstChild()).getData().trim();
long val = Long.parseLong(text) * 1000L;
defaultMinSharePreemptionTimeout = val;
+ } else if ("defaultPoolSchedulingMode".equals(element.getTagName())) {
+ String text = ((Text)element.getFirstChild()).getData().trim();
+ defaultSchedulingMode = parseSchedulingMode(text);
} else {
LOG.warn("Bad element in allocations file: " + element.getTagName());
}
@@ -312,12 +337,31 @@
this.poolMaxJobsDefault = poolMaxJobsDefault;
this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout;
+ this.defaultSchedulingMode = defaultSchedulingMode;
for (String name: poolNamesInAllocFile) {
- getPool(name);
+ Pool pool = getPool(name);
+ if (poolModes.containsKey(name)) {
+ pool.setSchedulingMode(poolModes.get(name));
+ } else {
+ pool.setSchedulingMode(defaultSchedulingMode);
+ }
}
}
}
+ /**
+ * Parse a scheduling mode name read from the allocations file, ignoring
+ * letter case.
+ *
+ * @param text mode name; "fair" or "fifo" in any letter case
+ * @return SchedulingMode.FAIR or SchedulingMode.FIFO
+ * @throws AllocationConfigurationException if text names neither mode
+ */
+ private SchedulingMode parseSchedulingMode(String text)
+ throws AllocationConfigurationException {
+ // Lowercase with a fixed locale. The no-argument toLowerCase() uses the
+ // JVM default locale, and under Turkish rules "FIFO" and "FAIR" lowercase
+ // to strings containing a dotless i, which would be rejected below.
+ text = text.toLowerCase(java.util.Locale.ENGLISH);
+ if (text.equals("fair")) {
+ return SchedulingMode.FAIR;
+ } else if (text.equals("fifo")) {
+ return SchedulingMode.FIFO;
+ } else {
+ throw new AllocationConfigurationException(
+ "Unknown scheduling mode : " + text + "; expected 'fifo' or 'fair'");
+ }
+ }
+
/**
* Get the allocation for a particular pool
*/
Added: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java?rev=804284&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java Fri Aug 14 16:32:04 2009
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.mapred.FairScheduler.JobInfo;
+import org.apache.hadoop.mapreduce.TaskType;
+/** A {@link Schedulable} covering one task type (map or reduce) of a {@link Pool}; it holds the pool's JobSchedulables of that type. */
+public class PoolSchedulable extends Schedulable {
+ public static final Log LOG = LogFactory.getLog(
+ PoolSchedulable.class.getName());
+
+ private FairScheduler scheduler;
+ private Pool pool;
+ private TaskType taskType;
+ private PoolManager poolMgr;
+ private List<JobSchedulable> jobScheds = new LinkedList<JobSchedulable>(); // re-sorted in place by assignTask()
+ private int demand = 0; // reset and recomputed on every updateDemand() call
+
+ // Variables used for preemption
+ long lastTimeAtMinShare;
+ long lastTimeAtHalfFairShare;
+ /** Stores the arguments, fetches the scheduler's PoolManager, and sets both preemption timestamps to the scheduler clock's current time. */
+ public PoolSchedulable(FairScheduler scheduler, Pool pool, TaskType type) {
+ this.scheduler = scheduler;
+ this.pool = pool;
+ this.taskType = type;
+ this.poolMgr = scheduler.getPoolManager();
+ long currentTime = scheduler.getClock().getTime();
+ this.lastTimeAtMinShare = currentTime;
+ this.lastTimeAtHalfFairShare = currentTime;
+ }
+ /** Appends the job's map or reduce JobSchedulable (matching this pool schedulable's task type) to the list. */
+ public void addJob(JobInProgress job) {
+ JobInfo info = scheduler.getJobInfo(job); // NOTE(review): dereferenced below without a null check; confirm the job always has a JobInfo by now
+ jobScheds.add(taskType == TaskType.MAP ?
+ info.mapSchedulable : info.reduceSchedulable);
+ }
+ /** Removes the first JobSchedulable whose job is the same object as the argument; does nothing if none matches. */
+ public void removeJob(JobInProgress job) {
+ for (Iterator<JobSchedulable> it = jobScheds.iterator(); it.hasNext();) {
+ JobSchedulable jobSched = it.next();
+ if (jobSched.getJob() == job) { // reference comparison, not equals()
+ it.remove();
+ break;
+ }
+ }
+ }
+
+ /**
+ * Recompute demand: each job schedulable refreshes its own demand and the results are summed
+ */
+ @Override
+ public void updateDemand() {
+ demand = 0;
+ for (JobSchedulable sched: jobScheds) {
+ sched.updateDemand();
+ demand += sched.getDemand();
+ }
+ }
+
+ /**
+ * In FAIR mode, distribute the pool's fair share among its jobs; in any other mode, set each job's fair share to 0
+ */
+ @Override
+ public void redistributeShare() {
+ if (pool.getSchedulingMode() == SchedulingMode.FAIR) {
+ SchedulingAlgorithms.computeFairShares(jobScheds, getFairShare());
+ } else {
+ for (JobSchedulable sched: jobScheds) {
+ sched.setFairShare(0);
+ }
+ }
+ }
+ /** Returns the sum left by the most recent updateDemand() call (0 before the first call). */
+ @Override
+ public int getDemand() {
+ return demand;
+ }
+ /** Returns the pool manager's allocation for this pool's name and this task type. */
+ @Override
+ public int getMinShare() {
+ return poolMgr.getAllocation(pool.getName(), taskType);
+ }
+ /** Returns the pool manager's weight for this pool's name. */
+ @Override
+ public double getWeight() {
+ return poolMgr.getPoolWeight(pool.getName());
+ }
+ /** Always JobPriority.NORMAL. */
+ @Override
+ public JobPriority getPriority() {
+ return JobPriority.NORMAL;
+ }
+ /** Returns the sum of getRunningTasks() over the job schedulables in this pool. */
+ @Override
+ public int getRunningTasks() {
+ int ans = 0;
+ for (JobSchedulable sched: jobScheds) {
+ ans += sched.getRunningTasks();
+ }
+ return ans;
+ }
+ /** Always 0. */
+ @Override
+ public long getStartTime() {
+ return 0;
+ }
+ /** Sorts the jobs by the pool's mode (FIFO or FAIR comparator) and returns the first non-null task a job yields, or null if none does. */
+ @Override
+ public Task assignTask(TaskTrackerStatus tts, long currentTime,
+ Collection<JobInProgress> visited) throws IOException {
+ SchedulingMode mode = pool.getSchedulingMode();
+ Comparator<Schedulable> comparator;
+ if (mode == SchedulingMode.FIFO) {
+ comparator = new SchedulingAlgorithms.FifoComparator();
+ } else if (mode == SchedulingMode.FAIR) {
+ comparator = new SchedulingAlgorithms.FairShareComparator();
+ } else {
+ throw new RuntimeException("Unsupported pool scheduling mode " + mode); // also reached when mode is null
+ }
+ Collections.sort(jobScheds, comparator); // sorts the shared list itself, not a copy
+ for (JobSchedulable sched: jobScheds) {
+ Task task = sched.assignTask(tts, currentTime, visited);
+ if (task != null)
+ return task;
+ }
+ return null;
+ }
+ /** Returns the pool's name. */
+ @Override
+ public String getName() {
+ return pool.getName();
+ }
+ /** Returns the pool passed to the constructor. */
+ Pool getPool() {
+ return pool;
+ }
+ /** Returns the task type passed to the constructor. */
+ public TaskType getTaskType() {
+ return taskType;
+ }
+ /** Returns the internal list of job schedulables itself, not a copy. */
+ public Collection<JobSchedulable> getJobSchedulables() {
+ return jobScheds;
+ }
+ /** Returns the lastTimeAtMinShare timestamp. */
+ public long getLastTimeAtMinShare() {
+ return lastTimeAtMinShare;
+ }
+ /** Overwrites the lastTimeAtMinShare timestamp. */
+ public void setLastTimeAtMinShare(long lastTimeAtMinShare) {
+ this.lastTimeAtMinShare = lastTimeAtMinShare;
+ }
+ /** Returns the lastTimeAtHalfFairShare timestamp. */
+ public long getLastTimeAtHalfFairShare() {
+ return lastTimeAtHalfFairShare;
+ }
+ /** Overwrites the lastTimeAtHalfFairShare timestamp. */
+ public void setLastTimeAtHalfFairShare(long lastTimeAtHalfFairShare) {
+ this.lastTimeAtHalfFairShare = lastTimeAtHalfFairShare;
+ }
+}
Added: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java?rev=804284&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java Fri Aug 14 16:32:04 2009
@@ -0,0 +1,131 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * A Schedulable represents an entity that can launch tasks, such as a job
+ * or a pool. It provides a common interface so that algorithms such as fair
+ * sharing can be applied both within a pool and across pools. There are
+ * currently two types of Schedulables: JobSchedulables, which represent a
+ * single job, and PoolSchedulables, which allocate among jobs in their pool.
+ *
+ * Separate sets of Schedulables are used for maps and reduces. Each pool has
+ * both a mapSchedulable and a reduceSchedulable, and so does each job.
+ *
+ * A Schedulable is responsible for three roles:
+ * 1) It can launch tasks through assignTask().
+ * 2) It provides information about the job/pool to the scheduler, including:
+ * - Demand (maximum number of tasks required)
+ * - Number of currently running tasks
+ * - Minimum share (for pools)
+ * - Job/pool weight (for fair sharing)
+ * - Start time and priority (for FIFO)
+ * 3) It can be assigned a fair share, for use with fair scheduling.
+ *
+ * Schedulable also contains two methods for performing scheduling computations:
+ * - updateDemand() is called periodically to compute the demand of the various
+ * jobs and pools, which may be expensive (e.g. jobs must iterate through all
+ * their tasks to count failed tasks, tasks that can be speculated, etc).
+ * - redistributeShare() is called after demands are updated and a Schedulable's
+ * fair share has been set by its parent to let it distribute its share among
+ * the other Schedulables within it (e.g. for pools that want to perform fair
+ * sharing among their jobs).
+ */
+abstract class Schedulable {
+  /** Fair share currently recorded for this Schedulable (0 until one is set). */
+  private double fairShare = 0.0;
+
+  /** Record the fair share assigned to this Schedulable. */
+  public void setFairShare(double fairShare) {
+    this.fairShare = fairShare;
+  }
+
+  /** Return the fair share last recorded for this Schedulable. */
+  public double getFairShare() {
+    return this.fairShare;
+  }
+
+  /**
+   * Name of the job or pool. Besides debugging output, the name is used to
+   * break ties deterministically when Schedulables are ordered.
+   */
+  public abstract String getName();
+
+  /**
+   * Maximum number of tasks this Schedulable requires, defined as the number
+   * of tasks it is currently running plus its unlaunched tasks (tasks that
+   * are either not yet launched or need to be speculated).
+   */
+  public abstract int getDemand();
+
+  /** Number of tasks this Schedulable is running right now. */
+  public abstract int getRunningTasks();
+
+  /** Number of slots in this Schedulable's minimum share. */
+  public abstract int getMinShare();
+
+  /** Weight of the job or pool for fair sharing. */
+  public abstract double getWeight();
+
+  /** Priority of a job in a FIFO pool; has no meaning for PoolSchedulables. */
+  public abstract JobPriority getPriority();
+
+  /** Start time of a job in a FIFO pool; has no meaning for PoolSchedulables. */
+  public abstract long getStartTime();
+
+  /** Recompute this Schedulable's demand and that of any children it has. */
+  public abstract void updateDemand();
+
+  /**
+   * Split the fair share assigned to this Schedulable among its children.
+   * Used by pools whose internal scheduler is fair sharing.
+   */
+  public abstract void redistributeShare();
+
+  /**
+   * Obtain a task to run on the given TaskTracker. Returns null when this
+   * Schedulable has nothing to launch at the moment, or does not wish to
+   * launch on this TaskTracker (e.g. it is waiting for a TaskTracker with
+   * local data). If a job is skipped during the search because it is waiting
+   * for a TaskTracker with local data, the implementation is expected to add
+   * it to the <tt>visited</tt> collection so that the scheduler can properly
+   * mark it as skipped during this heartbeat. See
+   * {@link FairScheduler#getAllowedLocalityLevel(JobInProgress, long)}
+   * for details of delay scheduling (waiting for trackers with local data).
+   *
+   * @param tts TaskTracker that the task will be launched on
+   * @param currentTime Cached time (to prevent excessive calls to gettimeofday)
+   * @param visited A Collection to which this method must add all jobs that
+   *        were considered during the search for a job to assign.
+   * @return Task to launch, or null if Schedulable cannot currently launch one.
+   * @throws IOException Possible if obtainNew(Map|Reduce)Task throws exception.
+   */
+  public abstract Task assignTask(TaskTrackerStatus tts, long currentTime,
+      Collection<JobInProgress> visited) throws IOException;
+
+  /**
+   * Debugging representation listing name, demand, running tasks, fair share
+   * and weight.
+   */
+  @Override
+  public String toString() {
+    final String layout = "[%s, demand=%d, running=%d, share=%.1f, w=%.1f]";
+    return String.format(layout,
+        getName(), getDemand(), getRunningTasks(), this.fairShare, getWeight());
+  }
+}
Added: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingAlgorithms.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingAlgorithms.java?rev=804284&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingAlgorithms.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingAlgorithms.java Fri Aug 14 16:32:04 2009
@@ -0,0 +1,209 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.Collection;
+import java.util.Comparator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Utility class containing scheduling algorithms used in the fair scheduler.
+ */
+class SchedulingAlgorithms {
+  public static final Log LOG = LogFactory.getLog(
+      SchedulingAlgorithms.class.getName());
+
+  /**
+   * Break a tie between two Schedulables by start time and then by name, so
+   * that the resulting order is deterministic. Start times are compared
+   * directly rather than by the sign of their difference, because the
+   * subtraction of two longs can overflow.
+   */
+  private static int compareByStartTimeThenName(Schedulable s1,
+      Schedulable s2) {
+    long start1 = s1.getStartTime();
+    long start2 = s2.getStartTime();
+    if (start1 != start2) {
+      return start1 < start2 ? -1 : 1;
+    }
+    return s1.getName().compareTo(s2.getName());
+  }
+
+  /**
+   * Compare Schedulables in order of priority and then submission time, as in
+   * the default FIFO scheduler in Hadoop.
+   */
+  public static class FifoComparator implements Comparator<Schedulable> {
+    @Override
+    public int compare(Schedulable s1, Schedulable s2) {
+      int res = s1.getPriority().compareTo(s2.getPriority());
+      if (res == 0) {
+        // Same priority: earlier start time first. In the rare case where
+        // jobs were submitted at the exact same time, the name (which will be
+        // the JobID) gives a deterministic ordering, so we don't alternately
+        // launch tasks from different jobs.
+        res = compareByStartTimeThenName(s1, s2);
+      }
+      return res;
+    }
+  }
+
+  /**
+   * Compare Schedulables via weighted fair sharing. In addition, Schedulables
+   * below their min share get priority over those whose min share is met.
+   *
+   * Schedulables below their min share are compared by how far below it they
+   * are as a ratio. For example, if job A has 8 out of a min share of 10 tasks
+   * and job B has 50 out of a min share of 100, then job B is scheduled next,
+   * because B is at 50% of its min share and A is at 80% of its min share.
+   *
+   * Schedulables above their min share are compared by (runningTasks / weight).
+   * If all weights are equal, slots are given to the job with the fewest tasks;
+   * otherwise, jobs with more weight get proportionally more slots.
+   *
+   * Ratios are compared with {@link Double#compare(double, double)}, which
+   * orders NaN and infinite ratios (possible when a weight is 0) consistently
+   * after all finite ones instead of treating them as ties with everything.
+   */
+  public static class FairShareComparator implements Comparator<Schedulable> {
+    @Override
+    public int compare(Schedulable s1, Schedulable s2) {
+      double minShareRatio1, minShareRatio2;
+      double tasksToWeightRatio1, tasksToWeightRatio2;
+      // A min share can never exceed what the Schedulable actually demands.
+      int minShare1 = Math.min(s1.getMinShare(), s1.getDemand());
+      int minShare2 = Math.min(s2.getMinShare(), s2.getDemand());
+      boolean s1Needy = s1.getRunningTasks() < minShare1;
+      boolean s2Needy = s2.getRunningTasks() < minShare2;
+      // Math.max(..., 1.0) avoids dividing by a zero min share.
+      minShareRatio1 = s1.getRunningTasks() / Math.max(minShare1, 1.0);
+      minShareRatio2 = s2.getRunningTasks() / Math.max(minShare2, 1.0);
+      tasksToWeightRatio1 = s1.getRunningTasks() / s1.getWeight();
+      tasksToWeightRatio2 = s2.getRunningTasks() / s2.getWeight();
+      int res;
+      if (s1Needy && !s2Needy) {
+        res = -1;
+      } else if (s2Needy && !s1Needy) {
+        res = 1;
+      } else if (s1Needy && s2Needy) {
+        res = Double.compare(minShareRatio1, minShareRatio2);
+      } else {
+        // Neither schedulable is needy
+        res = Double.compare(tasksToWeightRatio1, tasksToWeightRatio2);
+      }
+      if (res == 0) {
+        // Jobs are tied in fairness ratio. Break the tie by submit time and job
+        // name to get a deterministic ordering, which is useful for unit tests.
+        res = compareByStartTimeThenName(s1, s2);
+      }
+      return res;
+    }
+  }
+
+  /**
+   * Number of iterations for the binary search in computeFairShares. This is
+   * equivalent to the number of bits of precision in the output. 25 iterations
+   * gives precision better than 0.1 slots in clusters with one million slots.
+   */
+  private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25;
+
+  /**
+   * Bound on the doubling search for the upper limit of R in
+   * computeFairShares. Once rMax reaches this value it is not doubled again,
+   * so it always stays finite even when the target number of slots can never
+   * be reached (for example when every weight is 0).
+   */
+  private static final double MAX_WEIGHT_TO_SLOT_RATIO =
+      Double.MAX_VALUE / 2.0;
+
+  /**
+   * Given a set of Schedulables and a number of slots, compute their weighted
+   * fair shares. The min shares and demands of the Schedulables are assumed to
+   * be set beforehand. We compute the fairest possible allocation of shares
+   * to the Schedulables that respects their min shares and demands.
+   *
+   * To understand what this method does, we must first define what weighted
+   * fair sharing means in the presence of minimum shares and demands. If there
+   * were no minimum shares and every Schedulable had an infinite demand (i.e.
+   * could launch infinitely many tasks), then weighted fair sharing would be
+   * achieved if the ratio of slotsAssigned / weight was equal for each
+   * Schedulable and all slots were assigned. Minimum shares and demands add
+   * two further twists:
+   * - Some Schedulables may not have enough tasks to fill all their share.
+   * - Some Schedulables may have a min share higher than their assigned share.
+   *
+   * To deal with these possibilities, we define an assignment of slots as
+   * being fair if there exists a ratio R such that:
+   * - Schedulables S where S.demand < R * S.weight are assigned share S.demand
+   * - Schedulables S where S.minShare > R * S.weight are given share S.minShare
+   * - All other Schedulables S are assigned share R * S.weight
+   * - The sum of all the shares is totalSlots.
+   *
+   * We call R the weight-to-slots ratio because it converts a Schedulable's
+   * weight to the number of slots it is assigned.
+   *
+   * We compute a fair allocation by finding a suitable weight-to-slot ratio R.
+   * To do this, we use binary search. Given a ratio R, we compute the number
+   * of slots that would be used in total with this ratio (the sum of the shares
+   * computed using the conditions above). If this number of slots is less than
+   * totalSlots, then R is too small and more slots could be assigned. If the
+   * number of slots is more than totalSlots, then R is too large.
+   *
+   * We begin the binary search with a lower bound on R of 0 (which means that
+   * all Schedulables are only given their minShare) and an upper bound computed
+   * to be large enough that too many slots are given (by doubling R until we
+   * either use more than totalSlots slots or we fulfill all jobs' demands).
+   * The doubling stops at a large finite bound, so that inputs for which the
+   * target can never be reached (e.g. all weights are 0) yield finite shares
+   * rather than NaN.
+   * The helper method slotsUsedWithWeightToSlotRatio computes the total number
+   * of slots used with a given value of R.
+   *
+   * The running time of this algorithm is linear in the number of Schedulables,
+   * because slotsUsedWithWeightToSlotRatio is linear-time and the number of
+   * iterations of binary search is a constant (dependent on desired precision).
+   *
+   * @param schedulables Schedulables whose fair shares are to be set; each
+   *        one receives its result through setFairShare
+   * @param totalSlots number of slots to divide among the Schedulables
+   */
+  public static void computeFairShares(
+      Collection<? extends Schedulable> schedulables, double totalSlots) {
+    // Find an upper bound on R that we can use in our binary search. We start
+    // at R = 1 and double it until we have either used totalSlots slots or we
+    // have met all Schedulables' demands (if total demand < totalSlots).
+    double totalDemand = 0;
+    for (Schedulable sched: schedulables) {
+      totalDemand += sched.getDemand();
+    }
+    double cap = Math.min(totalDemand, totalSlots);
+    double rMax = 1.0;
+    // The explicit bound keeps rMax finite: without it, an unreachable cap
+    // drives rMax to infinity and 0 * infinity turns the shares into NaN.
+    while (rMax < MAX_WEIGHT_TO_SLOT_RATIO
+        && slotsUsedWithWeightToSlotRatio(rMax, schedulables) < cap) {
+      rMax *= 2.0;
+    }
+    // Perform the binary search for up to COMPUTE_FAIR_SHARES_ITERATIONS steps
+    double left = 0;
+    double right = rMax;
+    for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
+      double mid = (left + right) / 2.0;
+      if (slotsUsedWithWeightToSlotRatio(mid, schedulables) < cap) {
+        left = mid;
+      } else {
+        right = mid;
+      }
+    }
+    // Set the fair shares based on the value of R we've converged to
+    for (Schedulable sched: schedulables) {
+      sched.setFairShare(computeShare(sched, right));
+    }
+  }
+
+  /**
+   * Compute the number of slots that would be used given a weight-to-slot
+   * ratio w2sRatio, for use in the computeFairShares algorithm as described
+   * in {@link SchedulingAlgorithms#computeFairShares(Collection, double)}.
+   */
+  private static double slotsUsedWithWeightToSlotRatio(double w2sRatio,
+      Collection<? extends Schedulable> schedulables) {
+    double slotsTaken = 0;
+    for (Schedulable sched: schedulables) {
+      double share = computeShare(sched, w2sRatio);
+      slotsTaken += share;
+    }
+    return slotsTaken;
+  }
+
+  /**
+   * Compute the number of slots assigned to a Schedulable given a particular
+   * weight-to-slot ratio w2sRatio, for use in computeFairShares as described
+   * in {@link SchedulingAlgorithms#computeFairShares(Collection, double)}.
+   * The weighted share is raised to the min share if it is below it, and
+   * then capped at the demand.
+   */
+  private static double computeShare(Schedulable sched, double w2sRatio) {
+    double share = sched.getWeight() * w2sRatio;
+    share = Math.max(share, sched.getMinShare());
+    share = Math.min(share, sched.getDemand());
+    return share;
+  }
+}
Added: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingMode.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingMode.java?rev=804284&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingMode.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/SchedulingMode.java Fri Aug 14 16:32:04 2009
@@ -0,0 +1,26 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+/**
+ * Internal scheduling modes for pools.
+ */
+public enum SchedulingMode {
+  /** Fair sharing among the jobs of a pool. */
+  FAIR,
+  /** First-in, first-out ordering among the jobs of a pool. */
+  FIFO
+}
Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskSelector.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskSelector.java?rev=804284&r1=804283&r2=804284&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskSelector.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskSelector.java Fri Aug 14 16:32:04 2009
@@ -22,6 +22,7 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
/**
* A pluggable object for selecting tasks to run from a {@link JobInProgress} on
@@ -86,7 +87,7 @@
* @throws IOException
*/
public abstract Task obtainNewMapTask(TaskTrackerStatus taskTracker,
- JobInProgress job) throws IOException;
+ JobInProgress job, int localityLevel) throws IOException;
/**
* Choose a reduce task to run from the given job on the given TaskTracker.
Added: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java?rev=804284&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java Fri Aug 14 16:32:04 2009
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.Collection;
+
+/**
+ * Dummy implementation of Schedulable for unit testing.
+ */
+public class FakeSchedulable extends Schedulable {
+  private int demand;
+  private int runningTasks;
+  private int minShare;
+  private double weight;
+  private JobPriority priority;
+  private long startTime;
+
+  /** Schedulable with zero demand and otherwise default settings. */
+  public FakeSchedulable() {
+    this(0);
+  }
+
+  /** Schedulable with the given demand and no min share. */
+  public FakeSchedulable(int demand) {
+    this(demand, 0);
+  }
+
+  /** Schedulable with the given demand and min share, and a weight of 1. */
+  public FakeSchedulable(int demand, int minShare) {
+    this(demand, minShare, 1);
+  }
+
+  /**
+   * Schedulable with the given demand, min share and weight. Fair share,
+   * running tasks and start time are 0 and the priority is NORMAL.
+   */
+  public FakeSchedulable(int demand, int minShare, double weight) {
+    this(demand, minShare, weight, 0, 0, JobPriority.NORMAL, 0);
+  }
+
+  /** Fully specified constructor; every other constructor ends up here. */
+  public FakeSchedulable(int demand, int minShare, double weight, int fairShare,
+      int runningTasks, JobPriority priority, long startTime) {
+    this.demand = demand;
+    this.runningTasks = runningTasks;
+    this.minShare = minShare;
+    this.weight = weight;
+    this.priority = priority;
+    this.startTime = startTime;
+    setFairShare(fairShare);
+  }
+
+  /** Name built from a fixed prefix followed by this object's hash code. */
+  @Override
+  public String getName() {
+    return "FakeSchedulable" + hashCode();
+  }
+
+  @Override
+  public int getDemand() {
+    return this.demand;
+  }
+
+  @Override
+  public int getRunningTasks() {
+    return this.runningTasks;
+  }
+
+  @Override
+  public int getMinShare() {
+    return this.minShare;
+  }
+
+  @Override
+  public double getWeight() {
+    return this.weight;
+  }
+
+  @Override
+  public JobPriority getPriority() {
+    return this.priority;
+  }
+
+  @Override
+  public long getStartTime() {
+    return this.startTime;
+  }
+
+  /** No-op: the demand is fixed at construction time. */
+  @Override
+  public void updateDemand() {}
+
+  /** No-op: this fake has no children to pass its share on to. */
+  @Override
+  public void redistributeShare() {}
+
+  /** Never launches anything; always returns null. */
+  @Override
+  public Task assignTask(TaskTrackerStatus tts, long currentTime,
+      Collection<JobInProgress> visited) throws IOException {
+    return null;
+  }
+}
Added: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestComputeFairShares.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestComputeFairShares.java?rev=804284&view=auto
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestComputeFairShares.java (added)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestComputeFairShares.java Fri Aug 14 16:32:04 2009
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import junit.framework.TestCase;
+
+/**
+ * Exercise the computeFairShares method in SchedulingAlgorithms.
+ */
+public class TestComputeFairShares extends TestCase {
+  /** Schedulables under test; recreated empty before every test method. */
+  private List<Schedulable> scheds;
+
+  @Override
+  protected void setUp() throws Exception {
+    scheds = new ArrayList<Schedulable>();
+  }
+
+  /**
+   * Basic test - pools with different demands that are all higher than their
+   * fair share (of 10 slots) should each get their fair share.
+   */
+  public void testEqualSharing() {
+    scheds.add(new FakeSchedulable(100));
+    scheds.add(new FakeSchedulable(50));
+    scheds.add(new FakeSchedulable(30));
+    scheds.add(new FakeSchedulable(20));
+    SchedulingAlgorithms.computeFairShares(scheds, 40);
+    verifyShares(10, 10, 10, 10);
+  }
+
+  /**
+   * In this test, pool 4 has a smaller demand than the 40 / 4 = 10 slots that
+   * it would be assigned with equal sharing. It should only get the 3 slots
+   * it demands. The other pools must then split the remaining 37 slots, but
+   * pool 3, with 11 slots demanded, is now below its share of 37/3 ~= 12.3,
+   * so it only gets 11 slots. Pools 1 and 2 split the rest and get 13 each.
+   */
+  public void testLowDemands() {
+    scheds.add(new FakeSchedulable(100));
+    scheds.add(new FakeSchedulable(50));
+    scheds.add(new FakeSchedulable(11));
+    scheds.add(new FakeSchedulable(3));
+    SchedulingAlgorithms.computeFairShares(scheds, 40);
+    verifyShares(13, 13, 11, 3);
+  }
+
+  /**
+   * In this test, some pools have minimum shares set. Pool 1 has a min share
+   * of 20 so it gets 20 slots. Pool 2 also has a min share of 20, but its
+   * demand is only 10 so it can only get 10 slots. The remaining pools have
+   * 10 slots to split between them. Pool 4 gets 3 slots because its demand is
+   * only 3, and pool 3 gets the remaining 7 slots. Pool 4 also had a min share
+   * of 2 slots but this should not affect the outcome.
+   */
+  public void testMinShares() {
+    scheds.add(new FakeSchedulable(100, 20));
+    scheds.add(new FakeSchedulable(10, 20));
+    scheds.add(new FakeSchedulable(10, 0));
+    scheds.add(new FakeSchedulable(3, 2));
+    SchedulingAlgorithms.computeFairShares(scheds, 40);
+    verifyShares(20, 10, 7, 3);
+  }
+
+  /**
+   * Basic test for weighted shares with no minimum shares and no low demands.
+   * Each pool should get slots in proportion to its weight.
+   */
+  public void testWeightedSharing() {
+    scheds.add(new FakeSchedulable(100, 0, 2.0));
+    scheds.add(new FakeSchedulable(50, 0, 1.0));
+    scheds.add(new FakeSchedulable(30, 0, 1.0));
+    scheds.add(new FakeSchedulable(20, 0, 0.5));
+    SchedulingAlgorithms.computeFairShares(scheds, 45);
+    verifyShares(20, 10, 10, 5);
+  }
+
+  /**
+   * Weighted sharing test where pools 1 and 2 are now given lower demands than
+   * above. Pool 1 stops at 10 slots, leaving 35. If the remaining pools split
+   * this into a 1:1:0.5 ratio, they would get 14:14:7 slots respectively, but
+   * pool 2's demand is only 11, so it only gets 11. The remaining 2 pools split
+   * the 24 slots left into a 1:0.5 ratio, getting 16 and 8 slots respectively.
+   */
+  public void testWeightedSharingWithLowDemands() {
+    scheds.add(new FakeSchedulable(10, 0, 2.0));
+    scheds.add(new FakeSchedulable(11, 0, 1.0));
+    scheds.add(new FakeSchedulable(30, 0, 1.0));
+    scheds.add(new FakeSchedulable(20, 0, 0.5));
+    SchedulingAlgorithms.computeFairShares(scheds, 45);
+    verifyShares(10, 11, 16, 8);
+  }
+
+  /**
+   * Weighted fair sharing test with min shares. As in the min share test above,
+   * pool 1 has a min share greater than its demand so it only gets its demand.
+   * Pool 4 has a min share of 15 even though its weight is very small, so it
+   * gets 15 slots. The remaining pools share the remaining 20 slots equally,
+   * getting 10 each. Pool 3's min share of 5 slots doesn't affect this.
+   */
+  public void testWeightedSharingWithMinShares() {
+    scheds.add(new FakeSchedulable(10, 20, 2.0));
+    scheds.add(new FakeSchedulable(11, 0, 1.0));
+    scheds.add(new FakeSchedulable(30, 5, 1.0));
+    scheds.add(new FakeSchedulable(20, 15, 0.5));
+    SchedulingAlgorithms.computeFairShares(scheds, 45);
+    verifyShares(10, 10, 10, 15);
+  }
+
+  /**
+   * Test that shares are computed accurately even when there are more pools
+   * than available slots, so that every share is a fraction of a slot.
+   */
+  public void testSmallShares() {
+    scheds.add(new FakeSchedulable(10));
+    scheds.add(new FakeSchedulable(5));
+    scheds.add(new FakeSchedulable(3));
+    scheds.add(new FakeSchedulable(2));
+    SchedulingAlgorithms.computeFairShares(scheds, 1);
+    verifyShares(0.25, 0.25, 0.25, 0.25);
+  }
+
+  /**
+   * Test that shares are computed accurately even when the number of slots is
+   * very large.
+   */
+  public void testLargeShares() {
+    int million = 1000 * 1000;
+    scheds.add(new FakeSchedulable(100 * million));
+    scheds.add(new FakeSchedulable(50 * million));
+    scheds.add(new FakeSchedulable(30 * million));
+    scheds.add(new FakeSchedulable(20 * million));
+    SchedulingAlgorithms.computeFairShares(scheds, 40 * million);
+    verifyShares(10 * million, 10 * million, 10 * million, 10 * million);
+  }
+
+  /**
+   * Test that having a pool with 0 demand doesn't confuse the algorithm.
+   */
+  public void testZeroDemand() {
+    scheds.add(new FakeSchedulable(100));
+    scheds.add(new FakeSchedulable(50));
+    scheds.add(new FakeSchedulable(30));
+    scheds.add(new FakeSchedulable(0));
+    SchedulingAlgorithms.computeFairShares(scheds, 30);
+    verifyShares(10, 10, 10, 0);
+  }
+
+  /**
+   * Test that being called on an empty list doesn't confuse the algorithm.
+   */
+  public void testEmptyList() {
+    SchedulingAlgorithms.computeFairShares(scheds, 40);
+    verifyShares();
+  }
+
+  /**
+   * Check that a given list of shares have been assigned to this.scheds.
+   * The expected values come first in each assertion so that JUnit's failure
+   * messages report expected and actual the right way round.
+   *
+   * @param shares expected fair shares, in the order the Schedulables were
+   *        added; compared with a tolerance of 0.01
+   */
+  private void verifyShares(double... shares) {
+    assertEquals("number of schedulables", shares.length, scheds.size());
+    for (int i = 0; i < shares.length; i++) {
+      assertEquals("fair share of schedulable " + i,
+          shares[i], scheds.get(i).getFairShare(), 0.01);
+    }
+  }
+}