You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2011/07/06 07:11:42 UTC

svn commit: r1143252 - in /hadoop/common/trunk/mapreduce: ./ src/contrib/ src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/ src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/ src/docs/src/documentation/content/xdocs/

Author: todd
Date: Wed Jul  6 05:11:42 2011
New Revision: 1143252

URL: http://svn.apache.org/viewvc?rev=1143252&view=rev
Log:
MAPREDUCE-2323. Add metrics to the fair scheduler. Contributed by Todd Lipcon.

Modified:
    hadoop/common/trunk/mapreduce/CHANGES.txt
    hadoop/common/trunk/mapreduce/src/contrib/build-contrib.xml
    hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
    hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java
    hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java
    hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
    hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java
    hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java
    hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java
    hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
    hadoop/common/trunk/mapreduce/src/docs/src/documentation/content/xdocs/fair_scheduler.xml

Modified: hadoop/common/trunk/mapreduce/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/CHANGES.txt?rev=1143252&r1=1143251&r2=1143252&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/CHANGES.txt (original)
+++ hadoop/common/trunk/mapreduce/CHANGES.txt Wed Jul  6 05:11:42 2011
@@ -31,6 +31,8 @@ Trunk (unreleased changes)
     deployment layout to be consistent across the binary tgz, rpm, and deb.
     (Eric Yang via omalley)
 
+    MAPREDUCE-2323. Add metrics to the fair scheduler. (todd)
+
   IMPROVEMENTS
 
     MAPREDUCE-2563. [Gridmix]  Add High-Ram emulation system tests to 

Modified: hadoop/common/trunk/mapreduce/src/contrib/build-contrib.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/build-contrib.xml?rev=1143252&r1=1143251&r2=1143252&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/build-contrib.xml (original)
+++ hadoop/common/trunk/mapreduce/src/contrib/build-contrib.xml Wed Jul  6 05:11:42 2011
@@ -307,6 +307,7 @@
       errorProperty="tests.failed" failureProperty="tests.failed"
       timeout="${test.timeout}">
       
+      <assertions><enable/></assertions>
       <sysproperty key="test.build.data" value="${build.test}/data"/>
       <sysproperty key="build.test" value="${build.test}"/>
       <sysproperty key="test.build.extraconf" value="${test.build.extraconf}" />

Modified: hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=1143252&r1=1143251&r2=1143252&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Wed Jul  6 05:11:42 2011
@@ -38,6 +38,9 @@ import org.apache.hadoop.conf.Configurat
 import org.apache.hadoop.http.HttpServer;
 import org.apache.hadoop.mapreduce.TaskType;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
 import org.apache.hadoop.util.ReflectionUtils;
 
 /**
@@ -71,6 +74,7 @@ public class FairScheduler extends TaskS
   protected Map<JobInProgress, JobInfo> infos = // per-job scheduling variables
     new HashMap<JobInProgress, JobInfo>();
   protected long lastUpdateTime;           // Time when we last updated infos
+  protected long lastPreemptionUpdateTime; // Time when we last updated preemption vars
   protected boolean initialized;  // Are we initialized?
   protected volatile boolean running; // Are we running?
   protected boolean assignMultiple; // Simultaneously assign map and reduce?
@@ -210,6 +214,9 @@ public class FairScheduler extends TaskS
         infoServer.addServlet("scheduler", "/scheduler",
             FairSchedulerServlet.class);
       }
+      
+      initMetrics();
+      
       eventLog.log("INITIALIZED");
     } catch (Exception e) {
       // Can't load one of the managers - crash the JobTracker now while it is
@@ -219,6 +226,8 @@ public class FairScheduler extends TaskS
     LOG.info("Successfully configured FairScheduler");
   }
 
+  private MetricsUpdater metricsUpdater; // responsible for pushing hadoop metrics
+
   /**
    * Returns the LoadManager object used by the Fair Share scheduler
    */
@@ -226,6 +235,16 @@ public class FairScheduler extends TaskS
     return loadMgr;
   }
 
