You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by ma...@apache.org on 2009/12/18 04:28:07 UTC

svn commit: r892117 - in /hadoop/mapreduce/trunk: ./ src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/ src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/ src/docs/src/documentation/content/xdocs/

Author: matei
Date: Fri Dec 18 03:28:04 2009
New Revision: 892117

URL: http://svn.apache.org/viewvc?rev=892117&view=rev
Log:
MAPREDUCE-698. Per-pool task limits for the fair scheduler. Contributed by Kevin Peterson.


Modified:
    hadoop/mapreduce/trunk/CHANGES.txt
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java
    hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
    hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/fair_scheduler.xml

Modified: hadoop/mapreduce/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/CHANGES.txt?rev=892117&r1=892116&r2=892117&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/CHANGES.txt (original)
+++ hadoop/mapreduce/trunk/CHANGES.txt Fri Dec 18 03:28:04 2009
@@ -6,6 +6,9 @@
 
   NEW FEATURES
 
+    MAPREDUCE-698. Per-pool task limits for the fair scheduler.
+    (Kevin Peterson via matei)
+
     MAPREDUCE-1017. Compression and output splitting for Sqoop.
     (Aaron Kimball via tomwhite)
 

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java?rev=892117&r1=892116&r2=892117&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java Fri Dec 18 03:28:04 2009
@@ -151,16 +151,17 @@
    */
   private void showPools(PrintWriter out, boolean advancedView) {
     synchronized(scheduler) {
+      boolean warnInverted = false;
       PoolManager poolManager = scheduler.getPoolManager();
       out.print("<h2>Pools</h2>\n");
       out.print("<table border=\"2\" cellpadding=\"5\" cellspacing=\"2\">\n");
       out.print("<tr><th rowspan=2>Pool</th>" +
           "<th rowspan=2>Running Jobs</th>" + 
-          "<th colspan=3>Map Tasks</th>" + 
-          "<th colspan=3>Reduce Tasks</th>" +
+          "<th colspan=4>Map Tasks</th>" + 
+          "<th colspan=4>Reduce Tasks</th>" +
           "<th rowspan=2>Scheduling Mode</th></tr>\n<tr>" + 
-          "<th>Min Share</th><th>Running</th><th>Fair Share</th>" + 
-          "<th>Min Share</th><th>Running</th><th>Fair Share</th></tr>\n");
+          "<th>Min Share</th><th>Max Share</th><th>Running</th><th>Fair Share</th>" + 
+          "<th>Min Share</th><th>Max Share</th><th>Running</th><th>Fair Share</th></tr>\n");
       List<Pool> pools = new ArrayList<Pool>(poolManager.getPools());
       Collections.sort(pools, new Comparator<Pool>() {
         public int compare(Pool p1, Pool p2) {
@@ -174,21 +175,51 @@
         String name = pool.getName();
         int runningMaps = pool.getMapSchedulable().getRunningTasks();
         int runningReduces = pool.getReduceSchedulable().getRunningTasks();
+        int maxMaps = poolManager.getMaxSlots(name, TaskType.MAP);
+        int maxReduces = poolManager.getMaxSlots(name, TaskType.REDUCE);
+        boolean invertedMaps = poolManager.invertedMinMax(TaskType.MAP, name);
+        boolean invertedReduces = poolManager.invertedMinMax(TaskType.REDUCE, name);
+        warnInverted = warnInverted || invertedMaps || invertedReduces;
         out.print("<tr>");
         out.printf("<td>%s</td>", name);
         out.printf("<td>%d</td>", pool.getJobs().size());
+        // Map Tasks
         out.printf("<td>%d</td>", poolManager.getAllocation(name,
             TaskType.MAP));
+        out.print("<td>");
+        if(maxMaps == Integer.MAX_VALUE) {
+          out.print("-");
+        } else {
+          out.print(maxMaps);
+        }
+        if(invertedMaps) {
+          out.print("*");
+        }
+        out.print("</td>");
         out.printf("<td>%d</td>", runningMaps);
         out.printf("<td>%.1f</td>", pool.getMapSchedulable().getFairShare());
+        // Reduce Tasks
         out.printf("<td>%d</td>", poolManager.getAllocation(name,
             TaskType.REDUCE));
+        out.print("<td>");
+        if(maxReduces == Integer.MAX_VALUE) {
+          out.print("-");
+        } else {
+          out.print(maxReduces);
+        }
+        if(invertedReduces) {
+          out.print("*");
+        }
+        out.print("</td>");
         out.printf("<td>%d</td>", runningReduces);
         out.printf("<td>%.1f</td>", pool.getReduceSchedulable().getFairShare());
         out.printf("<td>%s</td>", pool.getSchedulingMode());
         out.print("</tr>\n");
       }
       out.print("</table>\n");
+      if(warnInverted) {
+        out.print("<p>* One or more pools have max share set lower than min share. Max share will be used and minimum will be treated as if set equal to max.</p>");
+      }
     }
   }
 

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java?rev=892117&r1=892116&r2=892117&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java Fri Dec 18 03:28:04 2009
@@ -69,6 +69,10 @@
   private Map<String, Integer> mapAllocs = new HashMap<String, Integer>();
   private Map<String, Integer> reduceAllocs = new HashMap<String, Integer>();
 
+  // If set, cap number of map and reduce tasks in a pool
+  private Map<String, Integer> poolMaxMaps = new HashMap<String, Integer>();
+  private Map<String, Integer> poolMaxReduces = new HashMap<String, Integer>();
+
   // Sharing weights for each pool
   private Map<String, Double> poolWeights = new HashMap<String, Double>();
   
@@ -221,6 +225,8 @@
     Map<String, Integer> reduceAllocs = new HashMap<String, Integer>();
     Map<String, Integer> poolMaxJobs = new HashMap<String, Integer>();
     Map<String, Integer> userMaxJobs = new HashMap<String, Integer>();
+    Map<String, Integer> poolMaxMaps = new HashMap<String, Integer>();
+    Map<String, Integer> poolMaxReduces = new HashMap<String, Integer>();
     Map<String, Double> poolWeights = new HashMap<String, Double>();
     Map<String, SchedulingMode> poolModes = new HashMap<String, SchedulingMode>();
     Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
@@ -271,6 +277,14 @@
             String text = ((Text)field.getFirstChild()).getData().trim();
             int val = Integer.parseInt(text);
             reduceAllocs.put(poolName, val);
+          } else if ("maxMaps".equals(field.getTagName())) {
+            String text = ((Text)field.getFirstChild()).getData().trim();
+            int val = Integer.parseInt(text);
+            poolMaxMaps.put(poolName, val);
+          } else if ("maxReduces".equals(field.getTagName())) {
+            String text = ((Text)field.getFirstChild()).getData().trim();
+            int val = Integer.parseInt(text);
+            poolMaxReduces.put(poolName, val);
           } else if ("maxRunningJobs".equals(field.getTagName())) {
             String text = ((Text)field.getFirstChild()).getData().trim();
             int val = Integer.parseInt(text);
@@ -288,6 +302,16 @@
             poolModes.put(poolName, parseSchedulingMode(text));
           }
         }
