Posted to common-commits@hadoop.apache.org by om...@apache.org on 2008/08/29 05:50:07 UTC
svn commit: r690093 [1/2] - in /hadoop/core/trunk: ./ src/contrib/
src/contrib/fairscheduler/ src/contrib/fairscheduler/src/
src/contrib/fairscheduler/src/java/ src/contrib/fairscheduler/src/java/org/
src/contrib/fairscheduler/src/java/org/apache/ src/...
Author: omalley
Date: Thu Aug 28 20:50:06 2008
New Revision: 690093
URL: http://svn.apache.org/viewvc?rev=690093&view=rev
Log:
HADOOP-3746. Add a fair share scheduler. (Matei Zaharia via omalley)
Added:
hadoop/core/trunk/src/contrib/fairscheduler/
hadoop/core/trunk/src/contrib/fairscheduler/README
hadoop/core/trunk/src/contrib/fairscheduler/build.xml
hadoop/core/trunk/src/contrib/fairscheduler/src/
hadoop/core/trunk/src/contrib/fairscheduler/src/java/
hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/
hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/
hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/
hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/
hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/AllocationConfigurationException.java
hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java
hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/DefaultTaskSelector.java
hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java
hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FifoJobComparator.java
hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java
hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/NewJobWeightBooster.java
hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java
hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskSelector.java
hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/TaskType.java
hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/WeightAdjuster.java
hadoop/core/trunk/src/contrib/fairscheduler/src/test/
hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/
hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/
hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/
hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/
hadoop/core/trunk/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairScheduler.java
Modified:
hadoop/core/trunk/CHANGES.txt
hadoop/core/trunk/src/contrib/build.xml
hadoop/core/trunk/src/mapred/org/apache/hadoop/mapred/TaskInProgress.java
Modified: hadoop/core/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/CHANGES.txt?rev=690093&r1=690092&r2=690093&view=diff
==============================================================================
--- hadoop/core/trunk/CHANGES.txt (original)
+++ hadoop/core/trunk/CHANGES.txt Thu Aug 28 20:50:06 2008
@@ -104,6 +104,8 @@
HADOOP-3759. Provides ability to run memory intensive jobs without
affecting other running tasks on the nodes. (Hemanth Yamijala via ddas)
+ HADOOP-3746. Add a fair share scheduler. (Matei Zaharia via omalley)
+
IMPROVEMENTS
HADOOP-3908. Fuse-dfs: better error message if llibhdfs.so doesn't exist.
Modified: hadoop/core/trunk/src/contrib/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/build.xml?rev=690093&r1=690092&r2=690093&view=diff
==============================================================================
--- hadoop/core/trunk/src/contrib/build.xml (original)
+++ hadoop/core/trunk/src/contrib/build.xml Thu Aug 28 20:50:06 2008
@@ -47,6 +47,7 @@
<target name="test">
<subant target="test">
<fileset dir="." includes="streaming/build.xml"/>
+ <fileset dir="." includes="fairscheduler/build.xml"/>
</subant>
</target>
Added: hadoop/core/trunk/src/contrib/fairscheduler/README
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/README?rev=690093&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/README (added)
+++ hadoop/core/trunk/src/contrib/fairscheduler/README Thu Aug 28 20:50:06 2008
@@ -0,0 +1,238 @@
+# Copyright 2008 The Apache Software Foundation Licensed under the
+# Apache License, Version 2.0 (the "License"); you may not use this
+# file except in compliance with the License. You may obtain a copy
+# of the License at http://www.apache.org/licenses/LICENSE-2.0 Unless
+# required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+# implied. See the License for the specific language governing
+# permissions and limitations under the License.
+
+This package implements fair scheduling for MapReduce jobs.
+
+Fair scheduling is a method of assigning resources to jobs such that all jobs
+get, on average, an equal share of resources over time. When there is a single
+job running, that job uses the entire cluster. When other jobs are submitted,
+task slots that free up are assigned to the new jobs, so that each job gets
+roughly the same amount of CPU time. Unlike the default Hadoop scheduler, which
+forms a queue of jobs, this approach lets short jobs finish in reasonable time while not
+starving long jobs. It is also a reasonable way to share a cluster between a
+number of users. Finally, fair sharing can also work with job priorities - the
+priorities are used as weights to determine the fraction of total compute time
+that each job should get.
+
+In addition to providing fair sharing, the Fair Scheduler allows assigning
+jobs to "pools" with guaranteed minimum shares. When a pool contains jobs,
+it gets at least its minimum share, but when a pool does not need its full
+capacity, the excess is shared between other running jobs. Thus pools are
+a way to guarantee capacity for particular user groups while utilizing the
+cluster efficiently when these users are not submitting any jobs. Within each
+pool, fair sharing is used to share capacity between the running jobs. By
+default, the pool is set based on the queue.name property in the jobconf, which
+will be introduced with the Hadoop Resource Manager (JIRA 3445), but it is
+also possible to have a pool per user or per Unix user group.
+
+The fair scheduler lets all jobs run by default, but it is also possible to
+limit the number of running jobs per user and per pool through the config
+file. This can be useful when a user must submit hundreds of jobs at once,
+or in general to improve performance if running too many jobs at once would
+cause too much intermediate data to be created or too much context-switching.
+Limiting the jobs does not cause any subsequently submitted jobs to fail, only
+to wait in the scheduler's queue until some of the user's earlier jobs finish.
+Jobs to run from each user/pool are chosen in order of priority and then submit
+time, as in the default FIFO scheduler in Hadoop.
+
+Finally, the fair scheduler provides several extension points where the basic
+functionality can be extended. For example, the weight calculation can be
+modified to give a priority boost to new jobs, implementing a "shortest job
+first"-style policy that further reduces response times for interactive jobs.
+
+--------------------------------------------------------------------------------
+
+BUILDING:
+
+In HADOOP_HOME, run ant package to build Hadoop and its contrib packages.
+
+--------------------------------------------------------------------------------
+
+INSTALLING:
+
+To run the fair scheduler in your Hadoop installation, you need to put it on
+the CLASSPATH. The easiest way is to copy the hadoop-*-fairscheduler.jar
+from HADOOP_HOME/build/contrib/fairscheduler to HADOOP_HOME/lib. Alternatively,
+you can modify HADOOP_CLASSPATH in conf/hadoop-env.sh to include this jar.
+
+You will also need to set the following property in the Hadoop config file
+(conf/hadoop-site.xml) to have Hadoop use the fair scheduler:
+
+<property>
+ <name>mapred.jobtracker.taskScheduler</name>
+ <value>org.apache.hadoop.mapred.FairScheduler</value>
+</property>
+
+Once you restart the cluster, you can check that the fair scheduler is running
+by going to http://<jobtracker URL>/scheduler on the JobTracker's web UI. A
+"job scheduler administration" page should be visible there. This page is
+described in the Administration section.
+
+--------------------------------------------------------------------------------
+
+CONFIGURING:
+
+The following properties can be set in hadoop-site.xml to configure the
+scheduler:
+
+mapred.fairscheduler.allocation.file:
+ Specifies an absolute path to an XML file which contains the allocations
+ for each pool, as well as the per-pool and per-user limits on the number of
+ running jobs. If this property is not provided, allocations are not used.
+ This file must be in XML format, and can contain three types of elements:
+ - pool elements, which may contain elements for minMaps, minReduces and
+ maxRunningJobs (limiting the number of jobs from the pool that may run
+ at once).
+ - user elements, which may contain a maxRunningJobs element to limit that
+ user's running jobs.
+ - a userMaxJobsDefault element, which sets the running job limit for any
+ users that do not have their own elements.
+ The following example file shows how to create each type of element:
+ <?xml version="1.0"?>
+ <allocations>
+ <pool name="sample_pool">
+ <minMaps>5</minMaps>
+ <minReduces>5</minReduces>
+ </pool>
+ <user name="sample_user">
+ <maxRunningJobs>6</maxRunningJobs>
+ </user>
+ <userMaxJobsDefault>3</userMaxJobsDefault>
+ </allocations>
+ This example creates a pool sample_pool with a guarantee of 5 map slots
+ and 5 reduce slots. It also limits the number of running jobs per user
+ to 3, except for sample_user, who can run 6 jobs concurrently.
+ Any pool not defined in the allocations file will have no guaranteed
+ capacity. Also, any pool or user with no max running jobs set in the file
+ will be allowed to run an unlimited number of jobs.
+
+mapred.fairscheduler.assignmultiple:
+ Allows the scheduler to assign both a map task and a reduce task on each
+ heartbeat, which improves cluster throughput when there are many small
+ tasks to run. Boolean value, default: false.
+
+mapred.fairscheduler.sizebasedweight:
+ Take into account job sizes in calculating their weights for fair sharing.
+ By default, weights are only based on job priorities. Setting this flag to
+ true will make them based on the size of the job (number of tasks needed)
+ as well, though not linearly (the weight will be proportional to the log
+ of the number of tasks needed). This lets larger jobs get larger fair
+ shares while still providing enough of a share to small jobs to let them
+ finish fast. Boolean value, default: false.
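+ For example, with the log2-based formula used in
+ FairScheduler.calculateWeight, a job with 1023 runnable tasks gets a
+ base weight of log2(1 + 1023) = 10, versus log2(1 + 1) = 1 for a
+ single-task job - a 10x rather than 1000x difference.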
+
+mapred.fairscheduler.poolnameproperty:
+ Specify which jobconf property is used to determine the pool that a job
+ belongs in. String, default: queue.name (the same property as the queue
+ name in the Hadoop Resource Manager, JIRA 3445). You can use user.name
+ or group.name to base it on the Unix user or Unix group for example.
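+ For example, to give each Unix user their own pool, you could set:
+ <property>
+   <name>mapred.fairscheduler.poolnameproperty</name>
+   <value>user.name</value>
+ </property>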
+
+mapred.fairscheduler.weightadjuster:
+ An extensibility point that lets you specify a class to adjust the weights
+ of running jobs. This class should implement the WeightAdjuster interface.
+ There is currently one example implementation - NewJobWeightBooster, which
+ increases the weight of jobs for the first 5 minutes of their lifetime
+ to let short jobs finish faster. To use it, set the weightadjuster property
+ to the full class name, org.apache.hadoop.mapred.NewJobWeightBooster.
+ NewJobWeightBooster itself provides two parameters for setting the duration
+ and boost factor - mapred.newjobweightbooster.factor (default 3) and
+ mapred.newjobweightbooster.duration (in milliseconds, default 300000 for 5
+ minutes).
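+ For example, this hadoop-site.xml snippet enables the booster and sets
+ both parameters explicitly (the values shown are the defaults):
+ <property>
+   <name>mapred.fairscheduler.weightadjuster</name>
+   <value>org.apache.hadoop.mapred.NewJobWeightBooster</value>
+ </property>
+ <property>
+   <name>mapred.newjobweightbooster.factor</name>
+   <value>3</value>
+ </property>
+ <property>
+   <name>mapred.newjobweightbooster.duration</name>
+   <value>300000</value>
+ </property>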
+
+mapred.fairscheduler.loadmanager:
+ An extensibility point that lets you specify a class that determines
+ how many maps and reduces can run on a given TaskTracker. This class should
+ implement the LoadManager interface. By default the task caps in the Hadoop
+ config file are used, but this option could be used to base the load on
+ available memory and CPU utilization, for example.
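+ As a minimal sketch (not part of this patch), a custom load manager
+ could extend CapBasedLoadManager (included below) and veto assignments
+ on trackers that look overloaded; the hasFreeMemory check here is a
+ hypothetical placeholder for whatever memory/CPU probe you implement:
+
+   package org.apache.hadoop.mapred;
+
+   /** Sketch: refuse new tasks on memory-constrained trackers, otherwise
+    *  fall back to the default cap-based behavior. */
+   public class MemoryAwareLoadManager extends CapBasedLoadManager {
+     @Override
+     public boolean canAssignMap(TaskTrackerStatus tracker,
+         int totalRunnableMaps) {
+       return hasFreeMemory(tracker)
+           && super.canAssignMap(tracker, totalRunnableMaps);
+     }
+
+     @Override
+     public boolean canAssignReduce(TaskTrackerStatus tracker,
+         int totalRunnableReduces) {
+       return hasFreeMemory(tracker)
+           && super.canAssignReduce(tracker, totalRunnableReduces);
+     }
+
+     // Hypothetical: plug in a real memory/CPU check here
+     private boolean hasFreeMemory(TaskTrackerStatus tracker) {
+       return true;
+     }
+   }
+ Set mapred.fairscheduler.loadmanager to this class name to activate it.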
+
+mapred.fairscheduler.taskselector:
+ An extensibility point that lets you specify a class that determines
+ which task from within a job to launch on a given tracker. This can be
+ used to change either the locality policy (e.g. keep some jobs within
+ a particular rack) or the speculative execution algorithm (select when to
+ launch speculative tasks). The default implementation uses Hadoop's
+ default algorithms from JobInProgress.
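+ As a minimal sketch (not part of this patch), a custom selector could
+ extend DefaultTaskSelector (included below) and report zero needed
+ speculative tasks, so that the fair-share accounting stops budgeting
+ slots for speculation (the underlying JobInProgress methods may still
+ hand out speculative tasks):
+
+   package org.apache.hadoop.mapred;
+
+   /** Sketch: keep default task selection but report no needed
+    *  speculative tasks to the scheduler's accounting. */
+   public class NoSpeculationTaskSelector extends DefaultTaskSelector {
+     @Override
+     public int neededSpeculativeMaps(JobInProgress job) {
+       return 0;
+     }
+
+     @Override
+     public int neededSpeculativeReduces(JobInProgress job) {
+       return 0;
+     }
+   }
+ Set mapred.fairscheduler.taskselector to this class name to use it.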
+
+--------------------------------------------------------------------------------
+
+ADMINISTRATION:
+
+The fair scheduler provides support for administration at runtime through
+two mechanisms. First, it is possible to modify pools' allocations and user
+and pool running job limits at runtime by editing the allocation config file.
+The scheduler will reload this file 10-15 seconds after it sees that it was
+modified. Second, current jobs, pools, and fair shares can be examined through
+the JobTracker's web interface, at http://<jobtracker URL>/scheduler. On this
+interface, it is also possible to modify jobs' priorities or move jobs from
+one pool to another and see the effects on the fair shares (this requires
+JavaScript). The following fields can be seen for each job on the web interface:
+
+Submitted - Date and time job was submitted.
+JobID, User, Name - Job identifiers as on the standard web UI.
+Pool - Current pool of job. Select another value to move job to another pool.
+Priority - Current priority. Select another value to change the job's priority.
+Maps/Reduces Finished - Number of tasks finished / total tasks.
+Maps/Reduces Running - Tasks currently running.
+Map/Reduce Fair Share - The average number of task slots that this job should
+ have at any given time according to fair sharing. The actual number of
+ tasks will go up and down depending on how much compute time the job has
+ had, but on average it will get its fair share amount.
+
+In addition, it is possible to turn on an "advanced" view for the web UI by
+going to http://<jobtracker URL>/scheduler?advanced. This view shows four more
+columns used for calculations internally:
+
+Map/Reduce Weight - Weight of the job in the fair sharing calculations. This
+ depends on priority and potentially also on job size and job age if the
+ sizebasedweight and NewJobWeightBooster are enabled.
+Map/Reduce Deficit - The job's scheduling deficit in machine-seconds: the
+ compute time it should have received according to its fair share, minus
+ what it actually received. A positive deficit means the job will be scheduled
+ again in the near future because it needs to catch up to its fair share.
+ The scheduler schedules jobs with higher deficit ahead of others. Please
+ see the Implementation section of this document for details.
+
+Finally, at the bottom of the page, the web interface provides a button for
+switching to FIFO scheduling at runtime, in case this becomes necessary and
+restarting the MapReduce cluster is inconvenient.
+
+--------------------------------------------------------------------------------
+
+IMPLEMENTATION:
+
+There are two aspects to implementing fair scheduling: calculating each job's
+fair share, and choosing which job to run when a task slot becomes available.
+
+To select jobs to run, the scheduler keeps track of a "deficit" for
+each job - the difference between the amount of compute time it should have
+gotten on an ideal scheduler, and the amount of compute time it actually got.
+This is a measure of how "unfair" we've been to the job. Every few hundred
+milliseconds, the scheduler updates the deficit of each job by looking at
+how many tasks each job had running during this interval vs. its fair share.
+Whenever a task slot becomes available, it is assigned to the job with the
+highest deficit. There is one exception - if one or more jobs are not
+meeting their pool capacity guarantees, we choose only among these "needy"
+jobs (again based on their deficit), to ensure that the scheduler meets
+pool guarantees as soon as possible.
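+
+For example, if a job's map fair share is 10 slots but it averaged only 4
+running maps over a 500 ms update interval, its map deficit grows by
+(10 - 4) * 500 = 3000 slot-milliseconds in that update.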
+
+The fair shares are calculated by dividing the capacity of the cluster among
+runnable jobs according to a "weight" for each job. By default the weight is
+based on priority, with each priority level having twice the weight of the
+level below it (for example, VERY_HIGH has 4x the weight of NORMAL). However,
+weights can
+also be based on job sizes and ages, as described in the Configuring section.
+For jobs that are in a pool, fair shares also take into account the minimum
+guarantee for that pool. This capacity is divided among the jobs in that pool,
+again according to their weights.
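+
+For example, with 100 map slots and three runnable jobs at priorities NORMAL,
+NORMAL and HIGH (weights 1, 1 and 2), the fair shares would be 25, 25 and 50
+slots, assuming no pool minimums come into play.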
+
+Finally, when limits on a user's running jobs or a pool's running jobs are in
+place, we choose which jobs get to run by sorting all jobs in order of priority
+and then submit time, as in the standard Hadoop scheduler. Any jobs that fall
+after the user/pool's limit in this ordering are queued up and sit idle until
+they can be run. During this time, they are excluded from the fair sharing
+calculations and do not gain or lose deficit (their fair share is set to zero).
Added: hadoop/core/trunk/src/contrib/fairscheduler/build.xml
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/build.xml?rev=690093&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/build.xml (added)
+++ hadoop/core/trunk/src/contrib/fairscheduler/build.xml Thu Aug 28 20:50:06 2008
@@ -0,0 +1,28 @@
+<?xml version="1.0"?>
+
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!--
+Before you can run these subtargets directly, you need
+to call at top-level: ant deploy-contrib compile-core-test
+-->
+<project name="fairscheduler" default="jar">
+
+ <import file="../build-contrib.xml"/>
+
+</project>
Added: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/AllocationConfigurationException.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/AllocationConfigurationException.java?rev=690093&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/AllocationConfigurationException.java (added)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/AllocationConfigurationException.java Thu Aug 28 20:50:06 2008
@@ -0,0 +1,30 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+/**
+ * Thrown when the allocation file for {@link PoolManager} is malformed.
+ */
+public class AllocationConfigurationException extends Exception {
+ private static final long serialVersionUID = 4046517047810854249L;
+
+ public AllocationConfigurationException(String message) {
+ super(message);
+ }
+}
Added: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java?rev=690093&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java (added)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java Thu Aug 28 20:50:06 2008
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+/**
+ * A {@link LoadManager} for use by the {@link FairScheduler} that allocates
+ * tasks evenly across nodes up to their per-node maximum, using the default
+ * load management algorithm in Hadoop.
+ */
+public class CapBasedLoadManager extends LoadManager {
+ /**
+ * Determine how many tasks of a given type we want to run on a TaskTracker.
+ * This cap is chosen based on how many tasks of that type are outstanding in
+ * total, so that when the cluster is used below capacity, tasks are spread
+ * out uniformly across the nodes rather than being clumped up on whichever
+ * machines sent out heartbeats earliest.
+ */
+ int getCap(TaskTrackerStatus tracker,
+ int totalRunnableTasks, int localMaxTasks) {
+ int numTaskTrackers = taskTrackerManager.taskTrackers().size();
+ return Math.min(localMaxTasks,
+ (int) Math.ceil((double) totalRunnableTasks / numTaskTrackers));
+ }
+
+ @Override
+ public boolean canAssignMap(TaskTrackerStatus tracker,
+ int totalRunnableMaps) {
+ return tracker.countMapTasks() < getCap(tracker, totalRunnableMaps,
+ tracker.getMaxMapTasks());
+ }
+
+ @Override
+ public boolean canAssignReduce(TaskTrackerStatus tracker,
+ int totalRunnableReduces) {
+ return tracker.countReduceTasks() < getCap(tracker, totalRunnableReduces,
+ tracker.getMaxReduceTasks());
+ }
+}
Added: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/DefaultTaskSelector.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/DefaultTaskSelector.java?rev=690093&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/DefaultTaskSelector.java (added)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/DefaultTaskSelector.java Thu Aug 28 20:50:06 2008
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+/**
+ * A {@link TaskSelector} implementation that wraps around the default
+ * {@link JobInProgress#obtainNewMapTask(TaskTrackerStatus, int)} and
+ * {@link JobInProgress#obtainNewReduceTask(TaskTrackerStatus, int)} methods
+ * in {@link JobInProgress}, using the default Hadoop locality and speculative
+ * threshold algorithms.
+ */
+public class DefaultTaskSelector extends TaskSelector {
+
+ @Override
+ public int neededSpeculativeMaps(JobInProgress job) {
+ int count = 0;
+ long time = System.currentTimeMillis();
+ double avgProgress = job.getStatus().mapProgress();
+ for (TaskInProgress tip: job.maps) {
+ if (tip.isRunning() && tip.hasSpeculativeTask(time, avgProgress)) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ @Override
+ public int neededSpeculativeReduces(JobInProgress job) {
+ int count = 0;
+ long time = System.currentTimeMillis();
+ double avgProgress = job.getStatus().reduceProgress();
+ for (TaskInProgress tip: job.reduces) {
+ if (tip.isRunning() && tip.hasSpeculativeTask(time, avgProgress)) {
+ count++;
+ }
+ }
+ return count;
+ }
+
+ @Override
+ public Task obtainNewMapTask(TaskTrackerStatus taskTracker, JobInProgress job)
+ throws IOException {
+ ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
+ int numTaskTrackers = clusterStatus.getTaskTrackers();
+ return job.obtainNewMapTask(taskTracker, numTaskTrackers,
+ taskTrackerManager.getNumberOfUniqueHosts());
+ }
+
+ @Override
+ public Task obtainNewReduceTask(TaskTrackerStatus taskTracker, JobInProgress job)
+ throws IOException {
+ ClusterStatus clusterStatus = taskTrackerManager.getClusterStatus();
+ int numTaskTrackers = clusterStatus.getTaskTrackers();
+ return job.obtainNewReduceTask(taskTracker, numTaskTrackers,
+ taskTrackerManager.getNumberOfUniqueHosts());
+ }
+
+}
Added: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=690093&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (added)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Thu Aug 28 20:50:06 2008
@@ -0,0 +1,689 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Map.Entry;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.util.ReflectionUtils;
+
+/**
+ * A {@link TaskScheduler} that implements fair sharing.
+ */
+public class FairScheduler extends TaskScheduler {
+ /** How often fair shares are re-calculated */
+ public static final long UPDATE_INTERVAL = 500;
+ public static final Log LOG = LogFactory.getLog(
+ "org.apache.hadoop.mapred.FairScheduler");
+
+ protected PoolManager poolMgr;
+
+ protected LoadManager loadMgr;
+ protected TaskSelector taskSelector;
+ protected WeightAdjuster weightAdjuster; // Can be null for no weight adjuster
+ protected Map<JobInProgress, JobInfo> infos = // per-job scheduling variables
+ new HashMap<JobInProgress, JobInfo>();
+ protected long lastUpdateTime; // Time when we last updated infos
+ protected boolean initialized; // Are we initialized?
+ protected boolean running; // Are we running?
+ protected boolean useFifo; // Set if we want to revert to FIFO behavior
+ protected boolean assignMultiple; // Simultaneously assign map and reduce?
+ protected boolean sizeBasedWeight; // Give larger weights to larger jobs
+ private Clock clock;
+ private boolean runBackgroundUpdates; // Can be set to false for testing
+ private EagerTaskInitializationListener eagerInitListener;
+ private JobListener jobListener;
+
+ /**
+ * A class for holding per-job scheduler variables. These always contain the
+ * values of the variables at the last update(), and are used along with a
+ * time delta to update the map and reduce deficits before a new update().
+ */
+ static class JobInfo {
+ boolean runnable = false; // Can the job run given user/pool limits?
+ double mapWeight = 0; // Weight of job in calculation of map share
+ double reduceWeight = 0; // Weight of job in calculation of reduce share
+ long mapDeficit = 0; // Time deficit for maps
+ long reduceDeficit = 0; // Time deficit for reduces
+ int runningMaps = 0; // Maps running at last update
+ int runningReduces = 0; // Reduces running at last update
+ int neededMaps; // Maps needed at last update
+ int neededReduces; // Reduces needed at last update
+ int minMaps = 0; // Minimum maps as guaranteed by pool
+ int minReduces = 0; // Minimum reduces as guaranteed by pool
+ double mapFairShare = 0; // Fair share of map slots at last update
+ double reduceFairShare = 0; // Fair share of reduce slots at last update
+ }
+
+ /**
+ * A clock class - can be mocked out for testing.
+ */
+ static class Clock {
+ long getTime() {
+ return System.currentTimeMillis();
+ }
+ }
+
+ public FairScheduler() {
+ this(new Clock(), true);
+ }
+
+ /**
+ * Constructor used for tests, which can change the clock and disable updates.
+ */
+ protected FairScheduler(Clock clock, boolean runBackgroundUpdates) {
+ this.clock = clock;
+ this.runBackgroundUpdates = runBackgroundUpdates;
+ this.eagerInitListener = new EagerTaskInitializationListener();
+ this.jobListener = new JobListener();
+ }
+
+ @Override
+ public void start() {
+ try {
+ Configuration conf = getConf();
+ eagerInitListener.start();
+ taskTrackerManager.addJobInProgressListener(eagerInitListener);
+ taskTrackerManager.addJobInProgressListener(jobListener);
+ poolMgr = new PoolManager(conf);
+ loadMgr = (LoadManager) ReflectionUtils.newInstance(
+ conf.getClass("mapred.fairscheduler.loadmanager",
+ CapBasedLoadManager.class, LoadManager.class), conf);
+ loadMgr.setTaskTrackerManager(taskTrackerManager);
+ loadMgr.start();
+ taskSelector = (TaskSelector) ReflectionUtils.newInstance(
+ conf.getClass("mapred.fairscheduler.taskselector",
+ DefaultTaskSelector.class, TaskSelector.class), conf);
+ taskSelector.setTaskTrackerManager(taskTrackerManager);
+ taskSelector.start();
+ Class<?> weightAdjClass = conf.getClass(
+ "mapred.fairscheduler.weightadjuster", null);
+ if (weightAdjClass != null) {
+ weightAdjuster = (WeightAdjuster) ReflectionUtils.newInstance(
+ weightAdjClass, conf);
+ }
+ assignMultiple = conf.getBoolean("mapred.fairscheduler.assignmultiple",
+ false);
+ sizeBasedWeight = conf.getBoolean("mapred.fairscheduler.sizebasedweight",
+ false);
+ initialized = true;
+ running = true;
+ lastUpdateTime = clock.getTime();
+ // Start a thread to update deficits every UPDATE_INTERVAL
+ if (runBackgroundUpdates)
+ new UpdateThread().start();
+ // Register servlet with JobTracker's Jetty server
+ if (taskTrackerManager instanceof JobTracker) {
+ JobTracker jobTracker = (JobTracker) taskTrackerManager;
+ StatusHttpServer infoServer = jobTracker.infoServer;
+ infoServer.setAttribute("scheduler", this);
+ infoServer.addServlet("scheduler", "/scheduler",
+ FairSchedulerServlet.class);
+ }
+ } catch (Exception e) {
+ // Can't load one of the managers - crash the JobTracker now while it is
+ // starting up so that the user notices.
+ throw new RuntimeException("Failed to start FairScheduler", e);
+ }
+ LOG.info("Successfully configured FairScheduler");
+ }
+
+ @Override
+ public void terminate() throws IOException {
+ running = false;
+ if (jobListener != null)
+ taskTrackerManager.removeJobInProgressListener(jobListener);
+ if (eagerInitListener != null)
+ taskTrackerManager.removeJobInProgressListener(eagerInitListener);
+ }
+
+ /**
+ * Used to listen for jobs added/removed by our {@link TaskTrackerManager}.
+ */
+ private class JobListener extends JobInProgressListener {
+ @Override
+ public void jobAdded(JobInProgress job) {
+ synchronized (FairScheduler.this) {
+ poolMgr.addJob(job);
+ JobInfo info = new JobInfo();
+ infos.put(job, info);
+ update();
+ }
+ }
+
+ @Override
+ public void jobRemoved(JobInProgress job) {
+ synchronized (FairScheduler.this) {
+ poolMgr.removeJob(job);
+ infos.remove(job);
+ }
+ }
+
+ @Override
+ public void jobUpdated(JobInProgress job) {
+ }
+ }
+
+ /**
+ * A thread which calls {@link FairScheduler#update()} every
+ * <code>UPDATE_INTERVAL</code> milliseconds.
+ */
+ private class UpdateThread extends Thread {
+ private UpdateThread() {
+ super("FairScheduler update thread");
+ }
+
+ public void run() {
+ while (running) {
+ try {
+ Thread.sleep(UPDATE_INTERVAL);
+ update();
+ } catch (Exception e) {
+ LOG.error("Failed to update fair share calculations", e);
+ }
+ }
+ }
+ }
+
+ @Override
+ public synchronized List<Task> assignTasks(TaskTrackerStatus tracker)
+ throws IOException {
+ if (!initialized) // Don't try to assign tasks if we haven't yet started up
+ return null;
+
+ // Reload allocations file if it hasn't been loaded in a while
+ poolMgr.reloadAllocsIfNecessary();
+
+ // Compute total runnable maps and reduces
+ int runnableMaps = 0;
+ int runnableReduces = 0;
+ for (JobInProgress job: infos.keySet()) {
+ runnableMaps += runnableTasks(job, TaskType.MAP);
+ runnableReduces += runnableTasks(job, TaskType.REDUCE);
+ }
+
+ // Scan to see whether any job needs to run a map, then a reduce
+ ArrayList<Task> tasks = new ArrayList<Task>();
+ TaskType[] types = new TaskType[] {TaskType.MAP, TaskType.REDUCE};
+ for (TaskType taskType: types) {
+ boolean canAssign = (taskType == TaskType.MAP) ?
+ loadMgr.canAssignMap(tracker, runnableMaps) :
+ loadMgr.canAssignReduce(tracker, runnableReduces);
+ if (canAssign) {
+ // Figure out the jobs that need this type of task
+ List<JobInProgress> candidates = new ArrayList<JobInProgress>();
+ for (JobInProgress job: infos.keySet()) {
+ if (job.getStatus().getRunState() == JobStatus.RUNNING &&
+ neededTasks(job, taskType) > 0) {
+ candidates.add(job);
+ }
+ }
+ // Sort jobs by deficit (for Fair Sharing) or submit time (for FIFO)
+ Comparator<JobInProgress> comparator = useFifo ?
+ new FifoJobComparator() : new DeficitComparator(taskType);
+ Collections.sort(candidates, comparator);
+ for (JobInProgress job: candidates) {
+ Task task = (taskType == TaskType.MAP ?
+ taskSelector.obtainNewMapTask(tracker, job) :
+ taskSelector.obtainNewReduceTask(tracker, job));
+ if (task != null) {
+ // Update the JobInfo for this job so we account for the launched
+ // tasks during this update interval and don't try to launch more
+ // tasks than the job needed on future heartbeats
+ JobInfo info = infos.get(job);
+ if (taskType == TaskType.MAP) {
+ info.runningMaps++;
+ info.neededMaps--;
+ } else {
+ info.runningReduces++;
+ info.neededReduces--;
+ }
+ tasks.add(task);
+ if (!assignMultiple)
+ return tasks;
+ break;
+ }
+ }
+ }
+ }
+
+ // If no tasks were found, return null
+ return tasks.isEmpty() ? null : tasks;
+ }
+
+ /**
+ * Compare jobs by deficit for a given task type, always putting jobs whose
+ * current allocation is less than their minimum share ahead of others. This is
+ * the default job comparator used for Fair Sharing.
+ */
+ private class DeficitComparator implements Comparator<JobInProgress> {
+ private final TaskType taskType;
+
+ private DeficitComparator(TaskType taskType) {
+ this.taskType = taskType;
+ }
+
+ public int compare(JobInProgress j1, JobInProgress j2) {
+ // Put needy jobs ahead of non-needy jobs (where needy means must receive
+ // new tasks to meet slot minimum), comparing among jobs of the same type
+ // by deficit so as to put jobs with higher deficit ahead.
+ JobInfo j1Info = infos.get(j1);
+ JobInfo j2Info = infos.get(j2);
+ long deficitDif;
+ boolean j1Needy, j2Needy;
+ if (taskType == TaskType.MAP) {
+ j1Needy = j1.runningMaps() < Math.floor(j1Info.minMaps);
+ j2Needy = j2.runningMaps() < Math.floor(j2Info.minMaps);
+ deficitDif = j2Info.mapDeficit - j1Info.mapDeficit;
+ } else {
+ j1Needy = j1.runningReduces() < Math.floor(j1Info.minReduces);
+ j2Needy = j2.runningReduces() < Math.floor(j2Info.minReduces);
+ deficitDif = j2Info.reduceDeficit - j1Info.reduceDeficit;
+ }
+ if (j1Needy && !j2Needy)
+ return -1;
+ else if (j2Needy && !j1Needy)
+ return 1;
+ else // Both needy or both non-needy; compare by deficit
+ return (int) Math.signum(deficitDif);
+ }
+ }
+
+ /**
+ * Recompute the internal variables used by the scheduler - per-job weights,
+ * fair shares, deficits, minimum slot allocations, and numbers of running
+ * and needed tasks of each type.
+ */
+ protected synchronized void update() {
+ // Remove non-running jobs
+ List<JobInProgress> toRemove = new ArrayList<JobInProgress>();
+ for (JobInProgress job: infos.keySet()) {
+ int runState = job.getStatus().getRunState();
+ if (runState == JobStatus.SUCCEEDED || runState == JobStatus.FAILED) {
+ toRemove.add(job);
+ }
+ }
+ for (JobInProgress job: toRemove) {
+ infos.remove(job);
+ poolMgr.removeJob(job);
+ }
+ // Update running jobs with deficits since last update, and compute new
+ // slot allocations, weight, shares and task counts
+ long now = clock.getTime();
+ long timeDelta = now - lastUpdateTime;
+ updateDeficits(timeDelta);
+ updateRunnability();
+ updateTaskCounts();
+ updateWeights();
+ updateMinSlots();
+ updateFairShares();
+ lastUpdateTime = now;
+ }
+
+ private void updateDeficits(long timeDelta) {
+ for (JobInfo info: infos.values()) {
+ info.mapDeficit +=
+ (info.mapFairShare - info.runningMaps) * timeDelta;
+ info.reduceDeficit +=
+ (info.reduceFairShare - info.runningReduces) * timeDelta;
+ }
+ }
+
+ private void updateRunnability() {
+ // Start by marking everything as not runnable
+ for (JobInfo info: infos.values()) {
+ info.runnable = false;
+ }
+ // Create a list of sorted jobs in order of start time and priority
+ List<JobInProgress> jobs = new ArrayList<JobInProgress>(infos.keySet());
+ Collections.sort(jobs, new FifoJobComparator());
+ // Mark jobs as runnable in order of start time and priority, until
+ // user or pool limits have been reached.
+ Map<String, Integer> userJobs = new HashMap<String, Integer>();
+ Map<String, Integer> poolJobs = new HashMap<String, Integer>();
+ for (JobInProgress job: jobs) {
+ if (job.getStatus().getRunState() == JobStatus.RUNNING) {
+ String user = job.getJobConf().getUser();
+ String pool = poolMgr.getPoolName(job);
+ int userCount = userJobs.containsKey(user) ? userJobs.get(user) : 0;
+ int poolCount = poolJobs.containsKey(pool) ? poolJobs.get(pool) : 0;
+ if (userCount < poolMgr.getUserMaxJobs(user) &&
+ poolCount < poolMgr.getPoolMaxJobs(pool)) {
+ infos.get(job).runnable = true;
+ userJobs.put(user, userCount + 1);
+ poolJobs.put(pool, poolCount + 1);
+ }
+ }
+ }
+ }
+
+ private void updateTaskCounts() {
+ for (Map.Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
+ JobInProgress job = entry.getKey();
+ JobInfo info = entry.getValue();
+ if (job.getStatus().getRunState() != JobStatus.RUNNING)
+ continue; // Job is still in PREP state and tasks aren't initialized
+ // Count maps
+ int totalMaps = job.numMapTasks;
+ int finishedMaps = 0;
+ int runningMaps = 0;
+ for (TaskInProgress tip: job.getMapTasks()) {
+ if (tip.isComplete()) {
+ finishedMaps += 1;
+ } else if (tip.isRunning()) {
+ runningMaps += tip.getActiveTasks().size();
+ }
+ }
+ info.runningMaps = runningMaps;
+ info.neededMaps = (totalMaps - runningMaps - finishedMaps
+ + taskSelector.neededSpeculativeMaps(job));
+ // Count reduces
+ int totalReduces = job.numReduceTasks;
+ int finishedReduces = 0;
+ int runningReduces = 0;
+ for (TaskInProgress tip: job.getReduceTasks()) {
+ if (tip.isComplete()) {
+ finishedReduces += 1;
+ } else if (tip.isRunning()) {
+ runningReduces += tip.getActiveTasks().size();
+ }
+ }
+ info.runningReduces = runningReduces;
+ info.neededReduces = (totalReduces - runningReduces - finishedReduces
+ + taskSelector.neededSpeculativeReduces(job));
+ // If the job was marked as not runnable due to its user or pool having
+ // too many active jobs, set the neededMaps/neededReduces to 0. We still
+ // count runningMaps/runningReduces however so we can give it a deficit.
+ if (!info.runnable) {
+ info.neededMaps = 0;
+ info.neededReduces = 0;
+ }
+ }
+ }
+
+ private void updateWeights() {
+ for (Map.Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
+ JobInProgress job = entry.getKey();
+ JobInfo info = entry.getValue();
+ info.mapWeight = calculateWeight(job, TaskType.MAP);
+ info.reduceWeight = calculateWeight(job, TaskType.REDUCE);
+ }
+ }
+
+ private void updateMinSlots() {
+ // Clear old minSlots
+ for (JobInfo info: infos.values()) {
+ info.minMaps = 0;
+ info.minReduces = 0;
+ }
+ // For each pool, distribute its task allocation among jobs in it that need
+ // slots. This is a little tricky since some jobs in the pool might not be
+ // able to use all the slots, e.g. they might have only a few tasks left.
+ // To deal with this, we repeatedly split up the available task slots
+ // between the jobs left, give each job min(its alloc, # of slots it needs),
+ // and redistribute any slots that are left over between jobs that still
+ // need slots on the next pass. If, in total, the jobs in our pool don't
+ // need all its allocation, we leave the leftover slots for general use.
+ PoolManager poolMgr = getPoolManager();
+ for (Pool pool: poolMgr.getPools()) {
+ for (final TaskType type: TaskType.values()) {
+ Set<JobInProgress> jobs = new HashSet<JobInProgress>(pool.getJobs());
+ int slotsLeft = poolMgr.getAllocation(pool.getName(), type);
+ // Keep assigning slots until none are left
+ while (slotsLeft > 0) {
+ // Figure out total weight of jobs that still need slots
+ double totalWeight = 0;
+ for (Iterator<JobInProgress> it = jobs.iterator(); it.hasNext();) {
+ JobInProgress job = it.next();
+ if (isRunnable(job) &&
+ runnableTasks(job, type) > minTasks(job, type)) {
+ totalWeight += weight(job, type);
+ } else {
+ it.remove();
+ }
+ }
+ if (totalWeight == 0) // No jobs that can use more slots are left
+ break;
+ // Assign slots to jobs, using the floor of their weight divided by
+ // total weight. This ensures that all jobs get some chance to take
+ // a slot. Then, if no slots were assigned this way, we do another
+ // pass where we use ceil, in case some slots were still left over.
+ int oldSlots = slotsLeft; // Copy slotsLeft so we can modify it
+ for (JobInProgress job: jobs) {
+ double weight = weight(job, type);
+ int share = (int) Math.floor(oldSlots * weight / totalWeight);
+ slotsLeft = giveMinSlots(job, type, slotsLeft, share);
+ }
+ if (slotsLeft == oldSlots) {
+ // No tasks were assigned; do another pass using ceil, giving the
+ // extra slots to jobs in order of weight then deficit
+ List<JobInProgress> sortedJobs = new ArrayList<JobInProgress>(jobs);
+ Collections.sort(sortedJobs, new Comparator<JobInProgress>() {
+ public int compare(JobInProgress j1, JobInProgress j2) {
+ double dif = weight(j2, type) - weight(j1, type);
+ if (dif == 0) // Weights are equal, compare by deficit
+ dif = deficit(j2, type) - deficit(j1, type);
+ return (int) Math.signum(dif);
+ }
+ });
+ for (JobInProgress job: sortedJobs) {
+ double weight = weight(job, type);
+ int share = (int) Math.ceil(oldSlots * weight / totalWeight);
+ slotsLeft = giveMinSlots(job, type, slotsLeft, share);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Give up to <code>slotsToGive</code> min slots to a job (potentially fewer
+ * if either the job needs fewer slots or there aren't enough slots left).
+ * Returns the number of slots left over.
+ */
+ private int giveMinSlots(JobInProgress job, TaskType type,
+ int slotsLeft, int slotsToGive) {
+ int runnable = runnableTasks(job, type);
+ int curMin = minTasks(job, type);
+ slotsToGive = Math.min(Math.min(slotsLeft, runnable - curMin), slotsToGive);
+ slotsLeft -= slotsToGive;
+ JobInfo info = infos.get(job);
+ if (type == TaskType.MAP)
+ info.minMaps += slotsToGive;
+ else
+ info.minReduces += slotsToGive;
+ return slotsLeft;
+ }
+
+ private void updateFairShares() {
+ // Clear old fairShares
+ for (JobInfo info: infos.values()) {
+ info.mapFairShare = 0;
+ info.reduceFairShare = 0;
+ }
+ // Assign new shares, based on weight and minimum share. This is done
+ // as follows. First, we split up the available slots between all
+ // jobs according to weight. Then if there are any jobs whose minSlots is
+ // larger than their fair allocation, we give them their minSlots and
+ // remove them from the list, and start again with the amount of slots
+ // left over. This continues until all jobs' minSlots are less than their
+ // fair allocation, and at this point we know that we've met everyone's
+ // guarantee and we've split the excess capacity fairly among jobs left.
+ for (TaskType type: TaskType.values()) {
+ // Select only jobs that still need this type of task
+ HashSet<JobInfo> jobsLeft = new HashSet<JobInfo>();
+ for (Entry<JobInProgress, JobInfo> entry: infos.entrySet()) {
+ JobInProgress job = entry.getKey();
+ JobInfo info = entry.getValue();
+ if (isRunnable(job) && runnableTasks(job, type) > 0) {
+ jobsLeft.add(info);
+ }
+ }
+ double slotsLeft = getTotalSlots(type);
+ while (!jobsLeft.isEmpty()) {
+ double totalWeight = 0;
+ for (JobInfo info: jobsLeft) {
+ double weight = (type == TaskType.MAP ?
+ info.mapWeight : info.reduceWeight);
+ totalWeight += weight;
+ }
+ boolean recomputeSlots = false;
+ double oldSlots = slotsLeft; // Copy slotsLeft so we can modify it
+ for (Iterator<JobInfo> iter = jobsLeft.iterator(); iter.hasNext();) {
+ JobInfo info = iter.next();
+ double minSlots = (type == TaskType.MAP ?
+ info.minMaps : info.minReduces);
+ double weight = (type == TaskType.MAP ?
+ info.mapWeight : info.reduceWeight);
+ double fairShare = weight / totalWeight * oldSlots;
+ if (minSlots > fairShare) {
+ // Job needs more slots than its fair share; give it its minSlots,
+ // remove it from the list, and set recomputeSlots = true to
+ // remember that we must loop again to redistribute unassigned slots
+ if (type == TaskType.MAP)
+ info.mapFairShare = minSlots;
+ else
+ info.reduceFairShare = minSlots;
+ slotsLeft -= minSlots;
+ iter.remove();
+ recomputeSlots = true;
+ }
+ }
+ if (!recomputeSlots) {
+ // All minimums are met. Give each job its fair share of excess slots.
+ for (JobInfo info: jobsLeft) {
+ double weight = (type == TaskType.MAP ?
+ info.mapWeight : info.reduceWeight);
+ double fairShare = weight / totalWeight * oldSlots;
+ if (type == TaskType.MAP)
+ info.mapFairShare = fairShare;
+ else
+ info.reduceFairShare = fairShare;
+ }
+ break;
+ }
+ }
+ }
+ }
+
+ private double calculateWeight(JobInProgress job, TaskType taskType) {
+ if (!isRunnable(job)) {
+ return 0;
+ } else {
+ double weight = 1.0;
+ if (sizeBasedWeight) {
+ // Set weight based on runnable tasks
+ weight = Math.log1p(runnableTasks(job, taskType)) / Math.log(2);
+ }
+ weight *= getPriorityFactor(job.getPriority());
+ if (weightAdjuster != null) {
+ // Run weight through the user-supplied weightAdjuster
+ weight = weightAdjuster.adjustWeight(job, taskType, weight);
+ }
+ return weight;
+ }
+ }
+
+ private double getPriorityFactor(JobPriority priority) {
+ switch (priority) {
+ case VERY_HIGH: return 4.0;
+ case HIGH: return 2.0;
+ case NORMAL: return 1.0;
+ case LOW: return 0.5;
+ default: return 0.25; // priority = VERY_LOW
+ }
+ }
+
+ public PoolManager getPoolManager() {
+ return poolMgr;
+ }
+
+ public int getTotalSlots(TaskType type) {
+ int slots = 0;
+ for (TaskTrackerStatus tt: taskTrackerManager.taskTrackers()) {
+ slots += (type == TaskType.MAP ?
+ tt.getMaxMapTasks() : tt.getMaxReduceTasks());
+ }
+ return slots;
+ }
+
+ public boolean getUseFifo() {
+ return useFifo;
+ }
+
+ public void setUseFifo(boolean useFifo) {
+ this.useFifo = useFifo;
+ }
+
+ // Getter methods for reading JobInfo values based on TaskType, safely
+ // returning 0's for jobs with no JobInfo present.
+
+ protected int neededTasks(JobInProgress job, TaskType taskType) {
+ JobInfo info = infos.get(job);
+ if (info == null) return 0;
+ return taskType == TaskType.MAP ? info.neededMaps : info.neededReduces;
+ }
+
+ protected int runningTasks(JobInProgress job, TaskType taskType) {
+ JobInfo info = infos.get(job);
+ if (info == null) return 0;
+ return taskType == TaskType.MAP ? info.runningMaps : info.runningReduces;
+ }
+
+ protected int runnableTasks(JobInProgress job, TaskType type) {
+ return neededTasks(job, type) + runningTasks(job, type);
+ }
+
+ protected int minTasks(JobInProgress job, TaskType type) {
+ JobInfo info = infos.get(job);
+ if (info == null) return 0;
+ return (type == TaskType.MAP) ? info.minMaps : info.minReduces;
+ }
+
+ protected double weight(JobInProgress job, TaskType taskType) {
+ JobInfo info = infos.get(job);
+ if (info == null) return 0;
+ return (taskType == TaskType.MAP ? info.mapWeight : info.reduceWeight);
+ }
+
+ protected double deficit(JobInProgress job, TaskType taskType) {
+ JobInfo info = infos.get(job);
+ if (info == null) return 0;
+ return taskType == TaskType.MAP ? info.mapDeficit : info.reduceDeficit;
+ }
+
+ protected boolean isRunnable(JobInProgress job) {
+ JobInfo info = infos.get(job);
+ if (info == null) return false;
+ return info.runnable;
+ }
+}
Added: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java?rev=690093&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java (added)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java Thu Aug 28 20:50:06 2008
@@ -0,0 +1,299 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.text.DateFormat;
+import java.text.SimpleDateFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Comparator;
+import java.util.Date;
+import java.util.List;
+
+import javax.servlet.ServletContext;
+import javax.servlet.ServletException;
+import javax.servlet.http.HttpServlet;
+import javax.servlet.http.HttpServletRequest;
+import javax.servlet.http.HttpServletResponse;
+
+import org.apache.hadoop.mapred.FairScheduler.JobInfo;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ * Servlet for displaying fair scheduler information, installed at
+ * [job tracker URL]/scheduler when the {@link FairScheduler} is in use.
+ *
+ * The main features are viewing each job's task count and fair share, the
+ * ability to change job priorities and pools from the UI, and the ability
+ * to switch the
+ * scheduler to FIFO mode without restarting the JobTracker if this is required
+ * for any reason.
+ *
+ * There is also an "advanced" view for debugging that can be turned on by
+ * going to [job tracker URL]/scheduler?advanced.
+ */
+public class FairSchedulerServlet extends HttpServlet {
+ private static final long serialVersionUID = 9104070533067306659L;
+ private static final DateFormat DATE_FORMAT =
+ new SimpleDateFormat("MMM dd, HH:mm");
+
+ private FairScheduler scheduler;
+ private JobTracker jobTracker;
+ private static long lastId = 0; // Used to generate unique element IDs
+
+ @Override
+ public void init() throws ServletException {
+ super.init();
+ ServletContext servletContext = this.getServletContext();
+ this.scheduler = (FairScheduler) servletContext.getAttribute("scheduler");
+ this.jobTracker = (JobTracker) scheduler.taskTrackerManager;
+ }
+
+ @Override
+ protected void doPost(HttpServletRequest req, HttpServletResponse resp)
+ throws ServletException, IOException {
+ doGet(req, resp); // Same handler for both GET and POST
+ }
+
+ @Override
+ public void doGet(HttpServletRequest request, HttpServletResponse response)
+ throws ServletException, IOException {
+ // If the request has a set* param, handle that and redirect to the regular
+ // view page so that the user won't resubmit the data if they hit refresh.
+ boolean advancedView = request.getParameter("advanced") != null;
+ if (request.getParameter("setFifo") != null) {
+ scheduler.setUseFifo(request.getParameter("setFifo").equals("true"));
+ response.sendRedirect("/scheduler" + (advancedView ? "?advanced" : ""));
+ return;
+ }
+ if (request.getParameter("setPool") != null) {
+ PoolManager poolMgr = scheduler.getPoolManager();
+ synchronized (poolMgr) {
+ String pool = request.getParameter("setPool");
+ String jobId = request.getParameter("jobid");
+ Collection<JobInProgress> runningJobs = jobTracker.runningJobs();
+ for (JobInProgress job: runningJobs) {
+ if (job.getProfile().getJobID().toString().equals(jobId)) {
+ poolMgr.setPool(job, pool);
+ scheduler.update();
+ break;
+ }
+ }
+ }
+ response.sendRedirect("/scheduler" + (advancedView ? "?advanced" : ""));
+ return;
+ }
+ if (request.getParameter("setPriority") != null) {
+ PoolManager poolMgr = scheduler.getPoolManager();
+ synchronized (poolMgr) {
+ JobPriority priority = JobPriority.valueOf(request.getParameter(
+ "setPriority"));
+ String jobId = request.getParameter("jobid");
+ Collection<JobInProgress> runningJobs = jobTracker.runningJobs();
+ for (JobInProgress job: runningJobs) {
+ if (job.getProfile().getJobID().toString().equals(jobId)) {
+ job.setPriority(priority);
+ scheduler.update();
+ break;
+ }
+ }
+ }
+ response.sendRedirect("/scheduler" + (advancedView ? "?advanced" : ""));
+ return;
+ }
+ // Print out the normal response
+ response.setContentType("text/html");
+ PrintWriter out = new PrintWriter(response.getOutputStream());
+ String hostname = StringUtils.simpleHostname(
+ jobTracker.getJobTrackerMachine());
+ out.print("<html><head>");
+ out.printf("<title>%s Job Scheduler Admininstration</title>\n", hostname);
+ out.printf("<META http-equiv=\"refresh\" content=\"15;URL=/scheduler%s\">",
+ advancedView ? "?advanced" : "");
+ out.print("<link rel=\"stylesheet\" type=\"text/css\" " +
+ "href=\"/static/hadoop.css\">\n");
+ out.print("</head><body>\n");
+ out.printf("<h1><a href=\"/jobtracker.jsp\">%s</a> " +
+ "Job Scheduler Administration</h1>\n", hostname);
+ showPools(out, advancedView);
+ showJobs(out, advancedView);
+ showAdminForm(out, advancedView);
+ out.print("</body></html>\n");
+ out.close();
+ }
+
+ /**
+ * Print a view of pools to the given output writer.
+ */
+ private void showPools(PrintWriter out, boolean advancedView) {
+ PoolManager poolManager = scheduler.getPoolManager();
+ synchronized(poolManager) {
+ out.print("<h2>Pools</h2>\n");
+ out.print("<table border=\"2\" cellpadding=\"5\" cellspacing=\"2\">\n");
+ out.print("<tr><th>Pool</th><th>Running Jobs</th>" +
+ "<th>Min Maps</th><th>Min Reduces</th>" +
+ "<th>Running Maps</th><th>Running Reduces</th></tr>\n");
+ List<Pool> pools = new ArrayList<Pool>(poolManager.getPools());
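+      // Sort pools by name, always placing the default pool last.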
+ Collections.sort(pools, new Comparator<Pool>() {
+ public int compare(Pool p1, Pool p2) {
+ if (p1.isDefaultPool())
+ return 1;
+ else if (p2.isDefaultPool())
+ return -1;
+ else return p1.getName().compareTo(p2.getName());
+ }});
+ for (Pool pool: pools) {
+ int runningMaps = 0;
+ int runningReduces = 0;
+ for (JobInProgress job: pool.getJobs()) {
+ JobInfo info = scheduler.infos.get(job);
+ if (info != null) {
+ runningMaps += info.runningMaps;
+ runningReduces += info.runningReduces;
+ }
+ }
+ out.print("<tr>\n");
+ out.printf("<td>%s</td>\n", pool.getName());
+ out.printf("<td>%s</td>\n", pool.getJobs().size());
+ out.printf("<td>%s</td>\n", poolManager.getAllocation(pool.getName(),
+ TaskType.MAP));
+ out.printf("<td>%s</td>\n", poolManager.getAllocation(pool.getName(),
+ TaskType.REDUCE));
+ out.printf("<td>%s</td>\n", runningMaps);
+ out.printf("<td>%s</td>\n", runningReduces);
+ out.print("</tr>\n");
+ }
+ out.print("</table>\n");
+ }
+ }
+
+ /**
+ * Print a view of running jobs to the given output writer.
+ */
+ private void showJobs(PrintWriter out, boolean advancedView) {
+ out.print("<h2>Running Jobs</h2>\n");
+ out.print("<table border=\"2\" cellpadding=\"5\" cellspacing=\"2\">\n");
+ int colsPerTaskType = advancedView ? 5 : 3;
+ out.printf("<tr><th rowspan=2>Submitted</th>" +
+ "<th rowspan=2>JobID</th>" +
+ "<th rowspan=2>User</th>" +
+ "<th rowspan=2>Name</th>" +
+ "<th rowspan=2>Pool</th>" +
+ "<th rowspan=2>Priority</th>" +
+ "<th colspan=%d>Maps</th>" +
+ "<th colspan=%d>Reduces</th>",
+ colsPerTaskType, colsPerTaskType);
+ out.print("</tr><tr>\n");
+ out.print("<th>Finished</th><th>Running</th><th>Fair Share</th>" +
+ (advancedView ? "<th>Weight</th><th>Deficit</th>" : ""));
+ out.print("<th>Finished</th><th>Running</th><th>Fair Share</th>" +
+ (advancedView ? "<th>Weight</th><th>Deficit</th>" : ""));
+ out.print("</tr>\n");
+ Collection<JobInProgress> runningJobs = jobTracker.runningJobs();
+ for (JobInProgress job: runningJobs) {
+ JobProfile profile = job.getProfile();
+ JobInfo info = scheduler.infos.get(job);
+ if (info == null) { // Job finished, but let's show 0's for info
+ info = new JobInfo();
+ }
+ out.print("<tr>\n");
+ out.printf("<td>%s</td>\n", DATE_FORMAT.format(
+ new Date(job.getStartTime())));
+ out.printf("<td><a href=\"jobdetails.jsp?jobid=%s\">%s</a></td>",
+ profile.getJobID(), profile.getJobID());
+ out.printf("<td>%s</td>\n", profile.getUser());
+ out.printf("<td>%s</td>\n", profile.getJobName());
+ out.printf("<td>%s</td>\n", generateSelect(
+ scheduler.getPoolManager().getPoolNames(),
+ scheduler.getPoolManager().getPoolName(job),
+ "/scheduler?setPool=<CHOICE>&jobid=" + profile.getJobID() +
+ (advancedView ? "&advanced" : "")));
+ out.printf("<td>%s</td>\n", generateSelect(
+ Arrays.asList(new String[]
+ {"VERY_LOW", "LOW", "NORMAL", "HIGH", "VERY_HIGH"}),
+ job.getPriority().toString(),
+ "/scheduler?setPriority=<CHOICE>&jobid=" + profile.getJobID() +
+ (advancedView ? "&advanced" : "")));
+ out.printf("<td>%d / %d</td><td>%d</td><td>%8.1f</td>\n",
+ job.finishedMaps(), job.desiredMaps(), info.runningMaps,
+ info.mapFairShare);
+ if (advancedView) {
+ out.printf("<td>%8.1f</td>\n", info.mapWeight);
+ out.printf("<td>%s</td>\n", info.neededMaps > 0 ?
+ (info.mapDeficit / 1000) + "s" : "--");
+ }
+ out.printf("<td>%d / %d</td><td>%d</td><td>%8.1f</td>\n",
+ job.finishedReduces(), job.desiredReduces(), info.runningReduces,
+ info.reduceFairShare);
+ if (advancedView) {
+ out.printf("<td>%8.1f</td>\n", info.reduceWeight);
+ out.printf("<td>%s</td>\n", info.neededReduces > 0 ?
+ (info.reduceDeficit / 1000) + "s" : "--");
+ }
+ out.print("</tr>\n");
+ }
+ out.print("</table>\n");
+ }
+
+ /**
+   * Generate an HTML select control with a given list of choices and a given
+   * option selected. When the selection is changed, take the user to the
+   * <code>submitUrl</code>. The <code>submitUrl</code> can be made to include
+   * the option selected -- the first occurrence of the substring
+   * <code>&lt;CHOICE&gt;</code> will be replaced by the option chosen.
+ */
+ private String generateSelect(Iterable<String> choices,
+ String selectedChoice, String submitUrl) {
+ StringBuilder html = new StringBuilder();
+ String id = "select" + lastId++;
+ html.append("<select id=\"" + id + "\" name=\"" + id + "\" " +
+ "onchange=\"window.location = '" + submitUrl +
+ "'.replace('<CHOICE>', document.getElementById('" + id +
+ "').value);\">\n");
+ for (String choice: choices) {
+ html.append(String.format("<option value=\"%s\"%s>%s</option>\n",
+ choice, (choice.equals(selectedChoice) ? " selected" : ""), choice));
+ }
+ html.append("</select>\n");
+ return html.toString();
+ }
+
+ /**
+ * Print the administration form at the bottom of the page, which currently
+ * only includes the button for switching between FIFO and Fair Scheduling.
+ */
+ private void showAdminForm(PrintWriter out, boolean advancedView) {
+ out.print("<h2>Scheduling Mode</h2>\n");
+ String curMode = scheduler.getUseFifo() ? "FIFO" : "Fair Sharing";
+ String otherMode = scheduler.getUseFifo() ? "Fair Sharing" : "FIFO";
+ String advParam = advancedView ? "?advanced" : "";
+ out.printf("<form method=\"post\" action=\"/scheduler%s\">\n", advParam);
+ out.printf("<p>The scheduler is currently using <b>%s mode</b>. " +
+ "<input type=\"submit\" value=\"Switch to %s mode.\" " +
+ "onclick=\"return confirm('Are you sure you want to change " +
+ "scheduling mode to %s?')\" />\n",
+ curMode, otherMode, otherMode);
+ out.printf("<input type=\"hidden\" name=\"setFifo\" value=\"%s\" />",
+ !scheduler.getUseFifo());
+ out.print("</form>\n");
+ }
+}
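For illustration: the &lt;CHOICE&gt; substitution that generateSelect() sets up
happens client-side in JavaScript. A runnable Java sketch of the equivalent
rewrite (the URL and job id below are made-up examples, not values from this
commit; Java's replace() covers all occurrences, but the URL contains only one
placeholder, so it matches the JavaScript behavior):

    public class ChoiceUrlDemo {
      // Mirrors the onchange handler emitted by generateSelect(): substitute
      // the chosen option into the submit URL at the <CHOICE> placeholder.
      static String fill(String submitUrl, String choice) {
        return submitUrl.replace("<CHOICE>", choice);
      }
      public static void main(String[] args) {
        String url = "/scheduler?setPool=<CHOICE>&jobid=job_0001";
        System.out.println(fill(url, "research"));
        // prints: /scheduler?setPool=research&jobid=job_0001
      }
    }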
Added: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FifoJobComparator.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FifoJobComparator.java?rev=690093&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FifoJobComparator.java (added)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FifoJobComparator.java Thu Aug 28 20:50:06 2008
@@ -0,0 +1,42 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.Comparator;
+
+/**
+ * Order {@link JobInProgress} objects by priority and then by submit time, as
+ * in the default scheduler in Hadoop.
+ */
+public class FifoJobComparator implements Comparator<JobInProgress> {
+ public int compare(JobInProgress j1, JobInProgress j2) {
+ int res = j1.getPriority().compareTo(j2.getPriority());
+ if (res == 0) {
+ if (j1.getStartTime() < j2.getStartTime()) {
+ res = -1;
+ } else {
+ res = (j1.getStartTime() == j2.getStartTime() ? 0 : 1);
+ }
+ }
+    if (res == 0) {
+      // Compare hash codes explicitly rather than subtracting them;
+      // subtraction can overflow int and violate the Comparator contract.
+      res = (j1.hashCode() < j2.hashCode() ? -1 :
+             (j1.hashCode() == j2.hashCode() ? 0 : 1));
+    }
+ return res;
+ }
+}
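A worked example of the resulting order, as a self-contained sketch: a real
JobInProgress cannot easily be constructed outside a JobTracker, so a
hypothetical stand-in is used, with an enum ordered like Hadoop's JobPriority
(VERY_HIGH first):

    import java.util.*;

    public class FifoOrderDemo {
      enum Priority { VERY_HIGH, HIGH, NORMAL, LOW, VERY_LOW }
      static class FakeJob {
        final String name; final Priority priority; final long startTime;
        FakeJob(String n, Priority p, long t) {
          name = n; priority = p; startTime = t;
        }
        public String toString() { return name; }
      }
      public static void main(String[] args) {
        List<FakeJob> jobs = new ArrayList<FakeJob>(Arrays.asList(
            new FakeJob("A", Priority.NORMAL, 200),
            new FakeJob("B", Priority.HIGH, 300),
            new FakeJob("C", Priority.NORMAL, 100)));
        // Same rule as FifoJobComparator: priority first, then submit time.
        Collections.sort(jobs, new Comparator<FakeJob>() {
          public int compare(FakeJob j1, FakeJob j2) {
            int res = j1.priority.compareTo(j2.priority);
            if (res == 0) {
              res = (j1.startTime < j2.startTime) ? -1
                  : (j1.startTime == j2.startTime ? 0 : 1);
            }
            return res;
          }
        });
        System.out.println(jobs); // [B, C, A]: B outranks; C was submitted first
      }
    }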
Added: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java?rev=690093&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java (added)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java Thu Aug 28 20:50:06 2008
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+
+/**
+ * A pluggable object that manages the load on each {@link TaskTracker}, telling
+ * the {@link TaskScheduler} when it can launch new tasks.
+ */
+public abstract class LoadManager implements Configurable {
+ protected Configuration conf;
+ protected TaskTrackerManager taskTrackerManager;
+
+ public Configuration getConf() {
+ return conf;
+ }
+
+ public void setConf(Configuration conf) {
+ this.conf = conf;
+ }
+
+ public synchronized void setTaskTrackerManager(
+ TaskTrackerManager taskTrackerManager) {
+ this.taskTrackerManager = taskTrackerManager;
+ }
+
+ /**
+ * Lifecycle method to allow the LoadManager to start any work in separate
+ * threads.
+ */
+ public void start() throws IOException {
+ // do nothing
+ }
+
+ /**
+ * Lifecycle method to allow the LoadManager to stop any work it is doing.
+ */
+ public void terminate() throws IOException {
+ // do nothing
+ }
+
+ /**
+ * Can a given {@link TaskTracker} run another map task?
+ * @param tracker The machine we wish to run a new map on
+   * @param totalRunnableMaps Total number of runnable map tasks in the cluster
+ * @return true if another map can be launched on <code>tracker</code>
+ */
+ public abstract boolean canAssignMap(TaskTrackerStatus tracker,
+ int totalRunnableMaps);
+
+ /**
+ * Can a given {@link TaskTracker} run another reduce task?
+   * @param tracker The machine we wish to run a new reduce on
+   * @param totalRunnableReduces Total number of runnable reduce tasks in the
+   *        cluster
+ * @return true if another reduce can be launched on <code>tracker</code>
+ */
+ public abstract boolean canAssignReduce(TaskTrackerStatus tracker,
+ int totalRunnableReduces);
+}
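The contract is easiest to see in a concrete subclass. A minimal sketch
(hypothetical class, not part of this commit; it assumes TaskTrackerStatus
exposes per-tracker slot counts via countMapTasks()/getMaxMapTasks() and the
reduce equivalents):

    package org.apache.hadoop.mapred;

    /** Illustrative only: admit a new task while a slot of that type is free. */
    public class FreeSlotLoadManager extends LoadManager {
      @Override
      public boolean canAssignMap(TaskTrackerStatus tracker,
          int totalRunnableMaps) {
        return tracker.countMapTasks() < tracker.getMaxMapTasks();
      }
      @Override
      public boolean canAssignReduce(TaskTrackerStatus tracker,
          int totalRunnableReduces) {
        return tracker.countReduceTasks() < tracker.getMaxReduceTasks();
      }
    }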
Added: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/NewJobWeightBooster.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/NewJobWeightBooster.java?rev=690093&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/NewJobWeightBooster.java (added)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/NewJobWeightBooster.java Thu Aug 28 20:50:06 2008
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+
+/**
+ * A {@link WeightAdjuster} implementation that gives a weight boost to new jobs
+ * for a certain amount of time -- by default, a 3x weight boost for the first
+ * five minutes.
+ * This can be used to make shorter jobs finish faster, emulating Shortest Job
+ * First scheduling while not starving long jobs.
+ */
+public class NewJobWeightBooster extends Configured implements WeightAdjuster {
+ private static final float DEFAULT_FACTOR = 3;
+ private static final long DEFAULT_DURATION = 5 * 60 * 1000;
+
+ private float factor;
+ private long duration;
+
+ public void setConf(Configuration conf) {
+ if (conf != null) {
+ factor = conf.getFloat("mapred.newjobweightbooster.factor",
+ DEFAULT_FACTOR);
+ duration = conf.getLong("mapred.newjobweightbooster.duration",
+ DEFAULT_DURATION);
+ }
+ super.setConf(conf);
+ }
+
+ public double adjustWeight(JobInProgress job, TaskType taskType,
+ double curWeight) {
+ long start = job.getStartTime();
+ long now = System.currentTimeMillis();
+ if (now - start < duration) {
+ return curWeight * factor;
+ } else {
+ return curWeight;
+ }
+ }
+}
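The two property names read in setConf() can be set in the JobTracker's
configuration to tune the boost. A sketch with arbitrary example values:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapred.NewJobWeightBooster;

    public class BoosterSetup {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // A stronger boost over a shorter window: 6x for one minute.
        conf.set("mapred.newjobweightbooster.factor", "6.0");
        conf.set("mapred.newjobweightbooster.duration", "60000");

        NewJobWeightBooster booster = new NewJobWeightBooster();
        booster.setConf(conf); // reads both properties, else uses the defaults
      }
    }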
Added: hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java
URL: http://svn.apache.org/viewvc/hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java?rev=690093&view=auto
==============================================================================
--- hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java (added)
+++ hadoop/core/trunk/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/Pool.java Thu Aug 28 20:50:06 2008
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.util.ArrayList;
+import java.util.Collection;
+
+/**
+ * A schedulable pool of jobs.
+ */
+public class Pool {
+ /** Name of the default pool, where jobs with no pool parameter go. */
+ public static final String DEFAULT_POOL_NAME = "default";
+
+ /** Pool name. */
+ private String name;
+
+  /** Jobs in this specific pool; does not include child pools' jobs. */
+ private Collection<JobInProgress> jobs = new ArrayList<JobInProgress>();
+
+ public Pool(String name) {
+ this.name = name;
+ }
+
+ public Collection<JobInProgress> getJobs() {
+ return jobs;
+ }
+
+ public void addJob(JobInProgress job) {
+ jobs.add(job);
+ }
+
+ public void removeJob(JobInProgress job) {
+ jobs.remove(job);
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public boolean isDefaultPool() {
+ return Pool.DEFAULT_POOL_NAME.equals(name);
+ }
+}
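A trivial usage sketch (in practice, PoolManager creates the pools and the
FairScheduler supplies the JobInProgress instances):

    package org.apache.hadoop.mapred;

    public class PoolDemo {
      public static void main(String[] args) {
        Pool pool = new Pool(Pool.DEFAULT_POOL_NAME);
        System.out.println(pool.isDefaultPool());  // true
        System.out.println(pool.getJobs().size()); // 0 until addJob() is called
      }
    }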