+  /**
+   * Register an updater with the "fairscheduler" metrics context; the context
+   * calls it back whenever it wants the scheduler's metrics refreshed.
+   */
+  private void initMetrics() {
+    MetricsContext context = MetricsUtil.getContext("fairscheduler");
+    metricsUpdater = new MetricsUpdater();
+    context.registerUpdater(metricsUpdater);
+  }
+
   @Override
   public void terminate() throws IOException {
     if (eventLog != null)
@@ -236,6 +255,11 @@ public class FairScheduler extends TaskS
       taskTrackerManager.removeJobInProgressListener(jobListener);
     if (eventLog != null)
       eventLog.shutdown();
+    if (metricsUpdater != null) {
+      MetricsContext context = MetricsUtil.getContext("fairscheduler");
+      context.unregisterUpdater(metricsUpdater);
+      metricsUpdater = null;
+    }
   }
  
 
@@ -298,8 +322,7 @@ public class FairScheduler extends TaskS
     public void jobRemoved(JobInProgress job) {
       synchronized (FairScheduler.this) {
         eventLog.log("JOB_REMOVED", job.getJobID());
-        poolMgr.removeJob(job);
-        infos.remove(job);
+        jobNoLongerRunning(job);
       }
     }
   
@@ -331,6 +354,20 @@ public class FairScheduler extends TaskS
       }
     }
   }
+
+  /**
+   * Responsible for updating metrics when the metrics context requests it.
+   */
+  private class MetricsUpdater implements Updater {
+    @Override
+    public void doUpdates(MetricsContext context) {
+      updateMetrics();
+    }    
+  }
+  
+  synchronized void updateMetrics() {
+    poolMgr.updateMetrics();
+  }
   
   @Override
   public synchronized List<Task> assignTasks(TaskTracker tracker)
@@ -617,8 +654,7 @@ public class FairScheduler extends TaskS
         }
       }
       for (JobInProgress job: toRemove) {
-        infos.remove(job);
-        poolMgr.removeJob(job);
+        jobNoLongerRunning(job);
       }
       
       updateRunnability(); // Set job runnability based on user/pool limits 
@@ -647,6 +683,16 @@ public class FairScheduler extends TaskS
         updatePreemptionVariables();
     }
   }
+
+  private void jobNoLongerRunning(JobInProgress job) {
+    assert Thread.holdsLock(this);
+    JobInfo info = infos.remove(job);
+    if (info != null) {
+      info.mapSchedulable.cleanupMetrics();
+      info.reduceSchedulable.cleanupMetrics();
+    }
+    poolMgr.removeJob(job);
+  }
   
   public List<PoolSchedulable> getPoolSchedulables(TaskType type) {
     List<PoolSchedulable> scheds = new ArrayList<PoolSchedulable>();
@@ -744,6 +790,7 @@ public class FairScheduler extends TaskS
    */
   private void updatePreemptionVariables() {
     long now = clock.getTime();
+    lastPreemptionUpdateTime = now;
     for (TaskType type: MAP_AND_REDUCE) {
       for (PoolSchedulable sched: getPoolSchedulables(type)) {
         if (!isStarvedForMinShare(sched)) {
@@ -1044,4 +1091,11 @@ public class FairScheduler extends TaskS
   public JobInfo getJobInfo(JobInProgress job) {
     return infos.get(job);
   }
+  
+  boolean isPreemptionEnabled() {
+    return preemptionEnabled;
+  }
+  long getLastPreemptionUpdateTime() {
+    return lastPreemptionUpdateTime;
+  }
 }

Modified: hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java?rev=1143252&r1=1143251&r2=1143252&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java (original)
+++ hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java Wed Jul  6 05:11:42 2011
@@ -35,6 +35,13 @@ public class JobSchedulable extends Sche
     this.scheduler = scheduler;
     this.job = job;
     this.taskType = taskType;
+    
+    initMetrics();
+  }
+  
+  @Override
+  public TaskType getTaskType() {
+    return taskType;
   }
   
   @Override
@@ -151,4 +158,18 @@ public class JobSchedulable extends Sche
       return null;
     }
   }
+
+  
+  @Override
+  protected String getMetricsContextName() {
+    return "jobs";
+  }
+  
+  @Override
+  void updateMetrics() {
+    assert metrics != null;
+    
+    super.setMetricValues(metrics);
+    metrics.update();
+  }
 }

