You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by to...@apache.org on 2011/07/06 07:11:42 UTC
svn commit: r1143252 - in /hadoop/common/trunk/mapreduce: ./ src/contrib/
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/
src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/
src/docs/src/documentation/content/xdocs/
Author: todd
Date: Wed Jul 6 05:11:42 2011
New Revision: 1143252
URL: http://svn.apache.org/viewvc?rev=1143252&view=rev
Log:
MAPREDUCE-2323. Add metrics to the fair scheduler. Contributed by Todd Lipcon.
Modified:
hadoop/common/trunk/mapreduce/CHANGES.txt
hadoop/common/trunk/mapreduce/src/contrib/build-contrib.xml
hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java
hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java
hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java
hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java
hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java
hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
hadoop/common/trunk/mapreduce/src/docs/src/documentation/content/xdocs/fair_scheduler.xml
Modified: hadoop/common/trunk/mapreduce/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/CHANGES.txt?rev=1143252&r1=1143251&r2=1143252&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/CHANGES.txt (original)
+++ hadoop/common/trunk/mapreduce/CHANGES.txt Wed Jul 6 05:11:42 2011
@@ -31,6 +31,8 @@ Trunk (unreleased changes)
deployment layout to be consistent across the binary tgz, rpm, and deb.
(Eric Yang via omalley)
+ MAPREDUCE-2323. Add metrics to the fair scheduler. (todd)
+
IMPROVEMENTS
MAPREDUCE-2563. [Gridmix] Add High-Ram emulation system tests to
Modified: hadoop/common/trunk/mapreduce/src/contrib/build-contrib.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/build-contrib.xml?rev=1143252&r1=1143251&r2=1143252&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/build-contrib.xml (original)
+++ hadoop/common/trunk/mapreduce/src/contrib/build-contrib.xml Wed Jul 6 05:11:42 2011
@@ -307,6 +307,7 @@
errorProperty="tests.failed" failureProperty="tests.failed"
timeout="${test.timeout}">
+ <assertions><enable/></assertions>
<sysproperty key="test.build.data" value="${build.test}/data"/>
<sysproperty key="build.test" value="${build.test}"/>
<sysproperty key="test.build.extraconf" value="${test.build.extraconf}" />
Modified: hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=1143252&r1=1143251&r2=1143252&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Wed Jul 6 05:11:42 2011
@@ -38,6 +38,9 @@ import org.apache.hadoop.conf.Configurat
import org.apache.hadoop.http.HttpServer;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
import org.apache.hadoop.util.ReflectionUtils;
/**
@@ -71,6 +74,7 @@ public class FairScheduler extends TaskS
protected Map<JobInProgress, JobInfo> infos = // per-job scheduling variables
new HashMap<JobInProgress, JobInfo>();
protected long lastUpdateTime; // Time when we last updated infos
+ protected long lastPreemptionUpdateTime; // Time when we last updated preemption vars
protected boolean initialized; // Are we initialized?
protected volatile boolean running; // Are we running?
protected boolean assignMultiple; // Simultaneously assign map and reduce?
@@ -210,6 +214,9 @@ public class FairScheduler extends TaskS
infoServer.addServlet("scheduler", "/scheduler",
FairSchedulerServlet.class);
}
+
+ initMetrics();
+
eventLog.log("INITIALIZED");
} catch (Exception e) {
// Can't load one of the managers - crash the JobTracker now while it is
@@ -219,6 +226,8 @@ public class FairScheduler extends TaskS
LOG.info("Successfully configured FairScheduler");
}
+ private MetricsUpdater metricsUpdater; // responsible for pushing hadoop metrics
+
/**
* Returns the LoadManager object used by the Fair Share scheduler
*/
@@ -226,6 +235,16 @@ public class FairScheduler extends TaskS
return loadMgr;
}
+  /**
+   * Hooks the fair scheduler into the "fairscheduler" metrics context:
+   * creates the {@link MetricsUpdater} and registers it so the context
+   * calls back into this scheduler whenever it collects metric values.
+   */
+  private void initMetrics() {
+    MetricsContext fairSchedulerContext = MetricsUtil.getContext("fairscheduler");
+    metricsUpdater = new MetricsUpdater();
+    fairSchedulerContext.registerUpdater(metricsUpdater);
+  }
+
@Override
public void terminate() throws IOException {
if (eventLog != null)
@@ -236,6 +255,11 @@ public class FairScheduler extends TaskS
taskTrackerManager.removeJobInProgressListener(jobListener);
if (eventLog != null)
eventLog.shutdown();
+ if (metricsUpdater != null) {
+ MetricsContext context = MetricsUtil.getContext("fairscheduler");
+ context.unregisterUpdater(metricsUpdater);
+ metricsUpdater = null;
+ }
}
@@ -298,8 +322,7 @@ public class FairScheduler extends TaskS
public void jobRemoved(JobInProgress job) {
synchronized (FairScheduler.this) {
eventLog.log("JOB_REMOVED", job.getJobID());
- poolMgr.removeJob(job);
- infos.remove(job);
+ // Removes the job's JobInfo, its map/reduce metrics records and its pool membership.
+ jobNoLongerRunning(job);
}
}
@@ -331,6 +354,20 @@ public class FairScheduler extends TaskS
}
}
}
+
+  /**
+   * Callback registered with the "fairscheduler" metrics context. Each time
+   * the context asks for fresh values, it delegates to the scheduler's
+   * {@code updateMetrics()}.
+   */
+  private class MetricsUpdater implements Updater {
+    @Override
+    public void doUpdates(MetricsContext unused) {
+      FairScheduler.this.updateMetrics();
+    }
+  }
+
+  /**
+   * Pushes the current state of every pool (and its jobs) into the metrics
+   * records. Runs under the scheduler lock, the same lock assignTasks holds,
+   * so the published snapshot is not torn by a concurrent scheduling pass.
+   */
+  synchronized void updateMetrics() {
+    this.poolMgr.updateMetrics();
+  }
@Override
public synchronized List<Task> assignTasks(TaskTracker tracker)
@@ -617,8 +654,7 @@ public class FairScheduler extends TaskS
}
}
for (JobInProgress job: toRemove) {
- infos.remove(job);
- poolMgr.removeJob(job);
+ jobNoLongerRunning(job);
}
updateRunnability(); // Set job runnability based on user/pool limits
@@ -647,6 +683,16 @@ public class FairScheduler extends TaskS
updatePreemptionVariables();
}
}
+
+  /**
+   * Forgets a job that is no longer running. This drops its JobInfo entry,
+   * removes the metrics records of its map and reduce schedulables (if the
+   * job was still tracked), and removes it from its pool. The caller must
+   * hold the scheduler lock.
+   */
+  private void jobNoLongerRunning(JobInProgress job) {
+    assert Thread.holdsLock(this);
+    JobInfo removedInfo = infos.remove(job);
+    if (removedInfo != null) {
+      // Stop publishing metrics for both halves of the departed job.
+      removedInfo.mapSchedulable.cleanupMetrics();
+      removedInfo.reduceSchedulable.cleanupMetrics();
+    }
+    poolMgr.removeJob(job);
+  }
public List<PoolSchedulable> getPoolSchedulables(TaskType type) {
List<PoolSchedulable> scheds = new ArrayList<PoolSchedulable>();
@@ -744,6 +790,7 @@ public class FairScheduler extends TaskS
*/
private void updatePreemptionVariables() {
long now = clock.getTime();
+ lastPreemptionUpdateTime = now;
for (TaskType type: MAP_AND_REDUCE) {
for (PoolSchedulable sched: getPoolSchedulables(type)) {
if (!isStarvedForMinShare(sched)) {
@@ -1044,4 +1091,11 @@ public class FairScheduler extends TaskS
public JobInfo getJobInfo(JobInProgress job) {
return infos.get(job);
}
+
+  /** @return whether preemption is turned on for this scheduler */
+  boolean isPreemptionEnabled() {
+    return this.preemptionEnabled;
+  }
+
+  /** @return clock time recorded by the most recent updatePreemptionVariables() */
+  long getLastPreemptionUpdateTime() {
+    return this.lastPreemptionUpdateTime;
+  }
}
Modified: hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java?rev=1143252&r1=1143251&r2=1143252&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java (original)
+++ hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java Wed Jul 6 05:11:42 2011
@@ -35,6 +35,13 @@ public class JobSchedulable extends Sche
this.scheduler = scheduler;
this.job = job;
this.taskType = taskType;
+
+ initMetrics();
+ }
+
+  /** @return the kind of tasks (map or reduce) this job schedulable launches */
+  @Override
+  public TaskType getTaskType() {
+    return this.taskType;
}
@Override
@@ -151,4 +158,18 @@ public class JobSchedulable extends Sche
return null;
}
}
+
+
+  /** Job schedulables publish into the "jobs" record of the fair scheduler context. */
+  @Override
+  protected String getMetricsContextName() {
+    return "jobs";
+  }
+
+  /**
+   * Copies this job's current share, demand and running-task values into its
+   * metrics record, then hands the record to the context via update().
+   */
+  @Override
+  void updateMetrics() {
+    assert this.metrics != null;
+    super.setMetricValues(this.metrics);
+    this.metrics.update();
+  }
}
Modified: hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java?rev=1143252&r1=1143251&r2=1143252&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java (original)
+++ hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java Wed Jul 6 05:11:42 2011
@@ -22,6 +22,7 @@ import java.util.ArrayList;
import java.util.Collection;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.metrics.MetricsContext;
/**
* A schedulable pool of jobs.
@@ -91,4 +92,9 @@ public class Pool {
public PoolSchedulable getSchedulable(TaskType type) {
return type == TaskType.MAP ? mapSchedulable : reduceSchedulable;
}
+
+  /** Publishes metrics for this pool's map side, then for its reduce side. */
+  public void updateMetrics() {
+    PoolSchedulable[] sides = { mapSchedulable, reduceSchedulable };
+    for (PoolSchedulable side : sides) {
+      side.updateMetrics();
+    }
+  }
}
Modified: hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java?rev=1143252&r1=1143251&r2=1143252&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java (original)
+++ hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java Wed Jul 6 05:11:42 2011
@@ -37,6 +37,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.metrics.MetricsContext;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.Node;
@@ -536,4 +537,10 @@ public class PoolManager {
public long getFairSharePreemptionTimeout() {
return fairSharePreemptionTimeout;
}
+
+  /**
+   * Publishes metrics for every pool known to this manager. Each pool in
+   * turn refreshes the metrics of its jobs.
+   */
+  synchronized void updateMetrics() {
+    for (Pool p : pools.values()) {
+      p.updateMetrics();
+    }
+  }
}
Modified: hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java?rev=1143252&r1=1143251&r2=1143252&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java (original)
+++ hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolSchedulable.java Wed Jul 6 05:11:42 2011
@@ -54,6 +54,8 @@ public class PoolSchedulable extends Sch
long currentTime = scheduler.getClock().getTime();
this.lastTimeAtMinShare = currentTime;
this.lastTimeAtHalfFairShare = currentTime;
+
+ initMetrics();
}
public void addJob(JobInProgress job) {
@@ -171,6 +173,7 @@ public class PoolSchedulable extends Sch
return pool;
}
+ /** @return the kind of tasks (map or reduce) this pool schedulable handles */
+ @Override
public TaskType getTaskType() {
return taskType;
}
@@ -194,4 +197,25 @@ public class PoolSchedulable extends Sch
public void setLastTimeAtHalfFairShare(long lastTimeAtHalfFairShare) {
this.lastTimeAtHalfFairShare = lastTimeAtHalfFairShare;
}
+
+  /** Pool schedulables publish into the "pools" record of the fair scheduler context. */
+  @Override
+  protected String getMetricsContextName() {
+    return "pools";
+  }
+
+  /**
+   * Publishes this pool's share, demand and running-task values. When
+   * preemption is enabled, it also publishes how long it had been, as of the
+   * last preemption update, since the pool was at its min share and at half
+   * its fair share. Finally, it refreshes the metrics of every job in the pool.
+   */
+  @Override
+  public void updateMetrics() {
+    // Same guard as JobSchedulable.updateMetrics: initMetrics() must have run.
+    assert metrics != null;
+    super.setMetricValues(metrics);
+
+    if (scheduler.isPreemptionEnabled()) {
+      // The lastTimeAt* fields are not maintained when preemption is off, so
+      // only report them when it is on, measured against the preemption
+      // update's own timestamp.
+      long lastCheck = scheduler.getLastPreemptionUpdateTime();
+      metrics.setMetric("millisSinceAtMinShare", lastCheck - lastTimeAtMinShare);
+      metrics.setMetric("millisSinceAtHalfFairShare", lastCheck - lastTimeAtHalfFairShare);
+    }
+    metrics.update();
+
+    for (JobSchedulable job : jobScheds) {
+      job.updateMetrics();
+    }
+  }
}
Modified: hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java?rev=1143252&r1=1143251&r2=1143252&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java (original)
+++ hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Schedulable.java Wed Jul 6 05:11:42 2011
@@ -21,6 +21,11 @@ package org.apache.hadoop.mapred;
import java.io.IOException;
import java.util.Collection;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsRecord;
+import org.apache.hadoop.metrics.MetricsUtil;
+
/**
* A Schedulable represents an entity that can launch tasks, such as a job
* or a pool. It provides a common interface so that algorithms such as fair
@@ -53,7 +58,8 @@ import java.util.Collection;
abstract class Schedulable {
/** Fair share assigned to this Schedulable */
private double fairShare = 0;
-
+ protected MetricsRecord metrics;
+
/**
* Name of job/pool, used for debugging as well as for breaking ties in
* scheduling order deterministically.
@@ -61,6 +67,11 @@ abstract class Schedulable {
public abstract String getName();
/**
+ * @return the type of tasks (map or reduce) that this Schedulable -- a job
+ * or a pool -- schedules
+ */
+ public abstract TaskType getTaskType();
+
+ /**
* Maximum number of tasks required by this Schedulable. This is defined as
* number of currently running tasks + number of unlaunched tasks (tasks that
* are either not yet launched or need to be speculated).
@@ -122,6 +133,35 @@ abstract class Schedulable {
return fairShare;
}
+  /**
+   * @return the name of the metrics record, inside the "fairscheduler"
+   *         metrics context, that this schedulable publishes to (for example
+   *         "jobs" or "pools")
+   */
+  protected abstract String getMetricsContextName();
+
+  /**
+   * Creates this schedulable's metrics record in the "fairscheduler" context.
+   * The record is tagged with the schedulable's name and task type so that
+   * each job/pool and map/reduce side is reported separately. Invoked from
+   * the JobSchedulable and PoolSchedulable constructors.
+   */
+  protected void initMetrics() {
+    MetricsContext metricsContext = MetricsUtil.getContext("fairscheduler");
+    this.metrics = MetricsUtil.createRecord(metricsContext,
+        getMetricsContextName());
+    metrics.setTag("name", getName());
+    metrics.setTag("taskType", getTaskType().toString());
+  }
+
+  /**
+   * Removes this schedulable's record from the metrics context so that no
+   * stale data keeps being published for it. Safe to call more than once,
+   * or when initMetrics() was never called.
+   */
+  void cleanupMetrics() {
+    if (metrics == null) {
+      return;
+    }
+    metrics.remove();
+    metrics = null;
+  }
+
+  /**
+   * Writes the metrics common to every schedulable into {@code record}.
+   * This does not call {@link MetricsRecord#update()}; callers do that once
+   * they have set any additional values of their own.
+   */
+  protected void setMetricValues(MetricsRecord record) {
+    record.setMetric("fairShare", (float) getFairShare());
+    record.setMetric("minShare", getMinShare());
+    record.setMetric("demand", getDemand());
+    record.setMetric("weight", (float) getWeight());
+    record.setMetric("runningTasks", getRunningTasks());
+  }
+
+  /** Refreshes and publishes this schedulable's metrics record. */
+  abstract void updateMetrics();
+
/** Convenient toString implementation for debugging. */
@Override
public String toString() {
Modified: hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java?rev=1143252&r1=1143251&r2=1143252&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java (original)
+++ hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/FakeSchedulable.java Wed Jul 6 05:11:42 2011
@@ -21,6 +21,8 @@ package org.apache.hadoop.mapred;
import java.io.IOException;
import java.util.Collection;
+import org.apache.hadoop.mapreduce.TaskType;
+
/**
* Dummy implementation of Schedulable for unit testing.
*/
@@ -105,4 +107,18 @@ public class FakeSchedulable extends Sch
@Override
public void updateDemand() {}
+
+  /** The fake always reports that it schedules map tasks. */
+  @Override
+  public TaskType getTaskType() {
+    return TaskType.MAP;
+  }
+
+  /** Record name used if the fake's metrics are ever initialized. */
+  @Override
+  protected String getMetricsContextName() {
+    return "fake";
+  }
+
+  /** Intentionally a no-op. */
+  @Override
+  void updateMetrics() {
+  }
}
Modified: hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java?rev=1143252&r1=1143251&r2=1143252&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java (original)
+++ hadoop/common/trunk/mapreduce/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java Wed Jul 6 05:11:42 2011
@@ -47,6 +47,11 @@ import org.apache.hadoop.mapreduce.TaskT
import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
import org.apache.hadoop.mapreduce.split.JobSplit;
+import org.apache.hadoop.metrics.ContextFactory;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.spi.NoEmitMetricsContext;
+import org.apache.hadoop.metrics.spi.OutputRecord;
import org.apache.hadoop.net.Node;
import org.mortbay.log.Log;
@@ -516,7 +521,10 @@ public class TestFairScheduler extends T
}
private void setUpCluster(int numRacks, int numNodesPerRack,
- boolean assignMultiple) {
+ boolean assignMultiple) throws IOException {
+
+ resetMetrics();
+
conf = new JobConf();
conf.set("mapred.fairscheduler.allocation.file", ALLOC_FILE);
conf.set("mapred.fairscheduler.poolnameproperty", POOL_PROPERTY);
@@ -534,6 +542,20 @@ public class TestFairScheduler extends T
scheduler.start();
}
+  /**
+   * Configures the "fairscheduler" metrics context as a NoEmitMetricsContext,
+   * which keeps published data in memory so the tests can inspect it. Also
+   * removes any "jobs" and "pools" data left behind by an earlier test case.
+   */
+  private void resetMetrics() throws IOException {
+    ContextFactory.getFactory().setAttribute("fairscheduler.class",
+        NoEmitMetricsContext.class.getName());
+
+    MetricsContext fairSchedulerContext = MetricsUtil.getContext("fairscheduler");
+    for (String recordName : new String[] { "jobs", "pools" }) {
+      fairSchedulerContext.createRecord(recordName).remove();
+    }
+  }
+
@Override
protected void tearDown() throws Exception {
if (scheduler != null) {
@@ -689,7 +711,8 @@ public class TestFairScheduler extends T
assertEquals(1, info1.reduceSchedulable.getDemand());
assertEquals(2.0, info1.mapSchedulable.getFairShare());
assertEquals(1.0, info1.reduceSchedulable.getFairShare());
-
+ verifyMetrics();
+
// Advance time before submitting another job j2, to make j1 run before j2
// deterministically.
advanceTime(100);
@@ -709,6 +732,7 @@ public class TestFairScheduler extends T
assertEquals(2, info2.reduceSchedulable.getDemand());
assertEquals(1.0, info2.mapSchedulable.getFairShare());
assertEquals(2.0, info2.reduceSchedulable.getFairShare());
+ verifyMetrics();
// Assign tasks and check that jobs alternate in filling slots
checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1");
@@ -729,8 +753,8 @@ public class TestFairScheduler extends T
assertEquals(2, info2.reduceSchedulable.getRunningTasks());
assertEquals(1, info2.mapSchedulable.getDemand());
assertEquals(2, info2.reduceSchedulable.getDemand());
+ verifyMetrics();
}
-
/**
* This test is identical to testSmallJobs but sets assignMultiple to
* true so that multiple tasks can be assigned per heartbeat.
@@ -748,6 +772,7 @@ public class TestFairScheduler extends T
assertEquals(1, info1.reduceSchedulable.getDemand());
assertEquals(2.0, info1.mapSchedulable.getFairShare());
assertEquals(1.0, info1.reduceSchedulable.getFairShare());
+ verifyMetrics();
// Advance time before submitting another job j2, to make j1 run before j2
// deterministically.
@@ -768,6 +793,7 @@ public class TestFairScheduler extends T
assertEquals(2, info2.reduceSchedulable.getDemand());
assertEquals(1.0, info2.mapSchedulable.getFairShare());
assertEquals(2.0, info2.reduceSchedulable.getFairShare());
+ verifyMetrics();
// Assign tasks and check that jobs alternate in filling slots
checkAssignment("tt1", "attempt_test_0001_m_000000_0 on tt1",
@@ -788,6 +814,7 @@ public class TestFairScheduler extends T
assertEquals(2, info2.reduceSchedulable.getRunningTasks());
assertEquals(1, info2.mapSchedulable.getDemand());
assertEquals(2, info2.reduceSchedulable.getDemand());
+ verifyMetrics();
}
/**
@@ -1632,6 +1659,7 @@ public class TestFairScheduler extends T
assertEquals(0.28, info3.reduceSchedulable.getFairShare(), 0.01);
assertEquals(0.28, info4.mapSchedulable.getFairShare(), 0.01);
assertEquals(0.28, info4.reduceSchedulable.getFairShare(), 0.01);
+ verifyMetrics();
}
/**
@@ -2730,6 +2758,65 @@ public class TestFairScheduler extends T
assertEquals(33, poolA.getMapSchedulable().getDemand());
assertEquals(39, poolA.getReduceSchedulable().getDemand());
}
+
+
+  /**
+   * Regression test for a bug seen while developing MAPREDUCE-2323 (fair
+   * scheduler metrics). It moves a job from the default pool to poolA and
+   * back again, checking after each move that both pools' demand follows
+   * the job.
+   */
+  public void testSetPoolTwice() throws Exception {
+    // Allocation file with two fair-mode pools: "default" and "poolA".
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    for (String poolName : new String[] { "default", "poolA" }) {
+      out.println("<pool name=\"" + poolName + "\">");
+      out.println("<schedulingMode>fair</schedulingMode>");
+      out.println("</pool>");
+    }
+    out.println("</allocations>");
+    out.close();
+
+    PoolManager poolManager = scheduler.getPoolManager();
+    poolManager.reloadAllocs();
+    Pool defaultPool = poolManager.getPool("default");
+    Pool poolA = poolManager.getPool("poolA");
+
+    // A job submitted with default settings starts out in the default pool.
+    JobInProgress job1 = submitJob(JobStatus.RUNNING, 1, 3);
+    assertPoolDemands(defaultPool, 1, 3);
+    assertPoolDemands(poolA, 0, 0);
+
+    // Move the job to poolA; its demand must move with it.
+    poolManager.setPool(job1, "poolA");
+    assertEquals("poolA", poolManager.getPoolName(job1));
+    updatePoolDemands(defaultPool, poolA);
+    assertPoolDemands(defaultPool, 0, 0);
+    assertPoolDemands(poolA, 1, 3);
+
+    // Move it back; the demand must return to the default pool.
+    poolManager.setPool(job1, "default");
+    assertEquals("default", poolManager.getPoolName(job1));
+    updatePoolDemands(defaultPool, poolA);
+    assertPoolDemands(defaultPool, 1, 3);
+    assertPoolDemands(poolA, 0, 0);
+  }
+
+  /** Recomputes map demand, then reduce demand, for each given pool in order. */
+  private void updatePoolDemands(Pool... pools) {
+    for (Pool pool : pools) {
+      pool.getMapSchedulable().updateDemand();
+      pool.getReduceSchedulable().updateDemand();
+    }
+  }
+
+  /** Asserts a pool's current map demand and reduce demand. */
+  private void assertPoolDemands(Pool pool, int mapDemand, int reduceDemand) {
+    assertEquals(mapDemand, pool.getMapSchedulable().getDemand());
+    assertEquals(reduceDemand, pool.getReduceSchedulable().getDemand());
+  }
private void advanceTime(long time) {
clock.advance(time);
@@ -2828,4 +2915,108 @@ public class TestFairScheduler extends T
assertNull(scheduler.assignTasks(tracker("tt1")));
assertNull(scheduler.assignTasks(tracker("tt2")));
}
+
+
+  /**
+   * Ask the scheduler to update metrics, then verify that every pool and job
+   * record published to the "fairscheduler" context matches internal data.
+   */
+  private void verifyMetrics() {
+    scheduler.updateMetrics();
+    verifyPoolMetrics();
+    verifyJobMetrics();
+  }
+
+  /**
+   * @return the records buffered under {@code recordName} in the
+   *         "fairscheduler" context, or an empty collection (never null)
+   *         if nothing has been published under that name
+   */
+  private Collection<OutputRecord> getPublishedRecords(String recordName) {
+    MetricsContext ctx = MetricsUtil.getContext("fairscheduler");
+    Collection<OutputRecord> records = ctx.getAllRecords().get(recordName);
+    return records != null ? records : new ArrayList<OutputRecord>();
+  }
+
+  /**
+   * Verify that there is exactly one pool record per (pool, task type) pair
+   * and that each one matches the scheduler's internal data.
+   */
+  private void verifyPoolMetrics() {
+    Collection<OutputRecord> records = getPublishedRecords("pools");
+
+    List<PoolSchedulable> poolScheds = new ArrayList<PoolSchedulable>();
+    poolScheds.addAll(scheduler.getPoolSchedulables(TaskType.MAP));
+    poolScheds.addAll(scheduler.getPoolSchedulables(TaskType.REDUCE));
+
+    try {
+      // Count the actual map + reduce pool schedulables instead of assuming
+      // both sides always have the same number of pools.
+      assertEquals(poolScheds.size(), records.size());
+    } catch (Error e) {
+      // Dump what was published to make a count mismatch easier to debug.
+      for (OutputRecord rec : records) {
+        System.err.println("record:");
+        System.err.println(" name: " + rec.getTag("name"));
+        System.err.println(" type: " + rec.getTag("taskType"));
+      }
+      throw e;
+    }
+
+    Map<String, OutputRecord> byPoolAndType =
+        new HashMap<String, OutputRecord>();
+    for (OutputRecord rec : records) {
+      String pool = (String) rec.getTag("name");
+      String type = (String) rec.getTag("taskType");
+      assertNotNull(pool);
+      assertNotNull(type);
+      byPoolAndType.put(pool + "_" + type, rec);
+    }
+
+    for (PoolSchedulable pool : poolScheds) {
+      OutputRecord metrics = byPoolAndType.get(
+          pool.getName() + "_" + pool.getTaskType().toString());
+      assertNotNull("Need metrics for " + pool, metrics);
+      verifySchedulableMetrics(pool, metrics);
+    }
+  }
+
+  /**
+   * Verify that every tracked job has one map and one reduce record, and that
+   * each record matches the job's internal data.
+   */
+  private void verifyJobMetrics() {
+    Collection<OutputRecord> records = getPublishedRecords("jobs");
+
+    System.out.println("Checking job metrics...");
+    Map<String, OutputRecord> byJobIdAndType =
+        new HashMap<String, OutputRecord>();
+    for (OutputRecord rec : records) {
+      String jobId = (String) rec.getTag("name");
+      String type = (String) rec.getTag("taskType");
+      assertNotNull(jobId);
+      assertNotNull(type);
+      byJobIdAndType.put(jobId + "_" + type, rec);
+      System.out.println("Got " + type + " metrics for job: " + jobId);
+    }
+    assertEquals(scheduler.infos.size() * 2, byJobIdAndType.size());
+
+    for (Map.Entry<JobInProgress, JobInfo> entry :
+        scheduler.infos.entrySet()) {
+      JobInfo info = entry.getValue();
+      String jobId = entry.getKey().getJobID().toString();
+
+      OutputRecord mapMetrics = byJobIdAndType.get(jobId + "_MAP");
+      assertNotNull("Job " + jobId + " should have map metrics", mapMetrics);
+      verifySchedulableMetrics(info.mapSchedulable, mapMetrics);
+
+      OutputRecord reduceMetrics = byJobIdAndType.get(jobId + "_REDUCE");
+      assertNotNull("Job " + jobId + " should have reduce metrics", reduceMetrics);
+      verifySchedulableMetrics(info.reduceSchedulable, reduceMetrics);
+    }
+  }
+
+  /**
+   * Verify that the metrics for a given Schedulable are correct. This covers
+   * every value written by Schedulable.setMetricValues, including minShare.
+   */
+  private void verifySchedulableMetrics(
+      Schedulable sched, OutputRecord metrics) {
+    assertEquals(sched.getRunningTasks(), metrics.getMetric("runningTasks"));
+    assertEquals(sched.getDemand(), metrics.getMetric("demand"));
+    assertEquals(sched.getMinShare(), metrics.getMetric("minShare"));
+    assertEquals(sched.getFairShare(),
+        metrics.getMetric("fairShare").doubleValue(), .001);
+    assertEquals(sched.getWeight(),
+        metrics.getMetric("weight").doubleValue(), .001);
+  }
}
Modified: hadoop/common/trunk/mapreduce/src/docs/src/documentation/content/xdocs/fair_scheduler.xml
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/mapreduce/src/docs/src/documentation/content/xdocs/fair_scheduler.xml?rev=1143252&r1=1143251&r2=1143252&view=diff
==============================================================================
--- hadoop/common/trunk/mapreduce/src/docs/src/documentation/content/xdocs/fair_scheduler.xml (original)
+++ hadoop/common/trunk/mapreduce/src/docs/src/documentation/content/xdocs/fair_scheduler.xml Wed Jul 6 05:11:42 2011
@@ -517,6 +517,24 @@
<em>NewJobWeightBooster</em> are enabled.</li>
</ul>
</section>
+ <section>
+ <title>Metrics</title>
+ <p>
+ The fair scheduler can export metrics through the Hadoop metrics interface.
+ To enable them, configure the <code>fairscheduler</code> metrics context in
+ <code>hadoop-metrics.properties</code>. For example, to simply retain the
+ metrics in memory so that they can be viewed through the <code>/metrics</code>
+ servlet:
+ </p>
+ <p>
+ <code>fairscheduler.class=org.apache.hadoop.metrics.spi.NoEmitMetricsContext</code>
+ </p>
+ <p>
+ Metrics are generated for each pool and job, and contain the same information that
+ is visible on the <code>/scheduler</code> web page.
+ </p>
+ </section>
+
<!--
<section>
<title>Implementation</title>