You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2009/12/18 04:28:07 UTC
svn commit: r892117 - in /hadoop/mapreduce/trunk: ./
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/
src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/
src/docs/src/documentation/content/xdocs/
Author: matei
Date: Fri Dec 18 03:28:04 2009
New Revision: 892117
URL: http://svn.apache.org/viewvc?rev=892117&view=rev
Log:
MAPREDUCE-698. Per-pool task limits for the fair scheduler. Contributed by Kevin Peterson.
Modified:
hadoop/mapreduce/trunk/CHANGES.txt
hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java
hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java
hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/fair_scheduler.xml
Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=892117&r1=892116&r2=892117&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Dec 18 03:28:04 2009
@@ -6,6 +6,9 @@
NEW FEATURES
+ MAPREDUCE-698. Per-pool task limits for the fair scheduler.
+ (Kevin Peterson via matei)
+
MAPREDUCE-1017. Compression and output splitting for Sqoop.
(Aaron Kimball via tomwhite)
Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java?rev=892117&r1=892116&r2=892117&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java Fri Dec 18 03:28:04 2009
@@ -151,16 +151,17 @@
*/
private void showPools(PrintWriter out, boolean advancedView) {
synchronized(scheduler) {
+ boolean warnInverted = false;
PoolManager poolManager = scheduler.getPoolManager();
out.print("<h2>Pools</h2>\n");
out.print("<table border=\"2\" cellpadding=\"5\" cellspacing=\"2\">\n");
out.print("<tr><th rowspan=2>Pool</th>" +
"<th rowspan=2>Running Jobs</th>" +
- "<th colspan=3>Map Tasks</th>" +
- "<th colspan=3>Reduce Tasks</th>" +
+ "<th colspan=4>Map Tasks</th>" +
+ "<th colspan=4>Reduce Tasks</th>" +
"<th rowspan=2>Scheduling Mode</th></tr>\n<tr>" +
- "<th>Min Share</th><th>Running</th><th>Fair Share</th>" +
- "<th>Min Share</th><th>Running</th><th>Fair Share</th></tr>\n");
+ "<th>Min Share</th><th>Max Share</th><th>Running</th><th>Fair Share</th>" +
+ "<th>Min Share</th><th>Max Share</th><th>Running</th><th>Fair Share</th></tr>\n");
List<Pool> pools = new ArrayList<Pool>(poolManager.getPools());
Collections.sort(pools, new Comparator<Pool>() {
public int compare(Pool p1, Pool p2) {
@@ -174,21 +175,51 @@
String name = pool.getName();
int runningMaps = pool.getMapSchedulable().getRunningTasks();
int runningReduces = pool.getReduceSchedulable().getRunningTasks();
+ int maxMaps = poolManager.getMaxSlots(name, TaskType.MAP);
+ int maxReduces = poolManager.getMaxSlots(name, TaskType.REDUCE);
+ boolean invertedMaps = poolManager.invertedMinMax(TaskType.MAP, name);
+ boolean invertedReduces = poolManager.invertedMinMax(TaskType.REDUCE, name);
+ warnInverted = warnInverted || invertedMaps || invertedReduces;
out.print("<tr>");
out.printf("<td>%s</td>", name);
out.printf("<td>%d</td>", pool.getJobs().size());
+ // Map Tasks
out.printf("<td>%d</td>", poolManager.getAllocation(name,
TaskType.MAP));
+ out.print("<td>");
+ if(maxMaps == Integer.MAX_VALUE) {
+ out.print("-");
+ } else {
+ out.print(maxMaps);
+ }
+ if(invertedMaps) {
+ out.print("*");
+ }
+ out.print("</td>");
out.printf("<td>%d</td>", runningMaps);
out.printf("<td>%.1f</td>", pool.getMapSchedulable().getFairShare());
+ // Reduce Tasks
out.printf("<td>%d</td>", poolManager.getAllocation(name,
TaskType.REDUCE));
+ out.print("<td>");
+ if(maxReduces == Integer.MAX_VALUE) {
+ out.print("-");
+ } else {
+ out.print(maxReduces);
+ }
+ if(invertedReduces) {
+ out.print("*");
+ }
+ out.print("</td>");
out.printf("<td>%d</td>", runningReduces);
out.printf("<td>%.1f</td>", pool.getReduceSchedulable().getFairShare());
out.printf("<td>%s</td>", pool.getSchedulingMode());
out.print("</tr>\n");
}
out.print("</table>\n");
+ if(warnInverted) {
+ out.print("<p>* One or more pools have max share set lower than min share. Max share will be used and minimum will be treated as if set equal to max.</p>");
+ }
}
}
Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java?rev=892117&r1=892116&r2=892117&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java Fri Dec 18 03:28:04 2009
@@ -69,6 +69,10 @@
private Map<String, Integer> mapAllocs = new HashMap<String, Integer>();
private Map<String, Integer> reduceAllocs = new HashMap<String, Integer>();
+ // If set, cap number of map and reduce tasks in a pool
+ private Map<String, Integer> poolMaxMaps = new HashMap<String, Integer>();
+ private Map<String, Integer> poolMaxReduces = new HashMap<String, Integer>();
+
// Sharing weights for each pool
private Map<String, Double> poolWeights = new HashMap<String, Double>();
@@ -221,6 +225,8 @@
Map<String, Integer> reduceAllocs = new HashMap<String, Integer>();
Map<String, Integer> poolMaxJobs = new HashMap<String, Integer>();
Map<String, Integer> userMaxJobs = new HashMap<String, Integer>();
+ Map<String, Integer> poolMaxMaps = new HashMap<String, Integer>();
+ Map<String, Integer> poolMaxReduces = new HashMap<String, Integer>();
Map<String, Double> poolWeights = new HashMap<String, Double>();
Map<String, SchedulingMode> poolModes = new HashMap<String, SchedulingMode>();
Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
@@ -271,6 +277,14 @@
String text = ((Text)field.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
reduceAllocs.put(poolName, val);
+ } else if ("maxMaps".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ int val = Integer.parseInt(text);
+ poolMaxMaps.put(poolName, val);
+ } else if ("maxReduces".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ int val = Integer.parseInt(text);
+ poolMaxReduces.put(poolName, val);
} else if ("maxRunningJobs".equals(field.getTagName())) {
String text = ((Text)field.getFirstChild()).getData().trim();
int val = Integer.parseInt(text);
@@ -288,6 +302,16 @@
poolModes.put(poolName, parseSchedulingMode(text));
}
}
+ if (poolMaxMaps.containsKey(poolName) && mapAllocs.containsKey(poolName)
+ && poolMaxMaps.get(poolName) < mapAllocs.get(poolName)) {
+ LOG.warn(String.format("Pool %s has max maps %d less than min maps %d",
+ poolName, poolMaxMaps.get(poolName), mapAllocs.get(poolName)));
+ }
+ if(poolMaxReduces.containsKey(poolName) && reduceAllocs.containsKey(poolName)
+ && poolMaxReduces.get(poolName) < reduceAllocs.get(poolName)) {
+ LOG.warn(String.format("Pool %s has max reduces %d less than min reduces %d",
+ poolName, poolMaxReduces.get(poolName), reduceAllocs.get(poolName)));
+ }
} else if ("user".equals(element.getTagName())) {
String userName = element.getAttribute("name");
NodeList fields = element.getChildNodes();
@@ -331,6 +355,8 @@
synchronized(this) {
this.mapAllocs = mapAllocs;
this.reduceAllocs = reduceAllocs;
+ this.poolMaxMaps = poolMaxMaps;
+ this.poolMaxReduces = poolMaxReduces;
this.poolMaxJobs = poolMaxJobs;
this.userMaxJobs = userMaxJobs;
this.poolWeights = poolWeights;
@@ -351,6 +377,25 @@
}
}
+ /**
+ * Does the pool have incompatible max and min allocation set.
+ *
+ * @param type
+ * {@link TaskType#MAP} or {@link TaskType#REDUCE}
+ * @param pool
+ * the pool name
+ * @return true if the max is less than the min
+ */
+ boolean invertedMinMax(TaskType type, String pool) {
+ Map<String, Integer> max = TaskType.MAP == type ? poolMaxMaps : poolMaxReduces;
+ Map<String, Integer> min = TaskType.MAP == type ? mapAllocs : reduceAllocs;
+ if (max.containsKey(pool) && min.containsKey(pool)
+ && max.get(pool) < min.get(pool)) {
+ return true;
+ }
+ return false;
+ }
+
private SchedulingMode parseSchedulingMode(String text)
throws AllocationConfigurationException {
text = text.toLowerCase();
@@ -373,7 +418,20 @@
Integer alloc = allocationMap.get(pool);
return (alloc == null ? 0 : alloc);
}
-
+
+ /**
+ * Get the maximum map or reduce slots for the given pool.
+ * @return the cap set on this pool, or Integer.MAX_VALUE if not set.
+ */
+ int getMaxSlots(String poolName, TaskType taskType) {
+ Map<String, Integer> maxMap = (taskType == TaskType.MAP ? poolMaxMaps : poolMaxReduces);
+ if (maxMap.containsKey(poolName)) {
+ return maxMap.get(poolName);
+ } else {
+ return Integer.MAX_VALUE;
+ }
+ }
+
/**
* Add a job in the appropriate pool
*/
Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java?rev=892117&r1=892116&r2=892117&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java Fri Dec 18 03:28:04 2009
@@ -82,6 +82,11 @@
sched.updateDemand();
demand += sched.getDemand();
}
+ // if demand exceeds the cap for this pool, limit to the max
+ int maxTasks = poolMgr.getMaxSlots(pool.getName(), taskType);
+ if(demand > maxTasks) {
+ demand = maxTasks;
+ }
}
/**
@@ -135,6 +140,10 @@
@Override
public Task assignTask(TaskTrackerStatus tts, long currentTime,
Collection<JobInProgress> visited) throws IOException {
+ int runningTasks = getRunningTasks();
+ if (runningTasks >= poolMgr.getMaxSlots(pool.getName(), taskType)) {
+ return null;
+ }
SchedulingMode mode = pool.getSchedulingMode();
Comparator<Schedulable> comparator;
if (mode == SchedulingMode.FIFO) {
Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=892117&r1=892116&r2=892117&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Fri Dec 18 03:28:04 2009
@@ -1640,6 +1640,45 @@
assertEquals(1.33, info3.reduceSchedulable.getFairShare(), 0.01);
}
+ public void testPoolMaxMapsReduces() throws Exception {
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<allocations>");
+ // Pool with upper bound
+ out.println("<pool name=\"poolLimited\">");
+ out.println("<weight>1.0</weight>");
+ out.println("<maxMaps>2</maxMaps>");
+ out.println("<maxReduces>1</maxReduces>");
+ out.println("</pool>");
+ out.println("</allocations>");
+ out.close();
+ scheduler.getPoolManager().reloadAllocs();
+ // Create two jobs with ten maps
+ JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 5, "poolLimited");
+ advanceTime(10);
+ JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 5);
+ checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
+ checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+ checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+ checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
+ checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
+ checkAssignment("tt2", "attempt_test_0002_r_000002_0 on tt2");
+
+ Pool limited = scheduler.getPoolManager().getPool("poolLimited");
+ assertEquals(2, limited.getSchedulable(TaskType.MAP).getRunningTasks());
+ assertEquals(1, limited.getSchedulable(TaskType.REDUCE).getRunningTasks());
+ Pool defaultPool = scheduler.getPoolManager().getPool("default");
+ assertEquals(2, defaultPool.getSchedulable(TaskType.MAP).getRunningTasks());
+ assertEquals(3, defaultPool.getSchedulable(TaskType.REDUCE)
+ .getRunningTasks());
+ assertEquals(2, job1.runningMapTasks);
+ assertEquals(1, job1.runningReduceTasks);
+ assertEquals(2, job2.runningMapTasks);
+ assertEquals(3, job2.runningReduceTasks);
+ }
+
/**
* Tests that max-running-tasks per node are set by assigning load
* equally accross the cluster in CapBasedLoadManager.
Modified: hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/fair_scheduler.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/fair_scheduler.xml?rev=892117&r1=892116&r2=892117&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/fair_scheduler.xml (original)
+++ hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/fair_scheduler.xml Fri Dec 18 03:28:04 2009
@@ -79,7 +79,7 @@
jobs tolerate losing tasks; it only makes them take longer to finish.
</p>
<p>
- Finally, the Fair Scheduler can limit the number of concurrent
+ The Fair Scheduler can limit the number of concurrent
running jobs per user and per pool. This can be useful when a
user must submit hundreds of jobs at once, or for ensuring that
intermediate data does not fill up disk space on a cluster when too many
@@ -89,6 +89,13 @@
Jobs to run from each user/pool are chosen in order of priority and then
submit time.
</p>
+ <p>
+ Finally, the Fair Scheduler can limit the number of concurrent
+ running tasks per pool. This can be useful when jobs have a
+ dependency on an external service like a database or web
+ service that could be overloaded if too many map or reduce
+ tasks are run at once.
+ </p>
</section>
<section>
@@ -351,6 +358,8 @@
<ul>
<li><em>minMaps</em> and <em>minReduces</em>,
to set the pool's minimum share of task slots.</li>
+ <li><em>maxMaps</em> and <em>maxReduces</em>, to set the
+ pool's maximum concurrent task slots.</li>
<li><em>schedulingMode</em>, the pool's internal scheduling mode,
which can be <em>fair</em> for fair sharing or <em>fifo</em> for
first-in-first-out.</li>
@@ -398,6 +407,8 @@
<pool name="sample_pool">
<minMaps>5</minMaps>
<minReduces>5</minReduces>
+ <maxMaps>25</maxMaps>
+ <maxReduces>25</maxReduces>
<minSharePreemptionTimeout>300</minSharePreemptionTimeout>
</pool>
<user name="sample_user">
@@ -412,7 +423,9 @@
slots and 5 reduce slots. The pool also has a minimum share preemption
timeout of 300 seconds (5 minutes), meaning that if it does not get its
guaranteed share within this time, it is allowed to kill tasks from
- other pools to achieve its share.
+ other pools to achieve its share. The pool has a cap of 25 map and 25
+ reduce slots, which means that once 25 map (or 25 reduce) tasks are running,
+ no more tasks of that type will be scheduled, even if the pool's fair share is higher.
The example also limits the number of running jobs
per user to 3, except for sample_user, who can run 6 jobs concurrently.
Finally, the example sets a fair share preemption timeout of 600 seconds