Modified: hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java?rev=1143252&r1=1143251&r2=1143252&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java (original)
+++ hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java Wed Jul  6 05:11:42 2011
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Collection;
 
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.metrics.MetricsContext;
 
 /**
  * A schedulable pool of jobs.
@@ -91,4 +92,9 @@ public class Pool {
   public PoolSchedulable getSchedulable(TaskType type) {
     return type == TaskType.MAP ? mapSchedulable : reduceSchedulable;
   }
+
+  public void updateMetrics() {
+    mapSchedulable.updateMetrics();
+    reduceSchedulable.updateMetrics();
+  }
 }

Modified: hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java?rev=1143252&r1=1143251&r2=1143252&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java (original)
+++ hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java Wed Jul  6 05:11:42 2011
@@ -37,6 +37,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.metrics.MetricsContext;
 import org.w3c.dom.Document;
 import org.w3c.dom.Element;
 import org.w3c.dom.Node;
@@ -536,4 +537,10 @@ public class PoolManager {
   public long getFairSharePreemptionTimeout() {
     return fairSharePreemptionTimeout;
   }
+
+  synchronized void updateMetrics() {
+    for (Pool pool : pools.values()) {
+      pool.updateMetrics();
+    }
+  }
 }

Modified: hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java?rev=1143252&r1=1143251&r2=1143252&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java (original)
+++ hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java Wed Jul  6 05:11:42 2011
@@ -54,6 +54,8 @@ public class PoolSchedulable extends Sch
     long currentTime = scheduler.getClock().getTime();
     this.lastTimeAtMinShare = currentTime;
     this.lastTimeAtHalfFairShare = currentTime;
+    
+    initMetrics();
   }
 
   public void addJob(JobInProgress job) {
@@ -171,6 +173,7 @@ public class PoolSchedulable extends Sch
     return pool;
   }
 
+  @Override
   public TaskType getTaskType() {
     return taskType;
   }
@@ -194,4 +197,25 @@ public class PoolSchedulable extends Sch
   public void setLastTimeAtHalfFairShare(long lastTimeAtHalfFairShare) {
     this.lastTimeAtHalfFairShare = lastTimeAtHalfFairShare;
   }
+
+  protected String getMetricsContextName() {
+    return "pools";
+  }
+  
+  @Override
+  public void updateMetrics() {
+    super.setMetricValues(metrics);
+    
+    if (scheduler.isPreemptionEnabled()) {
+      // These won't be set if preemption is off
+      long lastCheck = scheduler.getLastPreemptionUpdateTime();
+      metrics.setMetric("millisSinceAtMinShare", lastCheck - lastTimeAtMinShare);
+      metrics.setMetric("millisSinceAtHalfFairShare", lastCheck - lastTimeAtHalfFairShare);
+    }
+    metrics.update();
+
+    for (JobSchedulable job : jobScheds) {
+      job.updateMetrics();
+    }
+  }
 }

Modified: hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java?rev=1143252&r1=1143251&r2=1143252&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java (original)
+++ hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java Wed Jul  6 05:11:42 2011
@@ -21,6 +21,11 @@ package org.apache.hadoop.mapred;
 import java.io.IOException;
 import java.util.Collection;
 
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+
 /**
  * A Schedulable represents an entity that can launch tasks, such as a job
  * or a pool. It provides a common interface so that algorithms such as fair
@@ -53,7 +58,8 @@ import java.util.Collection;
 abstract class Schedulable {
   /** Fair share assigned to this Schedulable */
   private double fairShare = 0;