+        if (poolMaxMaps.containsKey(poolName) && mapAllocs.containsKey(poolName)
+            && poolMaxMaps.get(poolName) < mapAllocs.get(poolName)) {
+          LOG.warn(String.format("Pool %s has max maps %d less than min maps %d",
+              poolName, poolMaxMaps.get(poolName), mapAllocs.get(poolName)));        
+        }
+        if(poolMaxReduces.containsKey(poolName) && reduceAllocs.containsKey(poolName)
+            && poolMaxReduces.get(poolName) < reduceAllocs.get(poolName)) {
+          LOG.warn(String.format("Pool %s has max reduces %d less than min reduces %d",
+              poolName, poolMaxReduces.get(poolName), reduceAllocs.get(poolName)));        
+        }
       } else if ("user".equals(element.getTagName())) {
         String userName = element.getAttribute("name");
         NodeList fields = element.getChildNodes();
@@ -331,6 +355,8 @@
     synchronized(this) {
       this.mapAllocs = mapAllocs;
       this.reduceAllocs = reduceAllocs;
+      this.poolMaxMaps = poolMaxMaps;
+      this.poolMaxReduces = poolMaxReduces;
       this.poolMaxJobs = poolMaxJobs;
       this.userMaxJobs = userMaxJobs;
       this.poolWeights = poolWeights;
@@ -351,6 +377,25 @@
     }
   }
 
