You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by ac...@apache.org on 2011/09/12 01:57:38 UTC
svn commit: r1169585 [2/5] - in
/hadoop/common/branches/branch-0.20-security: ./ conf/
src/contrib/capacity-scheduler/src/java/org/apache/hadoop/mapred/
src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/
src/contrib/fairscheduler/ src/co...
Modified: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=1169585&r1=1169584&r2=1169585&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Sun Sep 11 23:57:37 2011
@@ -25,28 +25,47 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.Map.Entry;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.http.HttpServer;
-import org.apache.hadoop.mapred.JobStatus;
-import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.server.jobtracker.TaskTracker;
+import org.apache.hadoop.metrics.MetricsContext;
+import org.apache.hadoop.metrics.MetricsUtil;
+import org.apache.hadoop.metrics.Updater;
+import org.apache.hadoop.util.ReflectionUtils;
/**
* A {@link TaskScheduler} that implements fair sharing.
*/
public class FairScheduler extends TaskScheduler {
- /** How often fair shares are re-calculated */
- public static final long UPDATE_INTERVAL = 500;
public static final Log LOG = LogFactory.getLog(
"org.apache.hadoop.mapred.FairScheduler");
+
+ // How often fair shares are re-calculated
+ protected long updateInterval = 500;
+
+ // How often to dump scheduler state to the event log
+ protected long dumpInterval = 10000;
+
+ // How often tasks are preempted (must be longer than a couple
+ // of heartbeats to give task-kill commands a chance to act).
+ protected long preemptionInterval = 15000;
+
+ // Used to iterate through map and reduce task types
+ private static final TaskType[] MAP_AND_REDUCE =
+ new TaskType[] {TaskType.MAP, TaskType.REDUCE};
+
+ // Maximum locality delay when auto-computing locality delays
+ private static final long MAX_AUTOCOMPUTED_LOCALITY_DELAY = 15000;
protected PoolManager poolMgr;
protected LoadManager loadMgr;
@@ -55,16 +74,29 @@ public class FairScheduler extends TaskS
protected Map<JobInProgress, JobInfo> infos = // per-job scheduling variables
new HashMap<JobInProgress, JobInfo>();
protected long lastUpdateTime; // Time when we last updated infos
+ protected long lastPreemptionUpdateTime; // Time when we last updated preemption vars
protected boolean initialized; // Are we initialized?
protected volatile boolean running; // Are we running?
- protected boolean useFifo; // Set if we want to revert to FIFO behavior
protected boolean assignMultiple; // Simultaneously assign map and reduce?
+ protected int mapAssignCap = -1; // Max maps to launch per heartbeat
+ protected int reduceAssignCap = -1; // Max reduces to launch per heartbeat
+ protected long nodeLocalityDelay; // Time to wait for node locality
+ protected long rackLocalityDelay; // Time to wait for rack locality
+ protected boolean autoComputeLocalityDelay = false; // Compute locality delay
+ // from heartbeat interval
protected boolean sizeBasedWeight; // Give larger weights to larger jobs
protected boolean waitForMapsBeforeLaunchingReduces = true;
+ protected boolean preemptionEnabled;
+ protected boolean onlyLogPreemption; // Only log when tasks should be killed
private Clock clock;
- private boolean runBackgroundUpdates; // Can be set to false for testing
- private EagerTaskInitializationListener eagerInitListener;
private JobListener jobListener;
+ private JobInitializer jobInitializer;
+ private boolean mockMode; // Used for unit tests; disables background updates
+ // and scheduler event log
+ private FairSchedulerEventLog eventLog;
+ protected long lastDumpTime; // Time when we last dumped state to log
+ protected long lastHeartbeatTime; // Time we last ran assignTasks
+ private long lastPreemptCheckTime; // Time we last ran preemptTasksIfNecessary
/**
* A class for holding per-job scheduler variables. These always contain the
@@ -73,30 +105,32 @@ public class FairScheduler extends TaskS
*/
static class JobInfo {
boolean runnable = false; // Can the job run given user/pool limits?
- double mapWeight = 0; // Weight of job in calculation of map share
- double reduceWeight = 0; // Weight of job in calculation of reduce share
- long mapDeficit = 0; // Time deficit for maps
- long reduceDeficit = 0; // Time deficit for reduces
- int runningMaps = 0; // Maps running at last update
- int runningReduces = 0; // Reduces running at last update
- int neededMaps; // Maps needed at last update
- int neededReduces; // Reduces needed at last update
- int minMaps = 0; // Minimum maps as guaranteed by pool
- int minReduces = 0; // Minimum reduces as guaranteed by pool
- double mapFairShare = 0; // Fair share of map slots at last update
- double reduceFairShare = 0; // Fair share of reduce slots at last update
+ // Does this job need to be initialized?
+ volatile boolean needsInitializing = true;
+ public JobSchedulable mapSchedulable;
+ public JobSchedulable reduceSchedulable;
+ // Variables used for delay scheduling
+ LocalityLevel lastMapLocalityLevel; // Locality level of last map launched
+ long timeWaitedForLocalMap; // Time waiting for local map since last map
+ boolean skippedAtLastHeartbeat; // Was job skipped at previous assignTasks?
+ // (used to update timeWaitedForLocalMap)
+ public JobInfo(JobSchedulable mapSched, JobSchedulable reduceSched) {
+ this.mapSchedulable = mapSched;
+ this.reduceSchedulable = reduceSched;
+ this.lastMapLocalityLevel = LocalityLevel.NODE;
+ }
}
public FairScheduler() {
- this(new Clock(), true);
+ this(new Clock(), false);
}
/**
* Constructor used for tests, which can change the clock and disable updates.
*/
- protected FairScheduler(Clock clock, boolean runBackgroundUpdates) {
+ protected FairScheduler(Clock clock, boolean mockMode) {
this.clock = clock;
- this.runBackgroundUpdates = runBackgroundUpdates;
+ this.mockMode = mockMode;
this.jobListener = new JobListener();
}
@@ -104,16 +138,27 @@ public class FairScheduler extends TaskS
public void start() {
try {
Configuration conf = getConf();
- this.eagerInitListener = new EagerTaskInitializationListener(conf);
- eagerInitListener.setTaskTrackerManager(taskTrackerManager);
- eagerInitListener.start();
- taskTrackerManager.addJobInProgressListener(eagerInitListener);
+ // Create scheduling log and initialize it if it is enabled
+ eventLog = new FairSchedulerEventLog();
+ boolean logEnabled = conf.getBoolean(
+ "mapred.fairscheduler.eventlog.enabled", false);
+ if (!mockMode && logEnabled) {
+ String hostname = "localhost";
+ if (taskTrackerManager instanceof JobTracker) {
+ hostname = ((JobTracker) taskTrackerManager).getJobTrackerMachine();
+ }
+ eventLog.init(conf, hostname);
+ }
+ // Initialize other pieces of the scheduler
+ jobInitializer = new JobInitializer(conf, taskTrackerManager);
taskTrackerManager.addJobInProgressListener(jobListener);
- poolMgr = new PoolManager(conf);
+ poolMgr = new PoolManager(this);
+ poolMgr.initialize();
loadMgr = (LoadManager) ReflectionUtils.newInstance(
conf.getClass("mapred.fairscheduler.loadmanager",
CapBasedLoadManager.class, LoadManager.class), conf);
loadMgr.setTaskTrackerManager(taskTrackerManager);
+ loadMgr.setEventLog(eventLog);
loadMgr.start();
taskSelector = (TaskSelector) ReflectionUtils.newInstance(
conf.getClass("mapred.fairscheduler.taskselector",
@@ -126,16 +171,41 @@ public class FairScheduler extends TaskS
weightAdjuster = (WeightAdjuster) ReflectionUtils.newInstance(
weightAdjClass, conf);
}
- assignMultiple = conf.getBoolean("mapred.fairscheduler.assignmultiple",
- false);
- sizeBasedWeight = conf.getBoolean("mapred.fairscheduler.sizebasedweight",
- false);
+ updateInterval = conf.getLong(
+ "mapred.fairscheduler.update.interval", 500);
+ dumpInterval = conf.getLong(
+ "mapred.fairscheduler.dump.interval", 10000);
+ preemptionInterval = conf.getLong(
+ "mapred.fairscheduler.preemption.interval", 15000);
+ assignMultiple = conf.getBoolean(
+ "mapred.fairscheduler.assignmultiple", true);
+ mapAssignCap = conf.getInt(
+ "mapred.fairscheduler.assignmultiple.maps", -1);
+ reduceAssignCap = conf.getInt(
+ "mapred.fairscheduler.assignmultiple.reduces", -1);
+ sizeBasedWeight = conf.getBoolean(
+ "mapred.fairscheduler.sizebasedweight", false);
+ preemptionEnabled = conf.getBoolean(
+ "mapred.fairscheduler.preemption", false);
+ onlyLogPreemption = conf.getBoolean(
+ "mapred.fairscheduler.preemption.only.log", false);
+ long defaultDelay = conf.getLong(
+ "mapred.fairscheduler.locality.delay", -1);
+ nodeLocalityDelay = conf.getLong(
+ "mapred.fairscheduler.locality.delay.node", defaultDelay);
+ rackLocalityDelay = conf.getLong(
+ "mapred.fairscheduler.locality.delay.rack", defaultDelay);
+ if (defaultDelay == -1 &&
+ (nodeLocalityDelay == -1 || rackLocalityDelay == -1)) {
+ autoComputeLocalityDelay = true; // Compute from heartbeat interval
+ }
initialized = true;
running = true;
lastUpdateTime = clock.getTime();
// Start a thread to update deficits every UPDATE_INTERVAL
- if (runBackgroundUpdates)
+ if (!mockMode) {
new UpdateThread().start();
+ }
// Register servlet with JobTracker's Jetty server
if (taskTrackerManager instanceof JobTracker) {
JobTracker jobTracker = (JobTracker) taskTrackerManager;
@@ -144,6 +214,10 @@ public class FairScheduler extends TaskS
infoServer.addServlet("scheduler", "/scheduler",
FairSchedulerServlet.class);
}
+
+ initMetrics();
+
+ eventLog.log("INITIALIZED");
} catch (Exception e) {
// Can't load one of the managers - crash the JobTracker now while it is
// starting up so that the user notices.
@@ -152,25 +226,94 @@ public class FairScheduler extends TaskS
LOG.info("Successfully configured FairScheduler");
}
+ private MetricsUpdater metricsUpdater; // responsible for pushing hadoop metrics
+
+ /**
+ * Returns the LoadManager object used by the Fair Share scheduler
+ */
+ LoadManager getLoadManager() {
+ return loadMgr;
+ }
+
+ /**
+ * Register metrics for the fair scheduler, and start a thread
+ * to update them periodically.
+ */
+ private void initMetrics() {
+ MetricsContext context = MetricsUtil.getContext("fairscheduler");
+ metricsUpdater = new MetricsUpdater();
+ context.registerUpdater(metricsUpdater);
+ }
+
@Override
public void terminate() throws IOException {
+ if (eventLog != null)
+ eventLog.log("SHUTDOWN");
running = false;
+ jobInitializer.terminate();
if (jobListener != null)
taskTrackerManager.removeJobInProgressListener(jobListener);
- if (eagerInitListener != null)
- taskTrackerManager.removeJobInProgressListener(eagerInitListener);
+ if (eventLog != null)
+ eventLog.shutdown();
+ if (metricsUpdater != null) {
+ MetricsContext context = MetricsUtil.getContext("fairscheduler");
+ context.unregisterUpdater(metricsUpdater);
+ metricsUpdater = null;
+ }
+ }
+
+
+ private class JobInitializer {
+ private final int DEFAULT_NUM_THREADS = 1;
+ private ExecutorService threadPool;
+ private TaskTrackerManager ttm;
+ public JobInitializer(Configuration conf, TaskTrackerManager ttm) {
+ int numThreads = conf.getInt("mapred.jobinit.threads",
+ DEFAULT_NUM_THREADS);
+ threadPool = Executors.newFixedThreadPool(numThreads);
+ this.ttm = ttm;
+ }
+ public void initJob(JobInfo jobInfo, JobInProgress job) {
+ if (!mockMode) {
+ threadPool.execute(new InitJob(jobInfo, job));
+ } else {
+ new InitJob(jobInfo, job).run();
+ }
+ }
+ class InitJob implements Runnable {
+ private JobInfo jobInfo;
+ private JobInProgress job;
+ public InitJob(JobInfo jobInfo, JobInProgress job) {
+ this.jobInfo = jobInfo;
+ this.job = job;
+ }
+ public void run() {
+ ttm.initJob(job);
+ }
+ }
+ void terminate() {
+ LOG.info("Shutting down thread pool");
+ threadPool.shutdownNow();
+ try {
+ threadPool.awaitTermination(1, TimeUnit.MINUTES);
+ } catch (InterruptedException e) {
+ // Ignore, we are in shutdown anyway.
+ }
+ }
}
-
- /**
+
+/**
* Used to listen for jobs added/removed by our {@link TaskTrackerManager}.
*/
private class JobListener extends JobInProgressListener {
@Override
public void jobAdded(JobInProgress job) {
synchronized (FairScheduler.this) {
- poolMgr.addJob(job);
- JobInfo info = new JobInfo();
+ eventLog.log("JOB_ADDED", job.getJobID());
+ JobInfo info = new JobInfo(new JobSchedulable(FairScheduler.this, job, TaskType.MAP),
+ new JobSchedulable(FairScheduler.this, job, TaskType.REDUCE));
infos.put(job, info);
+ poolMgr.addJob(job); // Also adds job into the right PoolScheduable
update();
}
}
@@ -178,13 +321,14 @@ public class FairScheduler extends TaskS
@Override
public void jobRemoved(JobInProgress job) {
synchronized (FairScheduler.this) {
- poolMgr.removeJob(job);
- infos.remove(job);
+ eventLog.log("JOB_REMOVED", job.getJobID());
+ jobNoLongerRunning(job);
}
}
@Override
public void jobUpdated(JobChangeEvent event) {
+ eventLog.log("JOB_UPDATED", event.getJobInProgress().getJobID());
}
}
@@ -200,30 +344,50 @@ public class FairScheduler extends TaskS
public void run() {
while (running) {
try {
- Thread.sleep(UPDATE_INTERVAL);
+ Thread.sleep(updateInterval);
update();
+ dumpIfNecessary();
+ preemptTasksIfNecessary();
} catch (Exception e) {
- LOG.error("Failed to update fair share calculations", e);
+ LOG.error("Exception in fair scheduler UpdateThread", e);
}
}
}
}
+
+ /**
+ * Responsible for updating metrics when the metrics context requests it.
+ */
+ private class MetricsUpdater implements Updater {
+ @Override
+ public void doUpdates(MetricsContext context) {
+ updateMetrics();
+ }
+ }
+
+ synchronized void updateMetrics() {
+ poolMgr.updateMetrics();
+ }
@Override
public synchronized List<Task> assignTasks(TaskTracker tracker)
throws IOException {
if (!initialized) // Don't try to assign tasks if we haven't yet started up
return null;
+ String trackerName = tracker.getTrackerName();
+ eventLog.log("HEARTBEAT", trackerName);
+ long currentTime = clock.getTime();
- // Reload allocations file if it hasn't been loaded in a while
- poolMgr.reloadAllocsIfNecessary();
-
- // Compute total runnable maps and reduces
+ // Compute total runnable maps and reduces, and currently running ones
int runnableMaps = 0;
+ int runningMaps = 0;
int runnableReduces = 0;
- for (JobInProgress job: infos.keySet()) {
- runnableMaps += runnableTasks(job, TaskType.MAP);
- runnableReduces += runnableTasks(job, TaskType.REDUCE);
+ int runningReduces = 0;
+ for (Pool pool: poolMgr.getPools()) {
+ runnableMaps += pool.getMapSchedulable().getDemand();
+ runningMaps += pool.getMapSchedulable().getRunningTasks();
+ runnableReduces += pool.getReduceSchedulable().getDemand();
+ runningReduces += pool.getReduceSchedulable().getRunningTasks();
}
ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
@@ -233,50 +397,123 @@ public class FairScheduler extends TaskS
int totalMapSlots = getTotalSlots(TaskType.MAP, clusterStatus);
int totalReduceSlots = getTotalSlots(TaskType.REDUCE, clusterStatus);
- // Scan to see whether any job needs to run a map, then a reduce
+ eventLog.log("RUNNABLE_TASKS",
+ runnableMaps, runningMaps, runnableReduces, runningReduces);
+
+ // Update time waited for local maps for jobs skipped on last heartbeat
+ updateLocalityWaitTimes(currentTime);
+
+ TaskTrackerStatus tts = tracker.getStatus();
+
+ int mapsAssigned = 0; // loop counter for map in the below while loop
+ int reducesAssigned = 0; // loop counter for reduce in the below while loop
+ int mapCapacity = maxTasksToAssign(TaskType.MAP, tts);
+ int reduceCapacity = maxTasksToAssign(TaskType.REDUCE, tts);
+ boolean mapRejected = false; // flag used for ending the loop
+ boolean reduceRejected = false; // flag used for ending the loop
+
+ // Keep track of which jobs were visited for map tasks and which had tasks
+ // launched, so that we can later mark skipped jobs for delay scheduling
+ Set<JobInProgress> visitedForMap = new HashSet<JobInProgress>();
+ Set<JobInProgress> visitedForReduce = new HashSet<JobInProgress>();
+ Set<JobInProgress> launchedMap = new HashSet<JobInProgress>();
+
ArrayList<Task> tasks = new ArrayList<Task>();
- TaskType[] types = new TaskType[] {TaskType.MAP, TaskType.REDUCE};
- TaskTrackerStatus trackerStatus = tracker.getStatus();
- for (TaskType taskType: types) {
- boolean canAssign = (taskType == TaskType.MAP) ?
- loadMgr.canAssignMap(trackerStatus, runnableMaps, totalMapSlots) :
- loadMgr.canAssignReduce(trackerStatus, runnableReduces, totalReduceSlots);
- if (canAssign) {
- // Figure out the jobs that need this type of task
- List<JobInProgress> candidates = new ArrayList<JobInProgress>();
- for (JobInProgress job: infos.keySet()) {
- if (job.getStatus().getRunState() == JobStatus.RUNNING &&
- neededTasks(job, taskType) > 0) {
- candidates.add(job);
- }
+ // Scan jobs to assign tasks until neither maps nor reduces can be assigned
+ while (true) {
+ // Computing the ending conditions for the loop
+ // Reject a task type if one of the following condition happens
+ // 1. number of assigned tasks reaches the per-heartbeat limit
+ // 2. number of running tasks reaches runnable tasks
+ // 3. task is rejected by the LoadManager.canAssign
+ if (!mapRejected) {
+ if (mapsAssigned == mapCapacity ||
+ runningMaps == runnableMaps ||
+ !loadMgr.canAssignMap(tts, runnableMaps, totalMapSlots)) {
+ eventLog.log("INFO", "Can't assign another MAP to " + trackerName);
+ mapRejected = true;
}
- // Sort jobs by deficit (for Fair Sharing) or submit time (for FIFO)
- Comparator<JobInProgress> comparator = useFifo ?
- new FifoJobComparator() : new DeficitComparator(taskType);
- Collections.sort(candidates, comparator);
- for (JobInProgress job: candidates) {
- Task task = (taskType == TaskType.MAP ?
- taskSelector.obtainNewMapTask(trackerStatus, job) :
- taskSelector.obtainNewReduceTask(trackerStatus, job));
- if (task != null) {
- // Update the JobInfo for this job so we account for the launched
- // tasks during this update interval and don't try to launch more
- // tasks than the job needed on future heartbeats
- JobInfo info = infos.get(job);
- if (taskType == TaskType.MAP) {
- info.runningMaps++;
- info.neededMaps--;
- } else {
- info.runningReduces++;
- info.neededReduces--;
- }
- tasks.add(task);
- if (!assignMultiple)
- return tasks;
- break;
+ }
+ if (!reduceRejected) {
+ if (reducesAssigned == reduceCapacity ||
+ runningReduces == runnableReduces ||
+ !loadMgr.canAssignReduce(tts, runnableReduces, totalReduceSlots)) {
+ eventLog.log("INFO", "Can't assign another REDUCE to " + trackerName);
+ reduceRejected = true;
+ }
+ }
+ // Exit while (true) loop if
+ // 1. neither maps nor reduces can be assigned
+ // 2. assignMultiple is off and we already assigned one task
+ if (mapRejected && reduceRejected ||
+ !assignMultiple && tasks.size() > 0) {
+ break; // This is the only exit of the while (true) loop
+ }
+
+ // Determine which task type to assign this time
+ // First try choosing a task type which is not rejected
+ TaskType taskType;
+ if (mapRejected) {
+ taskType = TaskType.REDUCE;
+ } else if (reduceRejected) {
+ taskType = TaskType.MAP;
+ } else {
+ // If both types are available, choose the task type with fewer running
+ // tasks on the task tracker to prevent that task type from starving
+ if (tts.countMapTasks() <= tts.countReduceTasks()) {
+ taskType = TaskType.MAP;
+ } else {
+ taskType = TaskType.REDUCE;
+ }
+ }
+
+ // Get the map or reduce schedulables and sort them by fair sharing
+ List<PoolSchedulable> scheds = getPoolSchedulables(taskType);
+ Collections.sort(scheds, new SchedulingAlgorithms.FairShareComparator());
+ boolean foundTask = false;
+ for (Schedulable sched: scheds) { // This loop will assign only one task
+ eventLog.log("INFO", "Checking for " + taskType +
+ " task in " + sched.getName());
+ Task task = taskType == TaskType.MAP ?
+ sched.assignTask(tts, currentTime, visitedForMap) :
+ sched.assignTask(tts, currentTime, visitedForReduce);
+ if (task != null) {
+ foundTask = true;
+ JobInProgress job = taskTrackerManager.getJob(task.getJobID());
+ eventLog.log("ASSIGN", trackerName, taskType,
+ job.getJobID(), task.getTaskID());
+ // Update running task counts, and the job's locality level
+ if (taskType == TaskType.MAP) {
+ launchedMap.add(job);
+ mapsAssigned++;
+ runningMaps++;
+ updateLastMapLocalityLevel(job, task, tts);
+ } else {
+ reducesAssigned++;
+ runningReduces++;
}
+ // Add task to the list of assignments
+ tasks.add(task);
+ break; // This break makes this loop assign only one task
+ } // end if(task != null)
+ } // end for(Schedulable sched: scheds)
+
+ // Reject the task type if we cannot find a task
+ if (!foundTask) {
+ if (taskType == TaskType.MAP) {
+ mapRejected = true;
+ } else {
+ reduceRejected = true;
}
}
+ } // end while (true)
+
+ // Mark any jobs that were visited for map tasks but did not launch a task
+ // as skipped on this heartbeat
+ for (JobInProgress job: visitedForMap) {
+ if (!launchedMap.contains(job)) {
+ infos.get(job).skippedAtLastHeartbeat = true;
+ }
}
// If no tasks were found, return null
@@ -284,40 +521,102 @@ public class FairScheduler extends TaskS
}
/**
- * Compare jobs by deficit for a given task type, putting jobs whose current
- * allocation is less than their minimum share always ahead of others. This is
- * the default job comparator used for Fair Sharing.
- */
- private class DeficitComparator implements Comparator<JobInProgress> {
- private final TaskType taskType;
-
- private DeficitComparator(TaskType taskType) {
- this.taskType = taskType;
- }
-
- public int compare(JobInProgress j1, JobInProgress j2) {
- // Put needy jobs ahead of non-needy jobs (where needy means must receive
- // new tasks to meet slot minimum), comparing among jobs of the same type
- // by deficit so as to put jobs with higher deficit ahead.
- JobInfo j1Info = infos.get(j1);
- JobInfo j2Info = infos.get(j2);
- long deficitDif;
- boolean j1Needy, j2Needy;
- if (taskType == TaskType.MAP) {
- j1Needy = j1.runningMaps() < Math.floor(j1Info.minMaps);
- j2Needy = j2.runningMaps() < Math.floor(j2Info.minMaps);
- deficitDif = j2Info.mapDeficit - j1Info.mapDeficit;
- } else {
- j1Needy = j1.runningReduces() < Math.floor(j1Info.minReduces);
- j2Needy = j2.runningReduces() < Math.floor(j2Info.minReduces);
- deficitDif = j2Info.reduceDeficit - j1Info.reduceDeficit;
- }
- if (j1Needy && !j2Needy)
- return -1;
- else if (j2Needy && !j1Needy)
- return 1;
- else // Both needy or both non-needy; compare by deficit
- return (int) Math.signum(deficitDif);
+ * Get maximum number of tasks to assign on a TaskTracker on a heartbeat.
+ * The scheduler may launch fewer than this many tasks if the LoadManager
+ * says not to launch more, but it will never launch more than this number.
+ */
+ private int maxTasksToAssign(TaskType type, TaskTrackerStatus tts) {
+ if (!assignMultiple)
+ return 1;
+ int cap = (type == TaskType.MAP) ? mapAssignCap : reduceAssignCap;
+ if (cap == -1) // Infinite cap; use the TaskTracker's slot count
+ return (type == TaskType.MAP) ?
+ tts.getAvailableMapSlots(): tts.getAvailableReduceSlots();
+ else
+ return cap;
+ }
+
+ /**
+ * Update locality wait times for jobs that were skipped at last heartbeat.
+ */
+ private void updateLocalityWaitTimes(long currentTime) {
+ long timeSinceLastHeartbeat =
+ (lastHeartbeatTime == 0 ? 0 : currentTime - lastHeartbeatTime);
+ lastHeartbeatTime = currentTime;
+ for (JobInfo info: infos.values()) {
+ if (info.skippedAtLastHeartbeat) {
+ info.timeWaitedForLocalMap += timeSinceLastHeartbeat;
+ info.skippedAtLastHeartbeat = false;
+ }
+ }
+ }
+
+ /**
+ * Update a job's locality level and locality wait variables given that
+ * it has just launched a map task on a given task tracker.
+ */
+ private void updateLastMapLocalityLevel(JobInProgress job,
+ Task mapTaskLaunched, TaskTrackerStatus tracker) {
+ JobInfo info = infos.get(job);
+ LocalityLevel localityLevel = LocalityLevel.fromTask(
+ job, mapTaskLaunched, tracker);
+ info.lastMapLocalityLevel = localityLevel;
+ info.timeWaitedForLocalMap = 0;
+ eventLog.log("ASSIGNED_LOC_LEVEL", job.getJobID(), localityLevel);
+ }
+
+ /**
+ * Get the maximum locality level at which a given job is allowed to
+ * launch tasks, based on how long it has been waiting for local tasks.
+ * This is used to implement the "delay scheduling" feature of the Fair
+ * Scheduler for optimizing data locality.
+ * If the job has no locality information (e.g. it does not use HDFS), this
+ * method returns LocalityLevel.ANY, allowing tasks at any level.
+ * Otherwise, the job can only launch tasks at its current locality level
+ * or lower, unless it has waited at least nodeLocalityDelay or
+ * rackLocalityDelay milliseconds, depending on the current level. If it
+ * has waited (nodeLocalityDelay + rackLocalityDelay) milliseconds,
+ * it can go to any level.
+ */
+ protected LocalityLevel getAllowedLocalityLevel(JobInProgress job,
+ long currentTime) {
+ JobInfo info = infos.get(job);
+ if (info == null) { // Job not in infos (shouldn't happen)
+ LOG.error("getAllowedLocalityLevel called on job " + job
+ + ", which does not have a JobInfo in infos");
+ return LocalityLevel.ANY;
+ }
+ if (job.nonLocalMaps.size() > 0) { // Job doesn't have locality information
+ return LocalityLevel.ANY;
+ }
+ // Don't wait for locality if the job's pool is starving for maps
+ Pool pool = poolMgr.getPool(job);
+ PoolSchedulable sched = pool.getMapSchedulable();
+ long minShareTimeout = poolMgr.getMinSharePreemptionTimeout(pool.getName());
+ long fairShareTimeout = poolMgr.getFairSharePreemptionTimeout();
+ if (currentTime - sched.getLastTimeAtMinShare() > minShareTimeout ||
+ currentTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
+ eventLog.log("INFO", "No delay scheduling for "
+ + job.getJobID() + " because it is being starved");
+ return LocalityLevel.ANY;
+ }
+ // In the common case, compute locality level based on time waited
+ switch(info.lastMapLocalityLevel) {
+ case NODE: // Last task launched was node-local
+ if (info.timeWaitedForLocalMap >=
+ nodeLocalityDelay + rackLocalityDelay)
+ return LocalityLevel.ANY;
+ else if (info.timeWaitedForLocalMap >= nodeLocalityDelay)
+ return LocalityLevel.RACK;
+ else
+ return LocalityLevel.NODE;
+ case RACK: // Last task launched was rack-local
+ if (info.timeWaitedForLocalMap >= rackLocalityDelay)
+ return LocalityLevel.ANY;
+ else
+ return LocalityLevel.RACK;
+ default: // Last task was non-local; can launch anywhere
+ return LocalityLevel.ANY;
}
}
@@ -327,11 +626,25 @@ public class FairScheduler extends TaskS
* and needed tasks of each type.
*/
protected void update() {
- //Making more granual locking so that clusterStatus can be fetched from Jobtracker.
+ // Making more granular locking so that clusterStatus can be fetched
+ // from Jobtracker without locking the scheduler.
ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
- // Got clusterStatus hence acquiring scheduler lock now
- // Remove non-running jobs
- synchronized(this){
+
+ // Recompute locality delay from JobTracker heartbeat interval if enabled.
+ // This will also lock the JT, so do it outside of a fair scheduler lock.
+ if (autoComputeLocalityDelay) {
+ JobTracker jobTracker = (JobTracker) taskTrackerManager;
+ nodeLocalityDelay = Math.min(MAX_AUTOCOMPUTED_LOCALITY_DELAY,
+ (long) (1.5 * jobTracker.getNextHeartbeatInterval()));
+ rackLocalityDelay = nodeLocalityDelay;
+ }
+
+ // Got clusterStatus hence acquiring scheduler lock now.
+ synchronized (this) {
+ // Reload allocations file if it hasn't been loaded in a while
+ poolMgr.reloadAllocsIfNecessary();
+
+ // Remove any jobs that have stopped running
List<JobInProgress> toRemove = new ArrayList<JobInProgress>();
for (JobInProgress job: infos.keySet()) {
int runState = job.getStatus().getRunState();
@@ -341,30 +654,52 @@ public class FairScheduler extends TaskS
}
}
for (JobInProgress job: toRemove) {
- infos.remove(job);
- poolMgr.removeJob(job);
+ jobNoLongerRunning(job);
+ }
+
+ updateRunnability(); // Set job runnability based on user/pool limits
+
+ // Update demands of jobs and pools
+ for (Pool pool: poolMgr.getPools()) {
+ pool.getMapSchedulable().updateDemand();
+ pool.getReduceSchedulable().updateDemand();
}
- // Update running jobs with deficits since last update, and compute new
- // slot allocations, weight, shares and task counts
- long now = clock.getTime();
- long timeDelta = now - lastUpdateTime;
- updateDeficits(timeDelta);
- updateRunnability();
- updateTaskCounts();
- updateWeights();
- updateMinSlots();
- updateFairShares(clusterStatus);
- lastUpdateTime = now;
+
+ // Compute fair shares based on updated demands
+ List<PoolSchedulable> mapScheds = getPoolSchedulables(TaskType.MAP);
+ List<PoolSchedulable> reduceScheds = getPoolSchedulables(TaskType.REDUCE);
+ SchedulingAlgorithms.computeFairShares(
+ mapScheds, clusterStatus.getMaxMapTasks());
+ SchedulingAlgorithms.computeFairShares(
+ reduceScheds, clusterStatus.getMaxReduceTasks());
+
+ // Use the computed shares to assign shares within each pool
+ for (Pool pool: poolMgr.getPools()) {
+ pool.getMapSchedulable().redistributeShare();
+ pool.getReduceSchedulable().redistributeShare();
+ }
+
+ if (preemptionEnabled)
+ updatePreemptionVariables();
}
}
+
+ private void jobNoLongerRunning(JobInProgress job) {
+ assert Thread.holdsLock(this);
+ JobInfo info = infos.remove(job);
+ if (info != null) {
+ info.mapSchedulable.cleanupMetrics();
+ info.reduceSchedulable.cleanupMetrics();
+ }
+ poolMgr.removeJob(job);
+ }
- private void updateDeficits(long timeDelta) {
- for (JobInfo info: infos.values()) {
- info.mapDeficit +=
- (info.mapFairShare - info.runningMaps) * timeDelta;
- info.reduceDeficit +=
- (info.reduceFairShare - info.runningReduces) * timeDelta;
+ public List<PoolSchedulable> getPoolSchedulables(TaskType type) {
+ List<PoolSchedulable> scheds = new ArrayList<PoolSchedulable>();
+ for (Pool pool: poolMgr.getPools()) {
+ scheds.add(pool.getSchedulable(type));
}
+ return scheds;
}
private void updateRunnability() {
@@ -380,301 +715,45 @@ public class FairScheduler extends TaskS
Map<String, Integer> userJobs = new HashMap<String, Integer>();
Map<String, Integer> poolJobs = new HashMap<String, Integer>();
for (JobInProgress job: jobs) {
- if (job.getStatus().getRunState() == JobStatus.RUNNING) {
- String user = job.getJobConf().getUser();
- String pool = poolMgr.getPoolName(job);
- int userCount = userJobs.containsKey(user) ? userJobs.get(user) : 0;
- int poolCount = poolJobs.containsKey(pool) ? poolJobs.get(pool) : 0;
- if (userCount < poolMgr.getUserMaxJobs(user) &&
- poolCount < poolMgr.getPoolMaxJobs(pool)) {
- infos.get(job).runnable = true;
+ String user = job.getJobConf().getUser();
+ String pool = poolMgr.getPoolName(job);
+ int userCount = userJobs.containsKey(user) ? userJobs.get(user) : 0;
+ int poolCount = poolJobs.containsKey(pool) ? poolJobs.get(pool) : 0;
+ if (userCount < poolMgr.getUserMaxJobs(user) &&
+ poolCount < poolMgr.getPoolMaxJobs(pool)) {
+ if (job.getStatus().getRunState() == JobStatus.RUNNING ||
+ job.getStatus().getRunState() == JobStatus.PREP) {
userJobs.put(user, userCount + 1);
poolJobs.put(pool, poolCount + 1);
- }
- }
- }
- }
-
- private void updateTaskCounts() {
- for (Map.Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
- JobInProgress job = entry.getKey();
- JobInfo info = entry.getValue();
- if (job.getStatus().getRunState() != JobStatus.RUNNING)
- continue; // Job is still in PREP state and tasks aren't initialized
- // Count maps
- int totalMaps = job.numMapTasks;
- int finishedMaps = 0;
- int runningMaps = 0;
- for (TaskInProgress tip :
- job.getTasks(org.apache.hadoop.mapreduce.TaskType.MAP)) {
- if (tip.isComplete()) {
- finishedMaps += 1;
- } else if (tip.isRunning()) {
- runningMaps += tip.getActiveTasks().size();
- }
- }
- info.runningMaps = runningMaps;
- info.neededMaps = (totalMaps - runningMaps - finishedMaps
- + taskSelector.neededSpeculativeMaps(job));
- // Count reduces
- int totalReduces = job.numReduceTasks;
- int finishedReduces = 0;
- int runningReduces = 0;
- for (TaskInProgress tip :
- job.getTasks(org.apache.hadoop.mapreduce.TaskType.REDUCE)) {
- if (tip.isComplete()) {
- finishedReduces += 1;
- } else if (tip.isRunning()) {
- runningReduces += tip.getActiveTasks().size();
- }
- }
- info.runningReduces = runningReduces;
- if (enoughMapsFinishedToRunReduces(finishedMaps, totalMaps)) {
- info.neededReduces = (totalReduces - runningReduces - finishedReduces
- + taskSelector.neededSpeculativeReduces(job));
- } else {
- info.neededReduces = 0;
- }
- // If the job was marked as not runnable due to its user or pool having
- // too many active jobs, set the neededMaps/neededReduces to 0. We still
- // count runningMaps/runningReduces however so we can give it a deficit.
- if (!info.runnable) {
- info.neededMaps = 0;
- info.neededReduces = 0;
- }
- }
- }
-
- /**
- * Has a job finished enough maps to allow launching its reduces?
- */
- protected boolean enoughMapsFinishedToRunReduces(
- int finishedMaps, int totalMaps) {
- if (waitForMapsBeforeLaunchingReduces) {
- return finishedMaps >= Math.max(1, totalMaps * 0.05);
- } else {
- return true;
- }
- }
-
- private void updateWeights() {
- // First, calculate raw weights for each job
- for (Map.Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
- JobInProgress job = entry.getKey();
- JobInfo info = entry.getValue();
- info.mapWeight = calculateRawWeight(job, TaskType.MAP);
- info.reduceWeight = calculateRawWeight(job, TaskType.REDUCE);
- }
- // Now calculate job weight sums for each pool
- Map<String, Double> mapWeightSums = new HashMap<String, Double>();
- Map<String, Double> reduceWeightSums = new HashMap<String, Double>();
- for (Pool pool: poolMgr.getPools()) {
- double mapWeightSum = 0;
- double reduceWeightSum = 0;
- for (JobInProgress job: pool.getJobs()) {
- if (isRunnable(job)) {
- if (runnableTasks(job, TaskType.MAP) > 0) {
- mapWeightSum += infos.get(job).mapWeight;
- }
- if (runnableTasks(job, TaskType.REDUCE) > 0) {
- reduceWeightSum += infos.get(job).reduceWeight;
- }
- }
- }
- mapWeightSums.put(pool.getName(), mapWeightSum);
- reduceWeightSums.put(pool.getName(), reduceWeightSum);
- }
- // And normalize the weights based on pool sums and pool weights
- // to share fairly across pools (proportional to their weights)
- for (Map.Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
- JobInProgress job = entry.getKey();
- JobInfo info = entry.getValue();
- String pool = poolMgr.getPoolName(job);
- double poolWeight = poolMgr.getPoolWeight(pool);
- double mapWeightSum = mapWeightSums.get(pool);
- double reduceWeightSum = reduceWeightSums.get(pool);
- if (mapWeightSum == 0)
- info.mapWeight = 0;
- else
- info.mapWeight *= (poolWeight / mapWeightSum);
- if (reduceWeightSum == 0)
- info.reduceWeight = 0;
- else
- info.reduceWeight *= (poolWeight / reduceWeightSum);
- }
- }
-
- private void updateMinSlots() {
- // Clear old minSlots
- for (JobInfo info: infos.values()) {
- info.minMaps = 0;
- info.minReduces = 0;
- }
- // For each pool, distribute its task allocation among jobs in it that need
- // slots. This is a little tricky since some jobs in the pool might not be
- // able to use all the slots, e.g. they might have only a few tasks left.
- // To deal with this, we repeatedly split up the available task slots
- // between the jobs left, give each job min(its alloc, # of slots it needs),
- // and redistribute any slots that are left over between jobs that still
- // need slots on the next pass. If, in total, the jobs in our pool don't
- // need all its allocation, we leave the leftover slots for general use.
- PoolManager poolMgr = getPoolManager();
- for (Pool pool: poolMgr.getPools()) {
- for (final TaskType type: TaskType.values()) {
- Set<JobInProgress> jobs = new HashSet<JobInProgress>(pool.getJobs());
- int slotsLeft = poolMgr.getAllocation(pool.getName(), type);
- // Keep assigning slots until none are left
- while (slotsLeft > 0) {
- // Figure out total weight of jobs that still need slots
- double totalWeight = 0;
- for (Iterator<JobInProgress> it = jobs.iterator(); it.hasNext();) {
- JobInProgress job = it.next();
- if (isRunnable(job) &&
- runnableTasks(job, type) > minTasks(job, type)) {
- totalWeight += weight(job, type);
- } else {
- it.remove();
+ JobInfo jobInfo = infos.get(job);
+ if (job.getStatus().getRunState() == JobStatus.RUNNING) {
+ jobInfo.runnable = true;
+ } else {
+ // The job is in the PREP state. Give it to the job initializer
+ // for initialization if we have not already done it.
+ if (jobInfo.needsInitializing) {
+ jobInfo.needsInitializing = false;
+ jobInitializer.initJob(jobInfo, job);
}
}
- if (totalWeight == 0) // No jobs that can use more slots are left
- break;
- // Assign slots to jobs, using the floor of their weight divided by
- // total weight. This ensures that all jobs get some chance to take
- // a slot. Then, if no slots were assigned this way, we do another
- // pass where we use ceil, in case some slots were still left over.
- int oldSlots = slotsLeft; // Copy slotsLeft so we can modify it
- for (JobInProgress job: jobs) {
- double weight = weight(job, type);
- int share = (int) Math.floor(oldSlots * weight / totalWeight);
- slotsLeft = giveMinSlots(job, type, slotsLeft, share);
- }
- if (slotsLeft == oldSlots) {
- // No tasks were assigned; do another pass using ceil, giving the
- // extra slots to jobs in order of weight then deficit
- List<JobInProgress> sortedJobs = new ArrayList<JobInProgress>(jobs);
- Collections.sort(sortedJobs, new Comparator<JobInProgress>() {
- public int compare(JobInProgress j1, JobInProgress j2) {
- double dif = weight(j2, type) - weight(j1, type);
- if (dif == 0) // Weights are equal, compare by deficit
- dif = deficit(j2, type) - deficit(j1, type);
- return (int) Math.signum(dif);
- }
- });
- for (JobInProgress job: sortedJobs) {
- double weight = weight(job, type);
- int share = (int) Math.ceil(oldSlots * weight / totalWeight);
- slotsLeft = giveMinSlots(job, type, slotsLeft, share);
- }
- if (slotsLeft > 0) {
- LOG.warn("Had slotsLeft = " + slotsLeft + " after the final "
- + "loop in updateMinSlots. This probably means some fair "
- + "scheduler weights are being set to NaN or Infinity.");
- }
- break;
- }
- }
- }
- }
- }
-
- /**
- * Give up to <code>tasksToGive</code> min slots to a job (potentially fewer
- * if either the job needs fewer slots or there aren't enough slots left).
- * Returns the number of slots left over.
- */
- private int giveMinSlots(JobInProgress job, TaskType type,
- int slotsLeft, int slotsToGive) {
- int runnable = runnableTasks(job, type);
- int curMin = minTasks(job, type);
- slotsToGive = Math.min(Math.min(slotsLeft, runnable - curMin), slotsToGive);
- slotsLeft -= slotsToGive;
- JobInfo info = infos.get(job);
- if (type == TaskType.MAP)
- info.minMaps += slotsToGive;
- else
- info.minReduces += slotsToGive;
- return slotsLeft;
- }
-
- private void updateFairShares(ClusterStatus clusterStatus) {
- // Clear old fairShares
- for (JobInfo info: infos.values()) {
- info.mapFairShare = 0;
- info.reduceFairShare = 0;
- }
- // Assign new shares, based on weight and minimum share. This is done
- // as follows. First, we split up the available slots between all
- // jobs according to weight. Then if there are any jobs whose minSlots is
- // larger than their fair allocation, we give them their minSlots and
- // remove them from the list, and start again with the amount of slots
- // left over. This continues until all jobs' minSlots are less than their
- // fair allocation, and at this point we know that we've met everyone's
- // guarantee and we've split the excess capacity fairly among jobs left.
- for (TaskType type: TaskType.values()) {
- // Select only jobs that still need this type of task
- HashSet<JobInfo> jobsLeft = new HashSet<JobInfo>();
- for (Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
- JobInProgress job = entry.getKey();
- JobInfo info = entry.getValue();
- if (isRunnable(job) && runnableTasks(job, type) > 0) {
- jobsLeft.add(info);
- }
- }
- double slotsLeft = getTotalSlots(type, clusterStatus);
- while (!jobsLeft.isEmpty()) {
- double totalWeight = 0;
- for (JobInfo info: jobsLeft) {
- double weight = (type == TaskType.MAP ?
- info.mapWeight : info.reduceWeight);
- totalWeight += weight;
- }
- boolean recomputeSlots = false;
- double oldSlots = slotsLeft; // Copy slotsLeft so we can modify it
- for (Iterator<JobInfo> iter = jobsLeft.iterator(); iter.hasNext();) {
- JobInfo info = iter.next();
- double minSlots = (type == TaskType.MAP ?
- info.minMaps : info.minReduces);
- double weight = (type == TaskType.MAP ?
- info.mapWeight : info.reduceWeight);
- double fairShare = weight / totalWeight * oldSlots;
- if (minSlots > fairShare) {
- // Job needs more slots than its fair share; give it its minSlots,
- // remove it from the list, and set recomputeSlots = true to
- // remember that we must loop again to redistribute unassigned slots
- if (type == TaskType.MAP)
- info.mapFairShare = minSlots;
- else
- info.reduceFairShare = minSlots;
- slotsLeft -= minSlots;
- iter.remove();
- recomputeSlots = true;
- }
- }
- if (!recomputeSlots) {
- // All minimums are met. Give each job its fair share of excess slots.
- for (JobInfo info: jobsLeft) {
- double weight = (type == TaskType.MAP ?
- info.mapWeight : info.reduceWeight);
- double fairShare = weight / totalWeight * oldSlots;
- if (type == TaskType.MAP)
- info.mapFairShare = fairShare;
- else
- info.reduceFairShare = fairShare;
- }
- break;
}
}
}
}
- private double calculateRawWeight(JobInProgress job, TaskType taskType) {
+ public double getJobWeight(JobInProgress job, TaskType taskType) {
if (!isRunnable(job)) {
- return 0;
+ // Job won't launch tasks, but don't return 0 to avoid division errors
+ return 1.0;
} else {
double weight = 1.0;
if (sizeBasedWeight) {
// Set weight based on runnable tasks
- weight = Math.log1p(runnableTasks(job, taskType)) / Math.log(2);
+ JobInfo info = infos.get(job);
+ int runnableTasks = (taskType == TaskType.MAP) ?
+ info.mapSchedulable.getDemand() :
+ info.reduceSchedulable.getDemand();
+ weight = Math.log1p(runnableTasks) / Math.log(2);
}
weight *= getPriorityFactor(job.getPriority());
if (weightAdjuster != null) {
@@ -704,49 +783,220 @@ public class FairScheduler extends TaskS
clusterStatus.getMaxMapTasks() : clusterStatus.getMaxReduceTasks());
}
- public synchronized boolean getUseFifo() {
- return useFifo;
- }
-
- public synchronized void setUseFifo(boolean useFifo) {
- this.useFifo = useFifo;
+ /**
+ * Update the preemption fields for all PoolScheduables, i.e. the times since
+ * each pool last was at its guaranteed share and at > 1/2 of its fair share
+ * for each type of task.
+ */
+ private void updatePreemptionVariables() {
+ long now = clock.getTime();
+ lastPreemptionUpdateTime = now;
+ for (TaskType type: MAP_AND_REDUCE) {
+ for (PoolSchedulable sched: getPoolSchedulables(type)) {
+ if (!isStarvedForMinShare(sched)) {
+ sched.setLastTimeAtMinShare(now);
+ }
+ if (!isStarvedForFairShare(sched)) {
+ sched.setLastTimeAtHalfFairShare(now);
+ }
+ eventLog.log("PREEMPT_VARS", sched.getName(), type,
+ now - sched.getLastTimeAtMinShare(),
+ now - sched.getLastTimeAtHalfFairShare());
+ }
+ }
}
-
- // Getter methods for reading JobInfo values based on TaskType, safely
- // returning 0's for jobs with no JobInfo present.
- protected int neededTasks(JobInProgress job, TaskType taskType) {
- JobInfo info = infos.get(job);
- if (info == null) return 0;
- return taskType == TaskType.MAP ? info.neededMaps : info.neededReduces;
+ /**
+ * Is a pool below its min share for the given task type?
+ */
+ boolean isStarvedForMinShare(PoolSchedulable sched) {
+ int desiredShare = Math.min(sched.getMinShare(), sched.getDemand());
+ return (sched.getRunningTasks() < desiredShare);
}
- protected int runningTasks(JobInProgress job, TaskType taskType) {
- JobInfo info = infos.get(job);
- if (info == null) return 0;
- return taskType == TaskType.MAP ? info.runningMaps : info.runningReduces;
- }
-
- protected int runnableTasks(JobInProgress job, TaskType type) {
- return neededTasks(job, type) + runningTasks(job, type);
+ /**
+ * Is a pool being starved for fair share for the given task type?
+ * This is defined as being below half its fair share.
+ */
+ boolean isStarvedForFairShare(PoolSchedulable sched) {
+ int desiredFairShare = (int) Math.floor(Math.min(
+ sched.getFairShare() / 2, sched.getDemand()));
+ return (sched.getRunningTasks() < desiredFairShare);
}
- protected int minTasks(JobInProgress job, TaskType type) {
- JobInfo info = infos.get(job);
- if (info == null) return 0;
- return (type == TaskType.MAP) ? info.minMaps : info.minReduces;
+ /**
+ * Check for pools that need tasks preempted, either because they have been
+ * below their guaranteed share for minSharePreemptionTimeout or they
+ * have been below half their fair share for the fairSharePreemptionTimeout.
+ * If such pools exist, compute how many tasks of each type need to be
+ * preempted and then select the right ones using preemptTasks.
+ *
+ * When preemption is enabled but only-log mode is set, this method still
+ * computes and logs the number of tasks we want to preempt, for debugging.
+ */
+ protected void preemptTasksIfNecessary() {
+ if (!preemptionEnabled)
+ return;
+
+ long curTime = clock.getTime();
+ if (curTime - lastPreemptCheckTime < preemptionInterval)
+ return;
+ lastPreemptCheckTime = curTime;
+
+ // Acquire locks on both the JobTracker (task tracker manager) and this
+ // because we might need to call some JobTracker methods (killTask).
+ synchronized (taskTrackerManager) {
+ synchronized (this) {
+ for (TaskType type: MAP_AND_REDUCE) {
+ List<PoolSchedulable> scheds = getPoolSchedulables(type);
+ int tasksToPreempt = 0;
+ for (PoolSchedulable sched: scheds) {
+ tasksToPreempt += tasksToPreempt(sched, curTime);
+ }
+ if (tasksToPreempt > 0) {
+ eventLog.log("SHOULD_PREEMPT", type, tasksToPreempt);
+ if (!onlyLogPreemption) {
+ preemptTasks(scheds, tasksToPreempt);
+ }
+ }
+ }
+ }
+ }
}
- protected double weight(JobInProgress job, TaskType taskType) {
- JobInfo info = infos.get(job);
- if (info == null) return 0;
- return (taskType == TaskType.MAP ? info.mapWeight : info.reduceWeight);
+ /**
+ * Preempt a given number of tasks from a list of PoolSchedulables.
+ * The policy for this is to pick tasks from pools that are over their fair
+ * share, but make sure that no pool is placed below its fair share in the
+ * process. Furthermore, we want to minimize the amount of computation
+ * wasted by preemption, so out of the tasks in over-scheduled pools, we
+ * prefer to preempt tasks that started most recently.
+ */
+ private void preemptTasks(List<PoolSchedulable> scheds, int tasksToPreempt) {
+ if (scheds.isEmpty() || tasksToPreempt == 0)
+ return;
+
+ TaskType taskType = scheds.get(0).getTaskType();
+
+ // Collect running tasks of our type from over-scheduled pools
+ List<TaskStatus> runningTasks = new ArrayList<TaskStatus>();
+ for (PoolSchedulable sched: scheds) {
+ if (sched.getRunningTasks() > sched.getFairShare())
+ for (JobSchedulable js: sched.getJobSchedulables()) {
+ runningTasks.addAll(getRunningTasks(js.getJob(), taskType));
+ }
+ }
+
+ // Sort tasks into reverse order of start time
+ Collections.sort(runningTasks, new Comparator<TaskStatus>() {
+ public int compare(TaskStatus t1, TaskStatus t2) {
+ if (t1.getStartTime() < t2.getStartTime())
+ return 1;
+ else if (t1.getStartTime() == t2.getStartTime())
+ return 0;
+ else
+ return -1;
+ }
+ });
+
+ // Maintain a count of tasks left in each pool; this is a bit
+ // faster than calling runningTasks() on the pool repeatedly
+ // because the latter must scan through jobs in the pool
+ HashMap<Pool, Integer> tasksLeft = new HashMap<Pool, Integer>();
+ for (Pool p: poolMgr.getPools()) {
+ tasksLeft.put(p, p.getSchedulable(taskType).getRunningTasks());
+ }
+
+ // Scan down the sorted list of task statuses until we've killed enough
+ // tasks, making sure we don't kill too many from any pool
+ for (TaskStatus status: runningTasks) {
+ JobID jobID = status.getTaskID().getJobID();
+ JobInProgress job = taskTrackerManager.getJob(jobID);
+ Pool pool = poolMgr.getPool(job);
+ PoolSchedulable sched = pool.getSchedulable(taskType);
+ int tasksLeftForPool = tasksLeft.get(pool);
+ if (tasksLeftForPool > sched.getFairShare()) {
+ eventLog.log("PREEMPT", status.getTaskID(),
+ status.getTaskTracker());
+ try {
+ taskTrackerManager.killTask(status.getTaskID(), false);
+ tasksToPreempt--;
+ if (tasksToPreempt == 0)
+ break;
+
+ // reduce tasks left for pool
+ tasksLeft.put(pool, --tasksLeftForPool);
+ } catch (IOException e) {
+ LOG.error("Failed to kill task " + status.getTaskID(), e);
+ }
+ }
+ }
}
- protected double deficit(JobInProgress job, TaskType taskType) {
- JobInfo info = infos.get(job);
- if (info == null) return 0;
- return taskType == TaskType.MAP ? info.mapDeficit : info.reduceDeficit;
+ /**
+ * Count how many tasks of a given type the pool needs to preempt, if any.
+ * If the pool has been below its min share for at least its preemption
+ * timeout, it should preempt the difference between its current share and
+ * this min share. If it has been below half its fair share for at least the
+ * fairSharePreemptionTimeout, it should preempt enough tasks to get up to
+ * its full fair share. If both conditions hold, we preempt the max of the
+ * two amounts (this shouldn't happen unless someone sets the timeouts to
+ * be identical for some reason).
+ */
+ protected int tasksToPreempt(PoolSchedulable sched, long curTime) {
+ String pool = sched.getName();
+ long minShareTimeout = poolMgr.getMinSharePreemptionTimeout(pool);
+ long fairShareTimeout = poolMgr.getFairSharePreemptionTimeout();
+ int tasksDueToMinShare = 0;
+ int tasksDueToFairShare = 0;
+ if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
+ int target = Math.min(sched.getMinShare(), sched.getDemand());
+ tasksDueToMinShare = Math.max(0, target - sched.getRunningTasks());
+ }
+ if (curTime - sched.getLastTimeAtHalfFairShare() > fairShareTimeout) {
+ int target = (int) Math.min(sched.getFairShare(), sched.getDemand());
+ tasksDueToFairShare = Math.max(0, target - sched.getRunningTasks());
+ }
+ int tasksToPreempt = Math.max(tasksDueToMinShare, tasksDueToFairShare);
+ if (tasksToPreempt > 0) {
+ String message = "Should preempt " + tasksToPreempt + " "
+ + sched.getTaskType() + " tasks for pool " + sched.getName()
+ + ": tasksDueToMinShare = " + tasksDueToMinShare
+ + ", tasksDueToFairShare = " + tasksDueToFairShare;
+ eventLog.log("INFO", message);
+ LOG.info(message);
+ }
+ return tasksToPreempt;
+ }
+
+ private List<TaskStatus> getRunningTasks(JobInProgress job, TaskType type) {
+ // Create a list of all running TaskInProgress'es in the job
+ Set<TaskInProgress> tips = new HashSet<TaskInProgress>();
+ if (type == TaskType.MAP) {
+ // Jobs may have both "non-local maps" which have a split with no locality
+ // info (e.g. the input file is not in HDFS), and maps with locality info,
+ // which are stored in the runningMapCache map from location to task list
+ tips.addAll(job.nonLocalRunningMaps);
+ for (Set<TaskInProgress> set: job.runningMapCache.values()) {
+ tips.addAll(set);
+ }
+ }
+ else {
+ tips.addAll(job.runningReduces);
+ }
+ // Get the active TaskStatus'es for each TaskInProgress (there may be
+ // more than one if the task has multiple copies active due to speculation)
+ List<TaskStatus> statuses = new ArrayList<TaskStatus>();
+ for (TaskInProgress tip: tips) {
+ for (TaskAttemptID id: tip.getActiveTasks().keySet()) {
+ TaskStatus stat = tip.getTaskStatus(id);
+ // status is null when the task has been scheduled but not yet running
+ if (stat != null) {
+ statuses.add(stat);
+ }
+ }
+ }
+ return statuses;
}
protected boolean isRunnable(JobInProgress job) {
@@ -760,4 +1010,92 @@ public class FairScheduler extends TaskS
Pool myJobPool = poolMgr.getPool(queueName);
return myJobPool.getJobs();
}
+
+ protected void dumpIfNecessary() {
+ long now = clock.getTime();
+ long timeDelta = now - lastDumpTime;
+ if (timeDelta > dumpInterval && eventLog.isEnabled()) {
+ dump();
+ lastDumpTime = now;
+ }
+ }
+
+ /**
+ * Dump scheduler state to the fairscheduler log.
+ */
+ private synchronized void dump() {
+ synchronized (eventLog) {
+ eventLog.log("BEGIN_DUMP");
+ // List jobs in order of submit time
+ ArrayList<JobInProgress> jobs =
+ new ArrayList<JobInProgress>(infos.keySet());
+ Collections.sort(jobs, new Comparator<JobInProgress>() {
+ public int compare(JobInProgress j1, JobInProgress j2) {
+ return (int) Math.signum(j1.getStartTime() - j2.getStartTime());
+ }
+ });
+ // Dump info for each job
+ for (JobInProgress job: jobs) {
+ JobProfile profile = job.getProfile();
+ JobInfo info = infos.get(job);
+ Schedulable ms = info.mapSchedulable;
+ Schedulable rs = info.reduceSchedulable;
+ eventLog.log("JOB",
+ profile.getJobID(), profile.name, profile.user,
+ job.getPriority(), poolMgr.getPoolName(job),
+ job.numMapTasks, ms.getRunningTasks(),
+ ms.getDemand(), ms.getFairShare(), ms.getWeight(),
+ job.numReduceTasks, rs.getRunningTasks(),
+ rs.getDemand(), rs.getFairShare(), rs.getWeight());
+ }
+ // List pools in alphabetical order
+ List<Pool> pools = new ArrayList<Pool>(poolMgr.getPools());
+ Collections.sort(pools, new Comparator<Pool>() {
+ public int compare(Pool p1, Pool p2) {
+ if (p1.isDefaultPool())
+ return 1;
+ else if (p2.isDefaultPool())
+ return -1;
+ else return p1.getName().compareTo(p2.getName());
+ }});
+ for (Pool pool: pools) {
+ int runningMaps = 0;
+ int runningReduces = 0;
+ for (JobInProgress job: pool.getJobs()) {
+ JobInfo info = infos.get(job);
+ if (info != null) {
+ // TODO: Fix
+ //runningMaps += info.runningMaps;
+ //runningReduces += info.runningReduces;
+ }
+ }
+ String name = pool.getName();
+ eventLog.log("POOL",
+ name, poolMgr.getPoolWeight(name), pool.getJobs().size(),
+ poolMgr.getAllocation(name, TaskType.MAP), runningMaps,
+ poolMgr.getAllocation(name, TaskType.REDUCE), runningReduces);
+ }
+ // Dump info for each pool
+ eventLog.log("END_DUMP");
+ }
+ }
+
+ public Clock getClock() {
+ return clock;
+ }
+
+ public FairSchedulerEventLog getEventLog() {
+ return eventLog;
+ }
+
+ public JobInfo getJobInfo(JobInProgress job) {
+ return infos.get(job);
+ }
+
+ boolean isPreemptionEnabled() {
+ return preemptionEnabled;
+ }
+ long getLastPreemptionUpdateTime() {
+ return lastPreemptionUpdateTime;
+ }
}
Added: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerEventLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerEventLog.java?rev=1169585&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerEventLog.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerEventLog.java Sun Sep 11 23:57:37 2011
@@ -0,0 +1,142 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.DailyRollingFileAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.apache.log4j.spi.LoggingEvent;
+
+/**
+ * Event log used by the fair scheduler for machine-readable debug info.
+ * This class uses a log4j rolling file appender to write the log, but uses
+ * a custom tab-separated event format of the form:
+ * <pre>
+ * DATE EVENT_TYPE PARAM_1 PARAM_2 ...
+ * </pre>
+ * Various event types are used by the fair scheduler. The purpose of logging
+ * in this format is to enable tools to parse the history log easily and read
+ * internal scheduler variables, rather than trying to make the log human
+ * readable. The fair scheduler also logs human readable messages in the
+ * JobTracker's main log.
+ *
+ * Constructing this class creates a disabled log. It must be initialized
+ * using {@link FairSchedulerEventLog#init(Configuration, String)} to begin
+ * writing to the file.
+ */
+class FairSchedulerEventLog {
+ private static final Log LOG = LogFactory.getLog(
+ "org.apache.hadoop.mapred.FairSchedulerEventLog");
+
+ /** Set to true if logging is disabled due to an error. */
+ private boolean logDisabled = true;
+
+ /**
+ * Log directory, set by mapred.fairscheduler.eventlog.location in conf file;
+ * defaults to {hadoop.log.dir}/fairscheduler.
+ */
+ private String logDir;
+
+ /**
+ * Active log file, which is {LOG_DIR}/hadoop-{user}-fairscheduler.{host}.log.
+ * Older files are also stored as {LOG_FILE}.date (date format YYYY-MM-DD).
+ */
+ private String logFile;
+
+ /** Log4j appender used to write to the log file */
+ private DailyRollingFileAppender appender;
+
+ boolean init(Configuration conf, String jobtrackerHostname) {
+ try {
+ logDir = conf.get("mapred.fairscheduler.eventlog.location",
+ new File(System.getProperty("hadoop.log.dir")).getAbsolutePath()
+ + File.separator + "fairscheduler");
+ Path logDirPath = new Path(logDir);
+ FileSystem fs = logDirPath.getFileSystem(conf);
+ if (!fs.exists(logDirPath)) {
+ if (!fs.mkdirs(logDirPath)) {
+ throw new IOException(
+ "Mkdirs failed to create " + logDirPath.toString());
+ }
+ }
+ String username = System.getProperty("user.name");
+ logFile = String.format("%s%shadoop-%s-fairscheduler-%s.log",
+ logDir, File.separator, username, jobtrackerHostname);
+ logDisabled = false;
+ PatternLayout layout = new PatternLayout("%d{ISO8601}\t%m%n");
+ appender = new DailyRollingFileAppender(layout, logFile, "'.'yyyy-MM-dd");
+ appender.activateOptions();
+ LOG.info("Initialized fair scheduler event log, logging to " + logFile);
+ } catch (IOException e) {
+ LOG.error(
+ "Failed to initialize fair scheduler event log. Disabling it.", e);
+ logDisabled = true;
+ }
+ return !(logDisabled);
+ }
+
+ /**
+ * Log an event, writing a line in the log file of the form
+ * <pre>
+ * DATE EVENT_TYPE PARAM_1 PARAM_2 ...
+ * </pre>
+ */
+ synchronized void log(String eventType, Object... params) {
+ try {
+ if (logDisabled)
+ return;
+ StringBuffer buffer = new StringBuffer();
+ buffer.append(eventType);
+ for (Object param: params) {
+ buffer.append("\t");
+ buffer.append(param);
+ }
+ String message = buffer.toString();
+ Logger logger = Logger.getLogger(getClass());
+ appender.append(new LoggingEvent("", logger, Level.INFO, message, null));
+ } catch (Exception e) {
+ LOG.error("Failed to append to fair scheduler event log", e);
+ logDisabled = true;
+ }
+ }
+
+ /**
+ * Flush and close the log.
+ */
+ void shutdown() {
+ try {
+ if (appender != null)
+ appender.close();
+ } catch (Exception e) {}
+ logDisabled = true;
+ }
+
+ boolean isEnabled() {
+ return !logDisabled;
+ }
+}
Modified: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java?rev=1169585&r1=1169584&r2=1169585&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java Sun Sep 11 23:57:37 2011
@@ -30,6 +30,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
+import java.util.Iterator;
import java.util.List;
import javax.servlet.ServletContext;
@@ -39,16 +40,15 @@ import javax.servlet.http.HttpServletReq
import javax.servlet.http.HttpServletResponse;
import org.apache.hadoop.mapred.FairScheduler.JobInfo;
+import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.util.StringUtils;
/**
* Servlet for displaying fair scheduler information, installed at
* [job tracker URL]/scheduler when the {@link FairScheduler} is in use.
*
- * The main features are viewing each job's task count and fair share, ability
- * to change job priorities and pools from the UI, and ability to switch the
- * scheduler to FIFO mode without restarting the JobTracker if this is required
- * for any reason.
+ * The main features are viewing each job's task count and fair share,
+ * and admin controls to change job priorities and pools from the UI.
*
* There is also an "advanced" view for debugging that can be turned on by
* going to [job tracker URL]/scheduler?advanced.
@@ -82,13 +82,9 @@ public class FairSchedulerServlet extend
// If the request has a set* param, handle that and redirect to the regular
// view page so that the user won't resubmit the data if they hit refresh.
boolean advancedView = request.getParameter("advanced") != null;
- if (request.getParameter("setFifo") != null) {
- scheduler.setUseFifo(request.getParameter("setFifo").equals("true"));
- response.sendRedirect("/scheduler" + (advancedView ? "?advanced" : ""));
- return;
- }
- if (request.getParameter("setPool") != null) {
- Collection<JobInProgress> runningJobs = jobTracker.getRunningJobs();
+ if (JSPUtil.privateActionsAllowed(jobTracker.conf)
+ && request.getParameter("setPool") != null) {
+ Collection<JobInProgress> runningJobs = getInitedJobs();
PoolManager poolMgr = null;
synchronized (scheduler) {
poolMgr = scheduler.getPoolManager();
@@ -107,8 +103,9 @@ public class FairSchedulerServlet extend
response.sendRedirect("/scheduler" + (advancedView ? "?advanced" : ""));
return;
}
- if (request.getParameter("setPriority") != null) {
- Collection<JobInProgress> runningJobs = jobTracker.getRunningJobs();
+ if (JSPUtil.privateActionsAllowed(jobTracker.conf)
+ && request.getParameter("setPriority") != null) {
+ Collection<JobInProgress> runningJobs = getInitedJobs();
JobPriority priority = JobPriority.valueOf(request.getParameter(
"setPriority"));
String jobId = request.getParameter("jobid");
@@ -126,22 +123,21 @@ public class FairSchedulerServlet extend
response.setContentType("text/html");
// Because the client may read arbitrarily slow, and we hold locks while
- // the servlet output, we want to write to our own buffer which we know
+ // the servlet outputs, we want to write to our own buffer which we know
// won't block.
ByteArrayOutputStream baos = new ByteArrayOutputStream();
PrintWriter out = new PrintWriter(baos);
String hostname = StringUtils.simpleHostname(
jobTracker.getJobTrackerMachine());
out.print("<html><head>");
- out.printf("<title>%s Job Scheduler Admininstration</title>\n", hostname);
+ out.printf("<title>%s Fair Scheduler Administration</title>\n", hostname);
out.print("<link rel=\"stylesheet\" type=\"text/css\" " +
"href=\"/static/hadoop.css\">\n");
out.print("</head><body>\n");
out.printf("<h1><a href=\"/jobtracker.jsp\">%s</a> " +
- "Job Scheduler Administration</h1>\n", hostname);
+ "Fair Scheduler Administration</h1>\n", hostname);
showPools(out, advancedView);
showJobs(out, advancedView);
- showAdminForm(out, advancedView);
out.print("</body></html>\n");
out.close();
@@ -156,12 +152,17 @@ public class FairSchedulerServlet extend
*/
private void showPools(PrintWriter out, boolean advancedView) {
synchronized(scheduler) {
+ boolean warnInverted = false;
PoolManager poolManager = scheduler.getPoolManager();
out.print("<h2>Pools</h2>\n");
out.print("<table border=\"2\" cellpadding=\"5\" cellspacing=\"2\">\n");
- out.print("<tr><th>Pool</th><th>Running Jobs</th>" +
- "<th>Min Maps</th><th>Min Reduces</th>" +
- "<th>Running Maps</th><th>Running Reduces</th></tr>\n");
+ out.print("<tr><th rowspan=2>Pool</th>" +
+ "<th rowspan=2>Running Jobs</th>" +
+ "<th colspan=4>Map Tasks</th>" +
+ "<th colspan=4>Reduce Tasks</th>" +
+ "<th rowspan=2>Scheduling Mode</th></tr>\n<tr>" +
+ "<th>Min Share</th><th>Max Share</th><th>Running</th><th>Fair Share</th>" +
+ "<th>Min Share</th><th>Max Share</th><th>Running</th><th>Fair Share</th></tr>\n");
List<Pool> pools = new ArrayList<Pool>(poolManager.getPools());
Collections.sort(pools, new Comparator<Pool>() {
public int compare(Pool p1, Pool p2) {
@@ -172,27 +173,54 @@ public class FairSchedulerServlet extend
else return p1.getName().compareTo(p2.getName());
}});
for (Pool pool: pools) {
- int runningMaps = 0;
- int runningReduces = 0;
- for (JobInProgress job: pool.getJobs()) {
- JobInfo info = scheduler.infos.get(job);
- if (info != null) {
- runningMaps += info.runningMaps;
- runningReduces += info.runningReduces;
- }
- }
- out.print("<tr>\n");
- out.printf("<td>%s</td>\n", pool.getName());
- out.printf("<td>%s</td>\n", pool.getJobs().size());
- out.printf("<td>%s</td>\n", poolManager.getAllocation(pool.getName(),
+ String name = pool.getName();
+ int runningMaps = pool.getMapSchedulable().getRunningTasks();
+ int runningReduces = pool.getReduceSchedulable().getRunningTasks();
+ int maxMaps = poolManager.getMaxSlots(name, TaskType.MAP);
+ int maxReduces = poolManager.getMaxSlots(name, TaskType.REDUCE);
+ boolean invertedMaps = poolManager.invertedMinMax(TaskType.MAP, name);
+ boolean invertedReduces = poolManager.invertedMinMax(TaskType.REDUCE, name);
+ warnInverted = warnInverted || invertedMaps || invertedReduces;
+ out.print("<tr>");
+ out.printf("<td>%s</td>", name);
+ out.printf("<td>%d</td>", pool.getJobs().size());
+ // Map Tasks
+ out.printf("<td>%d</td>", poolManager.getAllocation(name,
TaskType.MAP));
- out.printf("<td>%s</td>\n", poolManager.getAllocation(pool.getName(),
+ out.print("<td>");
+ if(maxMaps == Integer.MAX_VALUE) {
+ out.print("-");
+ } else {
+ out.print(maxMaps);
+ }
+ if(invertedMaps) {
+ out.print("*");
+ }
+ out.print("</td>");
+ out.printf("<td>%d</td>", runningMaps);
+ out.printf("<td>%.1f</td>", pool.getMapSchedulable().getFairShare());
+ // Reduce Tasks
+ out.printf("<td>%d</td>", poolManager.getAllocation(name,
TaskType.REDUCE));
- out.printf("<td>%s</td>\n", runningMaps);
- out.printf("<td>%s</td>\n", runningReduces);
+ out.print("<td>");
+ if(maxReduces == Integer.MAX_VALUE) {
+ out.print("-");
+ } else {
+ out.print(maxReduces);
+ }
+ if(invertedReduces) {
+ out.print("*");
+ }
+ out.print("</td>");
+ out.printf("<td>%d</td>", runningReduces);
+ out.printf("<td>%.1f</td>", pool.getReduceSchedulable().getFairShare());
+ out.printf("<td>%s</td>", pool.getSchedulingMode());
out.print("</tr>\n");
}
out.print("</table>\n");
+ if(warnInverted) {
+ out.print("<p>* One or more pools have max share set lower than min share. Max share will be used and minimum will be treated as if set equal to max.</p>");
+ }
}
}
@@ -202,66 +230,70 @@ public class FairSchedulerServlet extend
private void showJobs(PrintWriter out, boolean advancedView) {
out.print("<h2>Running Jobs</h2>\n");
out.print("<table border=\"2\" cellpadding=\"5\" cellspacing=\"2\">\n");
- int colsPerTaskType = advancedView ? 6 : 3;
+ int colsPerTaskType = advancedView ? 4 : 3;
out.printf("<tr><th rowspan=2>Submitted</th>" +
"<th rowspan=2>JobID</th>" +
"<th rowspan=2>User</th>" +
"<th rowspan=2>Name</th>" +
"<th rowspan=2>Pool</th>" +
"<th rowspan=2>Priority</th>" +
- "<th colspan=%d>Maps</th>" +
- "<th colspan=%d>Reduces</th>",
+ "<th colspan=%d>Map Tasks</th>" +
+ "<th colspan=%d>Reduce Tasks</th>",
colsPerTaskType, colsPerTaskType);
out.print("</tr><tr>\n");
out.print("<th>Finished</th><th>Running</th><th>Fair Share</th>" +
- (advancedView ? "<th>Weight</th><th>Deficit</th><th>minMaps</th>" : ""));
+ (advancedView ? "<th>Weight</th>" : ""));
out.print("<th>Finished</th><th>Running</th><th>Fair Share</th>" +
- (advancedView ? "<th>Weight</th><th>Deficit</th><th>minReduces</th>" : ""));
+ (advancedView ? "<th>Weight</th>" : ""));
out.print("</tr>\n");
synchronized (jobTracker) {
- Collection<JobInProgress> runningJobs = jobTracker.getRunningJobs();
+ Collection<JobInProgress> runningJobs = getInitedJobs();
synchronized (scheduler) {
for (JobInProgress job: runningJobs) {
JobProfile profile = job.getProfile();
JobInfo info = scheduler.infos.get(job);
if (info == null) { // Job finished, but let's show 0's for info
- info = new JobInfo();
+ info = new JobInfo(null, null);
}
out.print("<tr>\n");
out.printf("<td>%s</td>\n", DATE_FORMAT.format(
- new Date(job.getStartTime())));
+ new Date(job.getStartTime())));
out.printf("<td><a href=\"jobdetails.jsp?jobid=%s\">%s</a></td>",
- profile.getJobID(), profile.getJobID());
+ profile.getJobID(), profile.getJobID());
out.printf("<td>%s</td>\n", profile.getUser());
out.printf("<td>%s</td>\n", profile.getJobName());
- out.printf("<td>%s</td>\n", generateSelect(
- scheduler.getPoolManager().getPoolNames(),
- scheduler.getPoolManager().getPoolName(job),
- "/scheduler?setPool=<CHOICE>&jobid=" + profile.getJobID() +
- (advancedView ? "&advanced" : "")));
- out.printf("<td>%s</td>\n", generateSelect(
- Arrays.asList(new String[]
- {"VERY_LOW", "LOW", "NORMAL", "HIGH", "VERY_HIGH"}),
- job.getPriority().toString(),
- "/scheduler?setPriority=<CHOICE>&jobid=" + profile.getJobID() +
- (advancedView ? "&advanced" : "")));
- out.printf("<td>%d / %d</td><td>%d</td><td>%8.1f</td>\n",
- job.finishedMaps(), job.desiredMaps(), info.runningMaps,
- info.mapFairShare);
+ if (JSPUtil.privateActionsAllowed(jobTracker.conf)) {
+ out.printf("<td>%s</td>\n", generateSelect(scheduler
+ .getPoolManager().getPoolNames(), scheduler.getPoolManager()
+ .getPoolName(job), "/scheduler?setPool=<CHOICE>&jobid="
+ + profile.getJobID() + (advancedView ? "&advanced" : "")));
+ out.printf("<td>%s</td>\n", generateSelect(Arrays
+ .asList(new String[] { "VERY_LOW", "LOW", "NORMAL", "HIGH",
+ "VERY_HIGH" }), job.getPriority().toString(),
+ "/scheduler?setPriority=<CHOICE>&jobid=" + profile.getJobID()
+ + (advancedView ? "&advanced" : "")));
+ } else {
+ out.printf("<td>%s</td>\n", scheduler.getPoolManager().getPoolName(job));
+ out.printf("<td>%s</td>\n", job.getPriority().toString());
+ }
+ Pool pool = scheduler.getPoolManager().getPool(job);
+ String mapShare = (pool.getSchedulingMode() == SchedulingMode.FAIR) ?
+ String.format("%.1f", info.mapSchedulable.getFairShare()) : "NA";
+ out.printf("<td>%d / %d</td><td>%d</td><td>%s</td>\n",
+ job.finishedMaps(), job.desiredMaps(),
+ info.mapSchedulable.getRunningTasks(),
+ mapShare);
if (advancedView) {
- out.printf("<td>%8.1f</td>\n", info.mapWeight);
- out.printf("<td>%s</td>\n", info.neededMaps > 0 ?
- (info.mapDeficit / 1000) + "s" : "--");
- out.printf("<td>%d</td>\n", info.minMaps);
+ out.printf("<td>%.1f</td>\n", info.mapSchedulable.getWeight());
}
- out.printf("<td>%d / %d</td><td>%d</td><td>%8.1f</td>\n",
- job.finishedReduces(), job.desiredReduces(), info.runningReduces,
- info.reduceFairShare);
+ String reduceShare = (pool.getSchedulingMode() == SchedulingMode.FAIR) ?
+ String.format("%.1f", info.reduceSchedulable.getFairShare()) : "NA";
+ out.printf("<td>%d / %d</td><td>%d</td><td>%s</td>\n",
+ job.finishedReduces(), job.desiredReduces(),
+ info.reduceSchedulable.getRunningTasks(),
+ reduceShare);
if (advancedView) {
- out.printf("<td>%8.1f</td>\n", info.reduceWeight);
- out.printf("<td>%s</td>\n", info.neededReduces > 0 ?
- (info.reduceDeficit / 1000) + "s" : "--");
- out.printf("<td>%d</td>\n", info.minReduces);
+ out.printf("<td>%.1f</td>\n", info.reduceSchedulable.getWeight());
}
out.print("</tr>\n");
}
@@ -294,22 +326,17 @@ public class FairSchedulerServlet extend
}
/**
- * Print the administration form at the bottom of the page, which currently
- * only includes the button for switching between FIFO and Fair Scheduling.
+ * Obtains all initialized jobs.
*/
- private void showAdminForm(PrintWriter out, boolean advancedView) {
- out.print("<h2>Scheduling Mode</h2>\n");
- String curMode = scheduler.getUseFifo() ? "FIFO" : "Fair Sharing";
- String otherMode = scheduler.getUseFifo() ? "Fair Sharing" : "FIFO";
- String advParam = advancedView ? "?advanced" : "";
- out.printf("<form method=\"post\" action=\"/scheduler%s\">\n", advParam);
- out.printf("<p>The scheduler is currently using <b>%s mode</b>. " +
- "<input type=\"submit\" value=\"Switch to %s mode.\" " +
- "onclick=\"return confirm('Are you sure you want to change " +
- "scheduling mode to %s?')\" />\n",
- curMode, otherMode, otherMode);
- out.printf("<input type=\"hidden\" name=\"setFifo\" value=\"%s\" />",
- !scheduler.getUseFifo());
- out.print("</form>\n");
+ private Collection<JobInProgress> getInitedJobs() {
+ Collection<JobInProgress> runningJobs = jobTracker.getRunningJobs();
+ for (Iterator<JobInProgress> it = runningJobs.iterator(); it.hasNext();) {
+ JobInProgress job = it.next();
+ if (!job.inited()) {
+ it.remove();
+ }
+ }
+ return runningJobs;
}
+
}
Modified: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FifoJobComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FifoJobComparator.java?rev=1169585&r1=1169584&r2=1169585&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FifoJobComparator.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FifoJobComparator.java Sun Sep 11 23:57:37 2011
@@ -35,7 +35,8 @@ public class FifoJobComparator implement
}
}
if (res == 0) {
- res = j1.hashCode() - j2.hashCode();
+ // If there is a tie, break it by job ID to get a deterministic order
+ res = j1.getJobID().compareTo(j2.getJobID());
}
return res;
}