-
+  protected MetricsRecord metrics;
+  
   /**
    * Name of job/pool, used for debugging as well as for breaking ties in
    * scheduling order deterministically. 
@@ -61,6 +67,11 @@ abstract class Schedulable {
   public abstract String getName();
   
   /**
+   * @return the type of tasks (map or reduce) that this Schedulable launches
+   */
+  public abstract TaskType getTaskType();
+  
+  /**
    * Maximum number of tasks required by this Schedulable. This is defined as
    * number of currently running tasks + number of unlaunched tasks (tasks that
    * are either not yet launched or need to be speculated).
@@ -122,6 +133,35 @@ abstract class Schedulable {
     return fairShare;
   }
   
+  /** Return the name of the metrics record (in the "fairscheduler" context) for this schedulable */
+  protected abstract String getMetricsContextName();
+  
+  /**
+   * Create this schedulable's metrics record and tag it with its name and task type
+   */
+  protected void initMetrics() {
+    MetricsContext metricsContext = MetricsUtil.getContext("fairscheduler");
+    this.metrics = MetricsUtil.createRecord(metricsContext,
+        getMetricsContextName());
+    metrics.setTag("name", getName());
+    metrics.setTag("taskType", getTaskType().toString());
+  }
+
+  void cleanupMetrics() {
+    metrics.remove();
+    metrics = null;
+  }
+
+  protected void setMetricValues(MetricsRecord metrics) {
+    metrics.setMetric("fairShare", (float)getFairShare());
+    metrics.setMetric("minShare", getMinShare());
+    metrics.setMetric("demand", getDemand());
+    metrics.setMetric("weight", (float)getWeight());
+    metrics.setMetric("runningTasks", getRunningTasks());
+  }
+  
+  abstract void updateMetrics();
+  
   /** Convenient toString implementation for debugging. */
   @Override
   public String toString() {

Modified: hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java?rev=1143252&r1=1143251&r2=1143252&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java (original)
+++ hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java Wed Jul  6 05:11:42 2011
@@ -21,6 +21,8 @@ package org.apache.hadoop.mapred;
 import java.io.IOException;
 import java.util.Collection;
 
+import org.apache.hadoop.mapreduce.TaskType;
+
 /**
  * Dummy implementation of Schedulable for unit testing.
  */
@@ -105,4 +107,18 @@ public class FakeSchedulable extends Sch
 
   @Override
   public void updateDemand() {}
+
+  @Override
+  public TaskType getTaskType() {
+    return TaskType.MAP;
+  }
+
+  @Override
+  protected String getMetricsContextName() {
+    return "fake";
+  }
+
+  @Override
+  void updateMetrics() {
+  }
 }

Modified: hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=1143252&r1=1143251&r2=1143252&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Wed Jul  6 05:11:42 2011
@@ -47,6 +47,11 @@ import org.apache.hadoop.mapreduce.TaskT
 import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
 import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
 import org.apache.hadoop.mapreduce.split.JobSplit;
+import org.apache.hadoop.metrics.ContextFactory;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.spi.NoEmitMetricsContext;
+import org.apache.hadoop.metrics.spi.OutputRecord;
 import org.apache.hadoop.net.Node;
 import org.mortbay.log.Log;
 
@@ -516,7 +521,10 @@ public class TestFairScheduler extends T
   }
 
   private void setUpCluster(int numRacks, int numNodesPerRack,
-      boolean assignMultiple) {
+      boolean assignMultiple) throws IOException {
+    
+    resetMetrics();
+    
     conf = new JobConf();
     conf.set("mapred.fairscheduler.allocation.file", ALLOC_FILE);
     conf.set("mapred.fairscheduler.poolnameproperty", POOL_PROPERTY);
@@ -534,6 +542,20 @@ public class TestFairScheduler extends T
     scheduler.start();
   }
   