+  /**
+   * Does the pool have incompatible max and min allocation set.
+   * 
+   * @param type
+   *          {@link TaskType#MAP} or {@link TaskType#REDUCE}
+   * @param pool
+   *          the pool name
+   * @return true if the max is less than the min
+   */
+  boolean invertedMinMax(TaskType type, String pool) {
+    Map<String, Integer> max = TaskType.MAP == type ? poolMaxMaps : poolMaxReduces;
+    Map<String, Integer> min = TaskType.MAP == type ? mapAllocs : reduceAllocs;
+    if (max.containsKey(pool) && min.containsKey(pool)
+        && max.get(pool) < min.get(pool)) {
+      return true;
+    }
+    return false;
+  }
+
   private SchedulingMode parseSchedulingMode(String text)
       throws AllocationConfigurationException {
     text = text.toLowerCase();
@@ -373,7 +418,20 @@
     Integer alloc = allocationMap.get(pool);
     return (alloc == null ? 0 : alloc);
   }
-  
+
+  /**
+   * Get the maximum map or reduce slots for the given pool.
+   * @return the cap set on this pool, or Integer.MAX_VALUE if not set.
+   */
+  int getMaxSlots(String poolName, TaskType taskType) {
+    Map<String, Integer> maxMap = (taskType == TaskType.MAP ? poolMaxMaps : poolMaxReduces);
+    if (maxMap.containsKey(poolName)) {
+      return maxMap.get(poolName);
+    } else {
+      return Integer.MAX_VALUE;
+    }
+  }
+ 
   /**
    * Add a job in the appropriate pool
    */

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java?rev=892117&r1=892116&r2=892117&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java Fri Dec 18 03:28:04 2009
@@ -82,6 +82,11 @@
       sched.updateDemand();
       demand += sched.getDemand();
     }