+  /**
+   * Set up a metrics context that doesn't emit anywhere but stores the data
+   * so we can verify it. Also clears it of any data so that different test
+   * cases don't pollute each other.
+   */
+  private void resetMetrics() throws IOException {
+    ContextFactory factory = ContextFactory.getFactory();
+    factory.setAttribute("fairscheduler.class",
+        NoEmitMetricsContext.class.getName());
+    
+    MetricsUtil.getContext("fairscheduler").createRecord("jobs").remove();
+    MetricsUtil.getContext("fairscheduler").createRecord("pools").remove();
+  }
+
   @Override
   protected void tearDown() throws Exception {
     if (scheduler != null) {
@@ -689,7 +711,8 @@ public class TestFairScheduler extends T
     assertEquals(1,    info1.reduceSchedulable.getDemand());
     assertEquals(2.0,  info1.mapSchedulable.getFairShare());
     assertEquals(1.0,  info1.reduceSchedulable.getFairShare());
-    
+    verifyMetrics();
+
     // Advance time before submitting another job j2, to make j1 run before j2
     // deterministically.
     advanceTime(100);
@@ -709,6 +732,7 @@ public class TestFairScheduler extends T
     assertEquals(2,    info2.reduceSchedulable.getDemand());
     assertEquals(1.0,  info2.mapSchedulable.getFairShare());
     assertEquals(2.0,  info2.reduceSchedulable.getFairShare());
+    verifyMetrics();
     
     // Assign tasks and check that jobs alternate in filling slots
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
@@ -729,8 +753,8 @@ public class TestFairScheduler extends T
     assertEquals(2,  info2.reduceSchedulable.getRunningTasks());
     assertEquals(1, info2.mapSchedulable.getDemand());
     assertEquals(2, info2.reduceSchedulable.getDemand());
+    verifyMetrics();
   }
-  
   /**
    * This test is identical to testSmallJobs but sets assignMultiple to
    * true so that multiple tasks can be assigned per heartbeat.
@@ -748,6 +772,7 @@ public class TestFairScheduler extends T
     assertEquals(1,    info1.reduceSchedulable.getDemand());
     assertEquals(2.0,  info1.mapSchedulable.getFairShare());
     assertEquals(1.0,  info1.reduceSchedulable.getFairShare());
+    verifyMetrics();
     
     // Advance time before submitting another job j2, to make j1 run before j2
     // deterministically.
@@ -768,6 +793,7 @@ public class TestFairScheduler extends T
     assertEquals(2,    info2.reduceSchedulable.getDemand());
     assertEquals(1.0,  info2.mapSchedulable.getFairShare());
     assertEquals(2.0,  info2.reduceSchedulable.getFairShare());
+    verifyMetrics();
     
     // Assign tasks and check that jobs alternate in filling slots
     checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1",
@@ -788,6 +814,7 @@ public class TestFairScheduler extends T
     assertEquals(2,  info2.reduceSchedulable.getRunningTasks());
     assertEquals(1, info2.mapSchedulable.getDemand());
     assertEquals(2, info2.reduceSchedulable.getDemand());
+    verifyMetrics();
   }
   
   /**
@@ -1632,6 +1659,7 @@ public class TestFairScheduler extends T
     assertEquals(0.28,  info3.reduceSchedulable.getFairShare(), 0.01);
     assertEquals(0.28,  info4.mapSchedulable.getFairShare(), 0.01);
     assertEquals(0.28,  info4.reduceSchedulable.getFairShare(), 0.01);
+    verifyMetrics();    
   }
 
   /**
@@ -2730,6 +2758,65 @@ public class TestFairScheduler extends T
     assertEquals(33,   poolA.getMapSchedulable().getDemand());
     assertEquals(39,   poolA.getReduceSchedulable().getDemand());
   }
+
+
+  /**
+   * Test switching a job from one pool to another, then back to the original
+   * one. This is a regression test for a bug seen during development of
+   * MAPREDUCE-2323 (fair scheduler metrics).
+   */
+  public void testSetPoolTwice() throws Exception {
+    // Set up pools file
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<pool name=\"default\">");
+    out.println("<schedulingMode>fair</schedulingMode>");
+    out.println("</pool>");
+    out.println("<pool name=\"poolA\">");
+    out.println("<schedulingMode>fair</schedulingMode>");
+    out.println("</pool>");
+    out.println("</allocations>");
+    out.close();
+    scheduler.getPoolManager().reloadAllocs();
+    Pool defaultPool = scheduler.getPoolManager().getPool("default");
+    Pool poolA = scheduler.getPoolManager().getPool("poolA");
+
+    // Submit a job to the default pool.  All specifications take default values.
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 1, 3);
+    assertEquals(1,    defaultPool.getMapSchedulable().getDemand());
+    assertEquals(3,    defaultPool.getReduceSchedulable().getDemand());
+    assertEquals(0,    poolA.getMapSchedulable().getDemand());
+    assertEquals(0,    poolA.getReduceSchedulable().getDemand());
+
+    // Move job to poolA and make sure demand moves with it
+    scheduler.getPoolManager().setPool(job1, "poolA");
+    assertEquals("poolA", scheduler.getPoolManager().getPoolName(job1));
+
+    defaultPool.getMapSchedulable().updateDemand();
+    defaultPool.getReduceSchedulable().updateDemand();
+    poolA.getMapSchedulable().updateDemand();
+    poolA.getReduceSchedulable().updateDemand();
+
+    assertEquals(0,    defaultPool.getMapSchedulable().getDemand());
+    assertEquals(0,    defaultPool.getReduceSchedulable().getDemand());
+    assertEquals(1,    poolA.getMapSchedulable().getDemand());
+    assertEquals(3,    poolA.getReduceSchedulable().getDemand());
+
+    // Move back to default pool and make sure demand goes back
+    scheduler.getPoolManager().setPool(job1, "default");
+    assertEquals("default", scheduler.getPoolManager().getPoolName(job1));
+
+    defaultPool.getMapSchedulable().updateDemand();
+    defaultPool.getReduceSchedulable().updateDemand();
+    poolA.getMapSchedulable().updateDemand();
+    poolA.getReduceSchedulable().updateDemand();
+
+    assertEquals(1,    defaultPool.getMapSchedulable().getDemand());
+    assertEquals(3,    defaultPool.getReduceSchedulable().getDemand());
+    assertEquals(0,    poolA.getMapSchedulable().getDemand());
+    assertEquals(0,    poolA.getReduceSchedulable().getDemand());
+  }
   
   private void advanceTime(long time) {
     clock.advance(time);
@@ -2828,4 +2915,108 @@ public class TestFairScheduler extends T
     assertNull(scheduler.assignTasks(tracker("tt1")));
     assertNull(scheduler.assignTasks(tracker("tt2")));
   }
+  
+  
+  /**
+   * Ask scheduler to update metrics and then verify that they're all
+   * correctly published to the metrics context
+   */
+  private void verifyMetrics() {
+    scheduler.updateMetrics();
+    verifyPoolMetrics();
+    verifyJobMetrics();
+  }
+  
+  /**
+   * Verify that pool-level metrics match internal data. If the record count is
+   * wrong, each record's "name" and "taskType" tags are dumped before rethrowing.
+   */
+  private void verifyPoolMetrics() {
+    MetricsContext ctx = MetricsUtil.getContext("fairscheduler");
+    Collection<OutputRecord> records = ctx.getAllRecords().get("pools");
+
+    try {
+      assertEquals(scheduler.getPoolSchedulables(TaskType.MAP).size() * 2,
+          records.size());
+    } catch (Error e) {
+      for (OutputRecord rec : records) {
+        System.err.println("record:");
+        System.err.println(" name: " + rec.getTag("name"));
+        System.err.println(" type: " + rec.getTag("taskType"));
+      }
+
+      throw e;
+    }
+    
+    Map<String, OutputRecord> byPoolAndType =
+      new HashMap<String, OutputRecord>();
+    for (OutputRecord rec : records) {
+      String pool = (String)rec.getTag("name");
+      String type = (String)rec.getTag("taskType");
+      assertNotNull(pool);
+      assertNotNull(type);
+      byPoolAndType.put(pool + "_" + type, rec);
+    }
+    
+    List<PoolSchedulable> poolScheds = new ArrayList<PoolSchedulable>();
+    poolScheds.addAll(scheduler.getPoolSchedulables(TaskType.MAP));
+    poolScheds.addAll(scheduler.getPoolSchedulables(TaskType.REDUCE));
+    
+    for (PoolSchedulable pool : poolScheds) {
+      String poolName = pool.getName();
+      OutputRecord metrics = byPoolAndType.get(
+          poolName + "_" + pool.getTaskType().toString());
+      assertNotNull("Need metrics for " + pool, metrics);
+      
+      verifySchedulableMetrics(pool, metrics);
+    }
+  }
+  
+  /**
+   * Verify that the job-level metrics match internal data
+   */
+  private void verifyJobMetrics() {
+    MetricsContext ctx = MetricsUtil.getContext("fairscheduler");
+    Collection<OutputRecord> records = ctx.getAllRecords().get("jobs");
+    
+    System.out.println("Checking job metrics...");
+    Map<String, OutputRecord> byJobIdAndType =
+      new HashMap<String, OutputRecord>();
+    for (OutputRecord rec : records) {
+      String jobId = (String)rec.getTag("name");
+      String type = (String)rec.getTag("taskType");
+      assertNotNull(jobId);
+      assertNotNull(type);
+      byJobIdAndType.put(jobId + "_" + type, rec);
+      System.out.println("Got " + type + " metrics for job: " + jobId);
+    }
+    assertEquals(scheduler.infos.size() * 2, byJobIdAndType.size());
+    
+    for (Map.Entry<JobInProgress, JobInfo> entry :
+            scheduler.infos.entrySet()) {
+      JobInfo info = entry.getValue();
+      String jobId = entry.getKey().getJobID().toString();
+      
+      OutputRecord mapMetrics = byJobIdAndType.get(jobId + "_MAP");
+      assertNotNull("Job " + jobId + " should have map metrics", mapMetrics);
+      verifySchedulableMetrics(info.mapSchedulable, mapMetrics);
+      
+      OutputRecord reduceMetrics = byJobIdAndType.get(jobId + "_REDUCE");
+      assertNotNull("Job " + jobId + " should have reduce metrics", reduceMetrics);
+      verifySchedulableMetrics(info.reduceSchedulable, reduceMetrics);
+    }
+  }
+
+  /**
+   * Verify that the metrics for a given Schedulable are correct
+   */
+  private void verifySchedulableMetrics(
+      Schedulable sched, OutputRecord metrics) {
+    assertEquals(sched.getRunningTasks(), metrics.getMetric("runningTasks"));
+    assertEquals(sched.getDemand(), metrics.getMetric("demand"));
+    assertEquals(sched.getFairShare(),
+        metrics.getMetric("fairShare").doubleValue(), .001);
+    assertEquals(sched.getWeight(),
+        metrics.getMetric("weight").doubleValue(), .001);
+  }
 }