+    // if demand exceeds the cap for this pool, limit to the max
+    int maxTasks = poolMgr.getMaxSlots(pool.getName(), taskType);
+    if(demand > maxTasks) {
+      demand = maxTasks;
+    }
   }
   
   /**
@@ -135,6 +140,10 @@
   @Override
   public Task assignTask(TaskTrackerStatus tts, long currentTime,
       Collection<JobInProgress> visited) throws IOException {
+    int runningTasks = getRunningTasks();
+    if (runningTasks >= poolMgr.getMaxSlots(pool.getName(), taskType)) {
+      return null;
+    }
     SchedulingMode mode = pool.getSchedulingMode();
     Comparator<Schedulable> comparator;
     if (mode == SchedulingMode.FIFO) {

Modified: hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=892117&r1=892116&r2=892117&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/mapreduce/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Fri Dec 18 03:28:04 2009
@@ -1640,6 +1640,45 @@
     assertEquals(1.33,  info3.reduceSchedulable.getFairShare(), 0.01);
   }
 
+  public void testPoolMaxMapsReduces() throws Exception {
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    // Pool with upper bound
+    out.println("<pool name=\"poolLimited\">");
+    out.println("<weight>1.0</weight>");
+    out.println("<maxMaps>2</maxMaps>");
+    out.println("<maxReduces>1</maxReduces>");
+    out.println("</pool>");
+    out.println("</allocations>");
+    out.close();
+    scheduler.getPoolManager().reloadAllocs();
+    // Create two jobs with ten maps
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 10, 5, "poolLimited");
+    advanceTime(10);
+    JobInProgress job2 = submitJob(JobStatus.RUNNING, 10, 5);
+    checkAssignment("tt1", "attempt_test_0002_m_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0002_r_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
+    checkAssignment("tt1", "attempt_test_0001_r_000000_0 on tt1");
+    checkAssignment("tt2", "attempt_test_0002_m_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0001_m_000001_0 on tt2");
+    checkAssignment("tt2", "attempt_test_0002_r_000002_0 on tt2");
+
+    Pool limited = scheduler.getPoolManager().getPool("poolLimited");
+    assertEquals(2, limited.getSchedulable(TaskType.MAP).getRunningTasks());
+    assertEquals(1, limited.getSchedulable(TaskType.REDUCE).getRunningTasks());
+    Pool defaultPool = scheduler.getPoolManager().getPool("default");
+    assertEquals(2, defaultPool.getSchedulable(TaskType.MAP).getRunningTasks());
+    assertEquals(3, defaultPool.getSchedulable(TaskType.REDUCE)
+        .getRunningTasks());
+    assertEquals(2, job1.runningMapTasks);
+    assertEquals(1, job1.runningReduceTasks);
+    assertEquals(2, job2.runningMapTasks);
+    assertEquals(3, job2.runningReduceTasks);
+  }
+
   /**
    * Tests that max-running-tasks per node are set by assigning load
    * equally accross the cluster in CapBasedLoadManager.

Modified: hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/fair_scheduler.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/fair_scheduler.xml?rev=892117&r1=892116&r2=892117&view=diff
==============================================================================
--- hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/fair_scheduler.xml (original)
+++ hadoop/mapreduce/trunk/src/docs/src/documentation/content/xdocs/fair_scheduler.xml Fri Dec 18 03:28:04 2009
@@ -79,7 +79,7 @@
         jobs tolerate losing tasks; it only makes them take longer to finish.
       </p>
       <p>
-        Finally, the Fair Scheduler can limit the number of concurrent
+        The Fair Scheduler can limit the number of concurrent
         running jobs per user and per pool. This can be useful when a 
         user must submit hundreds of jobs at once, or for ensuring that
         intermediate data does not fill up disk space on a cluster when too many
@@ -89,6 +89,13 @@
         Jobs to run from each user/pool are chosen in order of priority and then
         submit time.
       </p>
+      <p>
+        Finally, the Fair Scheduler can limit the number of concurrent
+        running tasks per pool. This can be useful when jobs have a
+        dependency on an external service like a database or web
+        service that could be overloaded if too many map or reduce
+        tasks are run at once.
+      </p>
     </section>
 
     <section>
@@ -351,6 +358,8 @@
           <ul>
           <li><em>minMaps</em> and <em>minReduces</em>,
             to set the pool's minimum share of task slots.</li>
+          <li><em>maxMaps</em> and <em>maxReduces</em>, to set the
+            pool's maximum concurrent task slots.</li>
           <li><em>schedulingMode</em>, the pool's internal scheduling mode,
           which can be <em>fair</em> for fair sharing or <em>fifo</em> for
           first-in-first-out.</li>
@@ -398,6 +407,8 @@
   &lt;pool name="sample_pool"&gt;
     &lt;minMaps&gt;5&lt;/minMaps&gt;
     &lt;minReduces&gt;5&lt;/minReduces&gt;
+    &lt;maxMaps&gt;25&lt;/maxMaps&gt;
+    &lt;maxReduces&gt;25&lt;/maxReduces&gt;
     &lt;minSharePreemptionTimeout&gt;300&lt;/minSharePreemptionTimeout&gt;
   &lt;/pool&gt;
   &lt;mapreduce.job.mapreduce.job.user.name="sample_user"&gt;
@@ -412,7 +423,9 @@
         slots and 5 reduce slots. The pool also has a minimum share preemption
         timeout of 300 seconds (5 minutes), meaning that if it does not get its
         guaranteed share within this time, it is allowed to kill tasks from
-        other pools to achieve its share.
+        other pools to achieve its share. The pool has a cap of 25 map and 25
+        reduce slots, which means that once 25 tasks are running, no more will
+        be scheduled even if the pool's fair share is higher.
         The example also limits the number of running jobs 
         per user to 3, except for sample_user, who can run 6 jobs concurrently. 
         Finally, the example sets a fair share preemption timeout of 600 seconds