Modified: hadoop/common/trunk/mapreduce/src/docs/src/documentation/content/xdocs/fair_scheduler.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/docs/src/documentation/content/xdocs/fair_scheduler.xml?rev=1143252&r1=1143251&r2=1143252&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/docs/src/documentation/content/xdocs/fair_scheduler.xml (original)
+++ hadoop/common/trunk/mapreduce/src/docs/src/documentation/content/xdocs/fair_scheduler.xml Wed Jul  6 05:11:42 2011
@@ -517,6 +517,24 @@
      <em>NewJobWeightBooster</em> are enabled.</li>
      </ul>
     </section>
+    <section>
+      <title>Metrics</title>
+      <p>
+        The fair scheduler can export metrics using the Hadoop metrics interface.
+        This is done by adding an entry to <code>hadoop-metrics.properties</code>
+        that enables the <code>fairscheduler</code> metrics context. For example, to
+        simply retain the metrics in memory so they may be viewed in the <code>/metrics</code>
+        servlet:
+      </p>
+      <p>
+        <code>fairscheduler.class=org.apache.hadoop.metrics.spi.NoEmitMetricsContext</code>
+      </p>
+      <p>
+        Metrics are generated for each pool and job, and contain the same information that
+        is visible on the <code>/scheduler</code> web page.
+      </p>
+    </section>
+
     <!--
     <section>
     <title>Implementation</title>