You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by st...@apache.org on 2009/11/28 21:26:22 UTC
svn commit: r885145 [7/34] - in /hadoop/mapreduce/branches/MAPREDUCE-233: ./
.eclipse.templates/ .eclipse.templates/.launches/ conf/ ivy/ lib/
src/benchmarks/gridmix/ src/benchmarks/gridmix/pipesort/
src/benchmarks/gridmix2/ src/benchmarks/gridmix2/src...
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerConf.java Sat Nov 28 20:26:01 2009
@@ -22,8 +22,7 @@
import java.io.File;
import java.io.FileWriter;
import java.io.PrintWriter;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.Properties;
import junit.framework.TestCase;
@@ -34,7 +33,7 @@
private static String testDataDir = System.getProperty("test.build.data");
private static String testConfFile;
- private Map<String, String> defaultProperties;
+ //private Map<String, String> defaultProperties;
private CapacitySchedulerConf testConf;
private PrintWriter writer;
@@ -48,16 +47,6 @@
}
public TestCapacitySchedulerConf() {
- defaultProperties = setupQueueProperties(
- new String[] { "capacity",
- "supports-priority",
- "minimum-user-limit-percent",
- "maximum-initialized-jobs-per-user"},
- new String[] { "100",
- "false",
- "100",
- "2" }
- );
}
@@ -71,213 +60,8 @@
confFile.delete();
}
}
-
- public void testDefaults() {
- testConf = new CapacitySchedulerConf();
- Map<String, Map<String, String>> queueDetails
- = new HashMap<String, Map<String,String>>();
- queueDetails.put("default", defaultProperties);
- checkQueueProperties(testConf, queueDetails);
- }
-
- public void testQueues() {
-
- Map<String, String> q1Props = setupQueueProperties(
- new String[] { "capacity",
- "supports-priority",
- "minimum-user-limit-percent",
- "maximum-initialized-jobs-per-user"},
- new String[] { "10",
- "true",
- "25",
- "4"}
- );
-
- Map<String, String> q2Props = setupQueueProperties(
- new String[] { "capacity",
- "supports-priority",
- "minimum-user-limit-percent",
- "maximum-initialized-jobs-per-user"},
- new String[] { "100",
- "false",
- "50",
- "1"}
- );
-
- startConfig();
- writeQueueDetails("default", q1Props);
- writeQueueDetails("research", q2Props);
- endConfig();
-
- testConf = new CapacitySchedulerConf(new Path(testConfFile));
- Map<String, Map<String, String>> queueDetails
- = new HashMap<String, Map<String,String>>();
- queueDetails.put("default", q1Props);
- queueDetails.put("research", q2Props);
- checkQueueProperties(testConf, queueDetails);
- }
-
- public void testQueueWithDefaultProperties() {
- Map<String, String> q1Props = setupQueueProperties(
- new String[] { "capacity",
- "minimum-user-limit-percent" },
- new String[] { "20",
- "75" }
- );
- startConfig();
- writeQueueDetails("default", q1Props);
- endConfig();
- testConf = new CapacitySchedulerConf(new Path(testConfFile));
-
- Map<String, Map<String, String>> queueDetails
- = new HashMap<String, Map<String,String>>();
- Map<String, String> expProperties = new HashMap<String, String>();
- for (String key : q1Props.keySet()) {
- expProperties.put(key, q1Props.get(key));
- }
- expProperties.put("supports-priority", "false");
- expProperties.put("maximum-initialized-jobs-per-user", "2");
- queueDetails.put("default", expProperties);
- checkQueueProperties(testConf, queueDetails);
- }
-
- public void testReload() throws IOException {
- // use the setup in the test case testQueues as a base...
- testQueues();
-
- // write new values to the file...
- Map<String, String> q1Props = setupQueueProperties(
- new String[] { "capacity",
- "supports-priority",
- "minimum-user-limit-percent" },
- new String[] { "20.5",
- "true",
- "40" }
- );
-
- Map<String, String> q2Props = setupQueueProperties(
- new String[] { "capacity",
- "supports-priority",
- "minimum-user-limit-percent" },
- new String[] { "100",
- "false",
- "50" }
- );
-
- openFile();
- startConfig();
- writeDefaultConfiguration();
- writeQueueDetails("default", q1Props);
- writeQueueDetails("production", q2Props);
- endConfig();
- testConf.reloadConfiguration();
- Map<String, Map<String, String>> queueDetails
- = new HashMap<String, Map<String, String>>();
- queueDetails.put("default", q1Props);
- queueDetails.put("production", q2Props);
- checkQueueProperties(testConf, queueDetails);
- }
-
- public void testQueueWithUserDefinedDefaultProperties() throws IOException {
- openFile();
- startConfig();
- writeUserDefinedDefaultConfiguration();
- endConfig();
-
- Map<String, String> q1Props = setupQueueProperties(
- new String[] { "capacity",
- "supports-priority",
- "minimum-user-limit-percent" },
- new String[] { "-1",
- "true",
- "50" }
- );
-
- Map<String, String> q2Props = setupQueueProperties(
- new String[] { "capacity",
- "supports-priority",
- "minimum-user-limit-percent" },
- new String[] { "-1",
- "true",
- "50" }
- );
-
- testConf = new CapacitySchedulerConf(new Path(testConfFile));
-
- Map<String, Map<String, String>> queueDetails
- = new HashMap<String, Map<String,String>>();
-
- queueDetails.put("default", q1Props);
- queueDetails.put("production", q2Props);
-
- checkQueueProperties(testConf, queueDetails);
- }
-
- public void testQueueWithDefaultPropertiesOverriden() throws IOException {
- openFile();
- startConfig();
- writeUserDefinedDefaultConfiguration();
- Map<String, String> q1Props = setupQueueProperties(
- new String[] { "capacity",
- "supports-priority",
- "minimum-user-limit-percent" },
- new String[] { "-1",
- "true",
- "50" }
- );
-
- Map<String, String> q2Props = setupQueueProperties(
- new String[] { "capacity",
- "supports-priority",
- "minimum-user-limit-percent" },
- new String[] { "40",
- "true",
- "50" }
- );
- Map<String, String> q3Props = setupQueueProperties(
- new String[] { "capacity",
- "supports-priority",
- "minimum-user-limit-percent" },
- new String[] { "40",
- "true",
- "50" }
- );
- writeQueueDetails("production", q2Props);
- writeQueueDetails("test", q3Props);
- endConfig();
- testConf = new CapacitySchedulerConf(new Path(testConfFile));
- Map<String, Map<String, String>> queueDetails
- = new HashMap<String, Map<String,String>>();
- queueDetails.put("default", q1Props);
- queueDetails.put("production", q2Props);
- queueDetails.put("test", q3Props);
- checkQueueProperties(testConf, queueDetails);
- }
-
- public void testInvalidUserLimit() throws IOException {
- openFile();
- startConfig();
- Map<String, String> q1Props = setupQueueProperties(
- new String[] { "capacity",
- "supports-priority",
- "minimum-user-limit-percent" },
- new String[] { "-1",
- "true",
- "-50" }
- );
- writeQueueDetails("default", q1Props);
- endConfig();
- try {
- testConf = new CapacitySchedulerConf(new Path(testConfFile));
- testConf.getMinimumUserLimitPercent("default");
- fail("Expect Invalid user limit to raise Exception");
- }catch(IllegalArgumentException e) {
- assertTrue(true);
- }
- }
-
public void testInitializationPollerProperties()
throws Exception {
/*
@@ -328,29 +112,6 @@
}
- private void checkQueueProperties(
- CapacitySchedulerConf testConf,
- Map<String, Map<String, String>> queueDetails) {
- for (String queueName : queueDetails.keySet()) {
- Map<String, String> map = queueDetails.get(queueName);
- assertEquals(Float.parseFloat(map.get("capacity")),
- testConf.getCapacity(queueName));
- assertEquals(Integer.parseInt(map.get("minimum-user-limit-percent")),
- testConf.getMinimumUserLimitPercent(queueName));
- assertEquals(Boolean.parseBoolean(map.get("supports-priority")),
- testConf.isPrioritySupported(queueName));
- }
- }
-
- private Map<String, String> setupQueueProperties(String[] keys,
- String[] values) {
- HashMap<String, String> map = new HashMap<String, String>();
- for(int i=0; i<keys.length; i++) {
- map.put(keys[i], values[i]);
- }
- return map;
- }
-
private void openFile() throws IOException {
if (testDataDir != null) {
@@ -366,33 +127,6 @@
writer.println("<?xml version=\"1.0\"?>");
writer.println("<configuration>");
}
-
- private void writeQueueDetails(String queue, Map<String, String> props) {
- for (String key : props.keySet()) {
- writer.println("<property>");
- writer.println("<name>mapred.capacity-scheduler.queue."
- + queue + "." + key +
- "</name>");
- writer.println("<value>"+props.get(key)+"</value>");
- writer.println("</property>");
- }
- }
-
-
- private void writeDefaultConfiguration() {
- writeProperty("mapred.capacity-scheduler.default-supports-priority"
- , "false");
- writeProperty("mapred.capacity-scheduler.default-minimum-user-limit-percent"
- , "100");
- }
-
-
- private void writeUserDefinedDefaultConfiguration() {
- writeProperty("mapred.capacity-scheduler.default-supports-priority"
- , "true");
- writeProperty("mapred.capacity-scheduler.default-minimum-user-limit-percent"
- , "50");
- }
private void writeProperty(String name, String value) {
@@ -407,5 +141,31 @@
writer.println("</configuration>");
writer.close();
}
+
+ public void testConfigurationValuesConversion() throws IOException {
+ Properties prp = new Properties();
+
+ prp.setProperty("capacity","10");
+ prp.setProperty("maximum-capacity","20.5");
+ prp.setProperty("supports-priority","false");
+ prp.setProperty("minimum-user-limit-percent","23");
+
+ CapacitySchedulerConf conf = new CapacitySchedulerConf();
+ conf.setProperties("default",prp);
+
+ assertTrue(conf.getCapacity("default") == 10f);
+ assertTrue(conf.getMaxCapacity("default") == 20.5f);
+ assertTrue(conf.isPrioritySupported("default") == false);
+ assertTrue(conf.getMinimumUserLimitPercent("default")==23);
+
+ //check for inproper stuff
+ prp.setProperty("capacity","h");
+ prp.setProperty("maximum-capacity","20");
+
+ //This is because h is invalid value.
+ assertTrue(conf.getCapacity("default") == -1);
+
+ assertFalse(conf.getMaxCapacity("default") != 20);
+ }
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerWithJobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerWithJobTracker.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerWithJobTracker.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/capacity-scheduler/src/test/org/apache/hadoop/mapred/TestCapacitySchedulerWithJobTracker.java Sat Nov 28 20:26:01 2009
@@ -20,30 +20,37 @@
import java.util.Properties;
-import org.apache.hadoop.examples.SleepJob;
import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.server.jobtracker.JTConfig;
+import org.apache.hadoop.mapreduce.server.tasktracker.TTConfig;
+import org.apache.hadoop.mapreduce.SleepJob;
public class TestCapacitySchedulerWithJobTracker extends
- ClusterWithCapacityScheduler {
+ ClusterWithCapacityScheduler {
/**
- * Test case which checks if the jobs which
+ * Test case which checks if the jobs which
* fail initialization are removed from the
* {@link CapacityTaskScheduler} waiting queue.
- *
+ *
* @throws Exception
*/
public void testFailingJobInitalization() throws Exception {
Properties schedulerProps = new Properties();
- schedulerProps.put("mapred.capacity-scheduler.queue.default.capacity",
- "100");
Properties clusterProps = new Properties();
- clusterProps.put("mapred.tasktracker.map.tasks.maximum", String.valueOf(1));
- clusterProps.put("mapred.tasktracker.reduce.tasks.maximum", String
- .valueOf(1));
- clusterProps.put("mapred.jobtracker.maxtasks.per.job", String.valueOf(1));
+ clusterProps.put("mapred.queue.names","default");
+ clusterProps.put(TTConfig.TT_MAP_SLOTS, String.valueOf(1));
+ clusterProps.put(TTConfig.TT_REDUCE_SLOTS, String.valueOf(1));
+ clusterProps.put(JTConfig.JT_TASKS_PER_JOB, String.valueOf(1));
// cluster capacity 1 maps, 1 reduces
startCluster(1, clusterProps, schedulerProps);
+ CapacityTaskScheduler scheduler = (CapacityTaskScheduler) getJobTracker()
+ .getTaskScheduler();
+
+ AbstractQueue root = scheduler.getRoot();
+ root.getChildren().get(0).getQueueSchedulingContext().setCapacityPercent(100);
+
JobConf conf = getJobConf();
conf.setSpeculativeExecution(false);
conf.setNumTasksToExecutePerJvm(-1);
@@ -51,45 +58,56 @@
sleepJob.setConf(conf);
Job job = sleepJob.createJob(3, 3, 1, 1, 1, 1);
job.waitForCompletion(false);
- assertFalse("The submitted job successfully completed",
- job.isSuccessful());
- CapacityTaskScheduler scheduler = (CapacityTaskScheduler) getJobTracker()
- .getTaskScheduler();
+ assertFalse(
+ "The submitted job successfully completed",
+ job.isSuccessful());
+
JobQueuesManager mgr = scheduler.jobQueuesManager;
- assertEquals("Failed job present in Waiting queue", 0, mgr
- .getWaitingJobCount("default"));
+ assertEquals(
+ "Failed job present in Waiting queue", 0, mgr
+ .getJobQueue("default").getWaitingJobCount());
}
/**
* Test case which checks {@link JobTracker} and {@link CapacityTaskScheduler}
- *
+ * <p/>
* Test case submits 2 jobs in two different capacity scheduler queues.
* And checks if the jobs successfully complete.
- *
+ *
* @throws Exception
*/
+
public void testJobTrackerIntegration() throws Exception {
Properties schedulerProps = new Properties();
- String[] queues = new String[] { "Q1", "Q2" };
+ String[] queues = new String[]{"Q1", "Q2"};
Job jobs[] = new Job[2];
- for (String q : queues) {
- schedulerProps.put(CapacitySchedulerConf
- .toFullPropertyName(q, "capacity"), "50");
- schedulerProps.put(CapacitySchedulerConf.toFullPropertyName(q,
- "minimum-user-limit-percent"), "100");
- }
Properties clusterProps = new Properties();
- clusterProps.put("mapred.tasktracker.map.tasks.maximum", String.valueOf(2));
- clusterProps.put("mapred.tasktracker.reduce.tasks.maximum", String
- .valueOf(2));
+ clusterProps.put(TTConfig.TT_MAP_SLOTS, String.valueOf(2));
+ clusterProps.put(TTConfig.TT_REDUCE_SLOTS, String.valueOf(2));
clusterProps.put("mapred.queue.names", queues[0] + "," + queues[1]);
startCluster(2, clusterProps, schedulerProps);
+ CapacityTaskScheduler scheduler = (CapacityTaskScheduler) getJobTracker()
+ .getTaskScheduler();
+
+
+
+ AbstractQueue root = scheduler.getRoot();
+
+ for(AbstractQueue q : root.getChildren()) {
+ q.getQueueSchedulingContext().setCapacityPercent(50);
+ q.getQueueSchedulingContext().setUlMin(100);
+ }
+
+
+ LOG.info("WE CREATED THE QUEUES TEST 2");
+ // scheduler.taskTrackerManager.getQueueManager().setQueues(qs);
+ // scheduler.start();
JobConf conf = getJobConf();
conf.setSpeculativeExecution(false);
- conf.set("mapred.committer.job.setup.cleanup.needed", "false");
+ conf.set(JobContext.SETUP_CLEANUP_NEEDED, "false");
conf.setNumTasksToExecutePerJvm(-1);
conf.setQueueName(queues[0]);
SleepJob sleepJob1 = new SleepJob();
@@ -105,9 +123,11 @@
sleepJob2.setConf(conf2);
jobs[1] = sleepJob2.createJob(3, 3, 5, 3, 5, 3);
jobs[1].waitForCompletion(false);
- assertTrue("Sleep job submitted to queue 1 is not successful", jobs[0]
+ assertTrue(
+ "Sleep job submitted to queue 1 is not successful", jobs[0]
.isSuccessful());
- assertTrue("Sleep job submitted to queue 2 is not successful", jobs[1]
+ assertTrue(
+ "Sleep job submitted to queue 2 is not successful", jobs[1]
.isSuccessful());
}
}
Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/data_join/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Nov 28 20:26:01 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/contrib/data_join:713112
/hadoop/core/trunk/src/contrib/data_join:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/data_join:804974-807678
+/hadoop/mapreduce/trunk/src/contrib/data_join:804974-884916
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/data_join/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/data_join/ivy.xml?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/data_join/ivy.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/data_join/ivy.xml Sat Nov 28 20:26:01 2009
@@ -24,13 +24,8 @@
<artifact conf="master"/>
</publications>
<dependencies>
- <dependency org="commons-logging"
- name="commons-logging"
- rev="${commons-logging.version}"
- conf="common->default"/>
- <dependency org="log4j"
- name="log4j"
- rev="${log4j.version}"
- conf="common->master"/>
- </dependencies>
+ <dependency org="org.apache.hadoop" name="hadoop-core" rev="${hadoop-core.version}" conf="common->default"/>
+ <dependency org="commons-logging" name="commons-logging" rev="${commons-logging.version}" conf="common->default"/>
+ <dependency org="log4j" name="log4j" rev="${log4j.version}" conf="common->master"/>
+ </dependencies>
</ivy-module>
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinMapperBase.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinMapperBase.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinMapperBase.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/data_join/src/java/org/apache/hadoop/contrib/utils/join/DataJoinMapperBase.java Sat Nov 28 20:26:01 2009
@@ -25,6 +25,7 @@
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.OutputCollector;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.JobContext;
/**
* This abstract class serves as the base class for the mapper class of a data
@@ -55,7 +56,7 @@
public void configure(JobConf job) {
super.configure(job);
this.job = job;
- this.inputFile = job.get("map.input.file");
+ this.inputFile = job.get(JobContext.MAP_INPUT_FILE);
this.inputTag = generateInputTag(this.inputFile);
}
Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/dynamic-scheduler/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Nov 28 20:26:01 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/src/contrib/dynamic-scheduler:713112
/hadoop/core/trunk/src/contrib/dynamic-scheduler:784975-786373
-/hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler:804974-807678
+/hadoop/mapreduce/trunk/src/contrib/dynamic-scheduler:804974-884916
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/dynamic-scheduler/README
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/dynamic-scheduler/README?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/dynamic-scheduler/README (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/dynamic-scheduler/README Sat Nov 28 20:26:01 2009
@@ -33,7 +33,7 @@
enforces the queue shares in the form of map and reduce slots of running jobs.
Hadoop Configuration (e.g. hadoop-site.xml):
-mapred.jobtracker.taskScheduler
+mapreduce.jobtracker.taskscheduler
This needs to be set to
org.apache.hadoop.mapred.DynamicPriorityScheduler
to use the dynamic scheduler.
@@ -97,7 +97,8 @@
For the servlet operations query path is everything that comes after /scheduler?
in the url. For job submission the query path is just the empty string "".
Job submissions also need to set the following job properties:
--Dmapred.job.timestamp=<ms epoch time> -Dmapred.job.signature=<signature as above> -Dmapred.job.queue.name=<queue>
+-Dmapred.job.timestamp=<ms epoch time>
+-Dmapred.job.signature=<signature as above> -Dmapreduce.job.queue.name=<queue>
Note queue must match the user submitting the job.
Example python query
@@ -161,6 +162,6 @@
params = ""
timestamp = long(time.time()*1000)
params = params + "&user=%s×tamp=%d" % (USER,timestamp)
-print "-Dmapred.job.timestamp=%d -Dmapred.job.signature=%s -Dmapred.job.queue.name=%s" % (timestamp, hmac_sha1(params, KEY), USER)
+print "-Dmapred.job.timestamp=%d -Dmapred.job.signature=%s -Dmapreduce.job.queue.name=%s" % (timestamp, hmac_sha1(params, KEY), USER)
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/dynamic-scheduler/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/dynamic-scheduler/ivy.xml?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/dynamic-scheduler/ivy.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/dynamic-scheduler/ivy.xml Sat Nov 28 20:26:01 2009
@@ -20,204 +20,36 @@
<info organisation="org.apache.hadoop" module="${ant.project.name}">
<license name="Apache 2.0"/>
<ivyauthor name="Apache Hadoop Team" url="http://hadoop.apache.org"/>
- <description>
- Apache Hadoop contrib
- </description>
+ <description> Apache Hadoop contrib </description>
</info>
<configurations defaultconfmapping="default">
<!--these match the Maven configurations-->
<conf name="default" extends="master,runtime"/>
<conf name="master" description="contains the artifact but no dependencies"/>
- <conf name="runtime" description="runtime but not the artifact"
- extends="client,server,s3-server,kfs"/>
-
- <conf name="mandatory" description="contains the critical dependencies"
- extends="commons-logging,log4j"/>
-
- <!--
- These public configurations contain the core dependencies for running hadoop client or server.
- The server is effectively a superset of the client.
- -->
- <conf name="client" description="client-side dependencies"
- extends="mandatory,httpclient"/>
- <conf name="server" description="server-side dependencies"
- extends="client"/>
- <conf name="s3-client" description="dependencies for working with S3/EC2 infrastructure"
- extends="client"/>
- <conf name="s3-server" description="dependencies for running on S3/EC2 infrastructure"
- extends="s3-client,server"/>
- <conf name="kfs" description="dependencies for KFS file system support"/>
- <conf name="ftp" description="dependencies for workign with FTP filesytems"
- extends="mandatory"/>
- <conf name="jetty" description="Jetty provides the in-VM HTTP daemon" extends="commons-logging"/>
-
- <!--Private configurations. -->
-
- <conf name="common" visibility="private" extends="runtime,mandatory,httpclient,ftp,jetty"
- description="common artifacts"/>
- <conf name="javadoc" visibility="private" description="artiracts required while performing doc generation"
- extends="common,mandatory,jetty,lucene"/>
- <!--Testing pulls in everything-->
- <conf name="test" extends="common,default,s3-server,kfs" visibility="private"
- description="the classpath needed to run tests"/>
- <conf name="releaseaudit" visibility="private"
- description="Artifacts required for releaseaudit target"/>
-
- <conf name="commons-logging" visibility="private"/>
- <conf name="httpclient" visibility="private" extends="commons-logging"/>
- <conf name="log4j" visibility="private"/>
- <conf name="lucene" visibility="private"/>
- <conf name="jdiff" visibility="private" extends="log4j,s3-client,jetty,server"/>
- <conf name="checkstyle" visibility="private"/>
+ <conf name="runtime" description="runtime but not the artifact" />
+ <conf name="common" visibility="private" extends="runtime" description="common artifacts"/>
</configurations>
<publications>
- <!--get the artifact from our module name-->
<artifact conf="master"/>
</publications>
- <dependencies>
-
- <!--used client side-->
-
- <dependency org="checkstyle"
- name="checkstyle"
- rev="${checkstyle.version}"
- conf="checkstyle->default"/>
- <dependency org="jdiff"
- name="jdiff"
- rev="${jdiff.version}"
- conf="jdiff->default"/>
- <dependency org="xerces"
- name="xerces"
- rev="${xerces.version}"
- conf="jdiff->default">
- </dependency>
-
- <dependency org="xmlenc"
- name="xmlenc"
- rev="${xmlenc.version}"
- conf="server->default"/>
-
- <!--Configuration: httpclient-->
-
- <!--
- commons-httpclient asks for too many files.
- All it needs is commons-codec and commons-logging JARs
- -->
- <dependency org="commons-httpclient"
- name="commons-httpclient"
- rev="${commons-httpclient.version}"
- conf="httpclient->master">
- </dependency>
-
- <dependency org="commons-codec"
- name="commons-codec"
- rev="${commons-codec.version}"
- conf="httpclient->default"/>
-
- <dependency org="commons-net"
- name="commons-net"
- rev="${commons-net.version}"
- conf="ftp->default"/>
- <!--Configuration: Jetty -->
-
- <dependency org="org.mortbay.jetty"
- name="jetty"
- rev="${jetty.version}"
- conf="jetty->master"/>
- <dependency org="org.mortbay.jetty"
- name="jetty-util"
- rev="${jetty-util.version}"
- conf="jetty->master"/>
-
- <dependency org="tomcat"
- name="jasper-runtime"
- rev="${jasper.version}"
- conf="jetty->master"/>
- <dependency org="tomcat"
- name="jasper-compiler"
- rev="${jasper.version}"
- conf="jetty->master"/>
- <dependency org="commons-el"
- name="commons-el"
- rev="${commons-el.version}"
- conf="jetty->master"/>
-
-
- <!--Configuration: commons-logging -->
-
- <!--it is essential that only the master JAR of commons logging
- is pulled in, as its dependencies are usually a mess, including things
- like out of date servlet APIs, bits of Avalon, etc.
- -->
- <dependency org="commons-logging"
- name="commons-logging"
- rev="${commons-logging.version}"
- conf="commons-logging->master"/>
-
-
- <!--Configuration: commons-logging -->
-
- <!--log4J is not optional until commons-logging.properties is stripped out of the JAR -->
- <dependency org="log4j"
- name="log4j"
- rev="${log4j.version}"
- conf="log4j->master"/>
-
- <!--Configuration: s3-client -->
- <!--there are two jets3t projects in the repository; this one goes up to 0.6 and
- is assumed to be the live one-->
- <dependency org="net.java.dev.jets3t"
- name="jets3t"
- rev="${jets3t.version}"
- conf="s3-client->master"/>
- <dependency org="commons-net"
- name="commons-net"
- rev="${commons-net.version}"
- conf="s3-client->master"/>
- <dependency org="org.mortbay.jetty"
- name="servlet-api-2.5"
- rev="${servlet-api-2.5.version}"
- conf="s3-client->master"/>
-
- <!--Configuration: test -->
-
- <!--artifacts needed for testing -->
- <dependency org="junit"
- name="junit"
- rev="${junit.version}"
- conf="common->default"/>
- <dependency org="com.google.code.p.arat"
- name="rat-lib"
- rev="${rats-lib.version}"
- conf="releaseaudit->default"/>
- <dependency org="commons-lang"
- name="commons-lang"
- rev="${commons-lang.version}"
- conf="releaseaudit->default"/>
- <dependency org="commons-collections"
- name="commons-collections"
- rev="${commons-collections.version}"
- conf="releaseaudit->default"/>
- <dependency org="org.apache.lucene"
- name="lucene-core"
- rev="${lucene-core.version}"
- conf="javadoc->default"/>
- <dependency org="commons-logging"
- name="commons-logging-api"
- rev="${commons-logging-api.version}"
- conf="common->default"/>
- <dependency org="org.slf4j"
- name="slf4j-api"
- rev="${slf4j-api.version}"
- conf="common->master"/>
- <dependency org="org.slf4j"
- name="slf4j-log4j12"
- rev="${slf4j-log4j12.version}"
- conf="common->master">
- </dependency>
- </dependencies>
-
+ <dependencies>
+ <dependency org="org.apache.hadoop" name="hadoop-core" rev="${hadoop-core.version}" conf="common->default"/>
+ <dependency org="org.mortbay.jetty" name="jetty" rev="${jetty.version}" conf="common->master"/>
+ <dependency org="org.mortbay.jetty" name="jetty-util" rev="${jetty-util.version}" conf="common->master"/>
+ <dependency org="tomcat" name="jasper-runtime" rev="${jasper.version}" conf="common->master"/>
+ <dependency org="tomcat" name="jasper-compiler" rev="${jasper.version}" conf="common->master"/>
+ <dependency org="commons-el" name="commons-el" rev="${commons-el.version}" conf="common->master"/>
+ <dependency org="commons-logging" name="commons-logging" rev="${commons-logging.version}" conf="common->master"/>
+ <dependency org="log4j" name="log4j" rev="${log4j.version}" conf="common->master"/>
+ <dependency org="net.java.dev.jets3t" name="jets3t" rev="${jets3t.version}" conf="common->master"/>
+ <dependency org="commons-net" name="commons-net" rev="${commons-net.version}" conf="common->master"/>
+ <dependency org="org.mortbay.jetty" name="servlet-api-2.5" rev="${servlet-api-2.5.version}" conf="common->master"/>
+ <dependency org="junit" name="junit" rev="${junit.version}" conf="common->default"/>
+ <dependency org="commons-logging" name="commons-logging-api" rev="${commons-logging-api.version}" conf="common->default"/>
+ <dependency org="org.slf4j" name="slf4j-api" rev="${slf4j-api.version}" conf="common->master"/>
+ <dependency org="org.slf4j" name="slf4j-log4j12" rev="${slf4j-log4j12.version}" conf="common->master"/>
+ </dependencies>
</ivy-module>
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/DynamicPriorityScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/DynamicPriorityScheduler.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/DynamicPriorityScheduler.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/dynamic-scheduler/src/java/org/apache/hadoop/mapred/DynamicPriorityScheduler.java Sat Nov 28 20:26:01 2009
@@ -18,15 +18,9 @@
package org.apache.hadoop.mapred;
import java.io.IOException;
-import java.io.BufferedReader;
-import java.io.FileReader;
-import java.io.BufferedWriter;
-import java.io.FileWriter;
-import java.io.PrintWriter;
import java.util.Collection;
import java.util.List;
import java.util.Set;
-import java.util.HashSet;
import java.util.HashMap;
import java.util.Map;
import java.util.Timer;
@@ -77,7 +71,7 @@
Allocations(Configuration conf, QueueManager queueManager) {
this.conf = conf;
this.queueManager = queueManager;
- this.infoQueues = queueManager.getQueues();
+ this.infoQueues = queueManager.getLeafQueueNames();
this.store = ReflectionUtils.newInstance(
conf.getClass(PrioritySchedulerOptions.DYNAMIC_SCHEDULER_STORE,
@@ -237,7 +231,7 @@
long interval = conf.getLong(PrioritySchedulerOptions.DYNAMIC_SCHEDULER_ALLOC_INTERVAL,20)*1000;
timer.scheduleAtFixedRate(allocations, interval, interval);
- for (String queue: queueManager.getQueues()) {
+ for (String queue: queueManager.getLeafQueueNames()) {
Object info = queueManager.getSchedulerInfo(queue);
QueueInfo queueInfo = new QueueInfo(queue, info, allocations);
queueManager.setSchedulerInfo(queue, queueInfo);
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/BaseSchedulerTest.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/BaseSchedulerTest.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/BaseSchedulerTest.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/dynamic-scheduler/src/test/org/apache/hadoop/mapred/BaseSchedulerTest.java Sat Nov 28 20:26:01 2009
@@ -138,7 +138,7 @@
void setQueues(Set<String> queues) {
this.queues = queues;
}
- public synchronized Set<String> getQueues() {
+ public synchronized Set<String> getLeafQueueNames() {
return queues;
}
}
Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/eclipse-plugin/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Nov 28 20:26:01 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/core/src/contrib/eclipse-plugin:713112
/hadoop/core/trunk/src/contrib/eclipse-plugin:776175-784663
-/hadoop/mapreduce/trunk/src/contrib/eclipse-plugin:804974-807678
+/hadoop/mapreduce/trunk/src/contrib/eclipse-plugin:804974-884916
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/eclipse-plugin/build.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/eclipse-plugin/build.xml?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/eclipse-plugin/build.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/eclipse-plugin/build.xml Sat Nov 28 20:26:01 2009
@@ -68,7 +68,7 @@
<target name="jar" depends="compile" unless="skip.contrib">
<mkdir dir="${build.dir}/lib"/>
<copy tofile="${build.dir}/lib/hadoop-core.jar">
- <fileset dir="${hadoop.root}/lib" includes="hadoop-core-*.jar" excludes="hadoop-core-test-*.jar"/>
+ <fileset dir="${hadoop.root}/build/ivy/lib/Hadoop/common" includes="hadoop-core-*.jar" excludes="hadoop-core-test-*.jar"/>
</copy>
<copy file="${hadoop.root}/build/ivy/lib/Hadoop/common/commons-cli-${commons-cli.version}.jar" todir="${build.dir}/lib" verbose="true"/>
<jar
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/eclipse-plugin/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/eclipse-plugin/ivy.xml?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/eclipse-plugin/ivy.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/eclipse-plugin/ivy.xml Sat Nov 28 20:26:01 2009
@@ -24,13 +24,9 @@
<artifact conf="master"/>
</publications>
<dependencies>
- <dependency org="commons-logging"
- name="commons-logging"
- rev="${commons-logging.version}"
- conf="common->default"/>
- <dependency org="log4j"
- name="log4j"
- rev="${log4j.version}"
- conf="common->master"/>
- </dependencies>
+ <dependency org="org.apache.hadoop" name="hadoop-core" rev="${hadoop-core.version}" conf="common->default"/>
+ <dependency org="org.apache.hadoop" name="hadoop-hdfs" rev="${hadoop-hdfs.version}" conf="common->default"/>
+ <dependency org="commons-logging" name="commons-logging" rev="${commons-logging.version}" conf="common->default"/>
+ <dependency org="log4j" name="log4j" rev="${log4j.version}" conf="common->master"/>
+ </dependencies>
</ivy-module>
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/eclipse-plugin/src/java/org/apache/hadoop/eclipse/server/ConfProp.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/eclipse-plugin/src/java/org/apache/hadoop/eclipse/server/ConfProp.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/eclipse-plugin/src/java/org/apache/hadoop/eclipse/server/ConfProp.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/eclipse-plugin/src/java/org/apache/hadoop/eclipse/server/ConfProp.java Sat Nov 28 20:26:01 2009
@@ -84,7 +84,7 @@
* Property name for naming the job tracker (URI). This property is related
* to {@link #PI_MASTER_HOST_NAME}
*/
- JOB_TRACKER_URI(false, "mapred.job.tracker", "localhost:50020"),
+ JOB_TRACKER_URI(false, "mapreduce.jobtracker.address", "localhost:50020"),
/**
* Property name for naming the default file system (URI).
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/eclipse-plugin/src/java/org/apache/hadoop/eclipse/server/HadoopJob.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/eclipse-plugin/src/java/org/apache/hadoop/eclipse/server/HadoopJob.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/eclipse-plugin/src/java/org/apache/hadoop/eclipse/server/HadoopJob.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/eclipse-plugin/src/java/org/apache/hadoop/eclipse/server/HadoopJob.java Sat Nov 28 20:26:01 2009
@@ -50,17 +50,20 @@
}
static JobState ofInt(int state) {
- switch (state) {
- case JobStatus.PREP:
- return PREPARE;
- case JobStatus.RUNNING:
- return RUNNING;
- case JobStatus.FAILED:
- return FAILED;
- case JobStatus.SUCCEEDED:
- return SUCCEEDED;
- default:
- return null;
+ if (state == JobStatus.PREP) {
+ return PREPARE;
+ }
+ else if (state == JobStatus.RUNNING) {
+ return RUNNING;
+ }
+ else if (state == JobStatus.FAILED) {
+ return FAILED;
+ }
+ else if (state == JobStatus.SUCCEEDED) {
+ return SUCCEEDED;
+ }
+ else {
+ return null;
}
}
}
Propchange: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Sat Nov 28 20:26:01 2009
@@ -1,3 +1,3 @@
/hadoop/core/branches/branch-0.19/mapred/src/contrib/fairscheduler:713112
/hadoop/core/trunk/src/contrib/fairscheduler:776175-786373
-/hadoop/mapreduce/trunk/src/contrib/fairscheduler:804974-807678
+/hadoop/mapreduce/trunk/src/contrib/fairscheduler:804974-884916
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/designdoc/fair_scheduler_design_doc.tex
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/designdoc/fair_scheduler_design_doc.tex?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/designdoc/fair_scheduler_design_doc.tex (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/designdoc/fair_scheduler_design_doc.tex Sat Nov 28 20:26:01 2009
@@ -45,9 +45,9 @@
\subsection{Pools}
-The Fair Scheduler groups jobs into ``pools" and performs fair sharing between these pools. Each pool can use either FIFO or fair sharing to schedule jobs internal to the pool. The pool that a job is placed in is determined by a JobConf property, the ``pool name property". By default, this is {\tt user.name}, so that there is one pool per user. However, different properties can be used, e.g.~{\tt group.name} to have one pool per Unix group.
+The Fair Scheduler groups jobs into ``pools" and performs fair sharing between these pools. Each pool can use either FIFO or fair sharing to schedule jobs internal to the pool. The pool that a job is placed in is determined by a JobConf property, the ``pool name property". By default, this is {\tt mapreduce.job.user.name}, so that there is one pool per user. However, different properties can be used, e.g.~{\tt group.name} to have one pool per Unix group.
-A common trick is to set the pool name property to an unused property name such as {\tt pool.name} and make this default to {\tt user.name}, so that there is one pool per user but it is also possible to place jobs into ``special" pools by setting their {\tt pool.name} directly. The {\tt mapred-site.xml} snippet below shows how to do this:
+A common trick is to set the pool name property to an unused property name such as {\tt pool.name} and make this default to {\tt mapreduce.job.user.name}, so that there is one pool per user but it is also possible to place jobs into ``special" pools by setting their {\tt pool.name} directly. The {\tt mapred-site.xml} snippet below shows how to do this:
\begin{verbatim}
<property>
@@ -57,7 +57,7 @@
<property>
<name>pool.name</name>
- <value>${user.name}</value>
+ <value>${mapreduce.job.user.name}</value>
</property>
\end{verbatim}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/ivy.xml
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/ivy.xml?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/ivy.xml (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/ivy.xml Sat Nov 28 20:26:01 2009
@@ -22,6 +22,14 @@
<artifact conf="master"/>
</publications>
<dependencies>
+ <dependency org="org.apache.hadoop" name="hadoop-core"
+ rev="${hadoop-core.version}" conf="common->default"/>
+ <dependency org="org.apache.hadoop" name="hadoop-core-test"
+ rev="${hadoop-core.version}" conf="common->default"/>
+ <dependency org="org.apache.hadoop" name="hadoop-hdfs"
+ rev="${hadoop-core.version}" conf="common->default"/>
+ <dependency org="org.apache.hadoop" name="hadoop-hdfs-test"
+ rev="${hadoop-core.version}" conf="common->default"/>
<dependency org="commons-logging"
name="commons-logging"
rev="${commons-logging.version}"
@@ -38,5 +46,37 @@
name="junit"
rev="${junit.version}"
conf="common->default"/>
+ <dependency org="org.apache.hadoop"
+ name="avro"
+ rev="${avro.version}"
+ conf="common->default"/>
+ <dependency org="org.codehaus.jackson"
+ name="jackson-mapper-asl"
+ rev="${jackson.version}"
+ conf="common->default"/>
+ <dependency org="com.thoughtworks.paranamer"
+ name="paranamer"
+ rev="${paranamer.version}"
+ conf="common->default"/>
+ <dependency org="com.thoughtworks.paranamer"
+ name="paranamer-ant"
+ rev="${paranamer.version}"
+ conf="common->default"/>
+ <dependency org="org.mortbay.jetty"
+ name="jetty-util"
+ rev="${jetty-util.version}"
+ conf="common->master"/>
+ <dependency org="org.mortbay.jetty"
+ name="jetty"
+ rev="${jetty.version}"
+ conf="common->master"/>
+ <dependency org="org.mortbay.jetty"
+ name="jsp-api-2.1"
+ rev="${jetty.version}"
+ conf="common->master"/>
+ <dependency org="org.mortbay.jetty"
+ name="jsp-2.1"
+ rev="${jetty.version}"
+ conf="common->master"/>
</dependencies>
</ivy-module>
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/CapBasedLoadManager.java Sat Nov 28 20:26:01 2009
@@ -18,12 +18,23 @@
package org.apache.hadoop.mapred;
+import org.apache.hadoop.mapreduce.TaskType;
+import org.apache.hadoop.conf.Configuration;
+
/**
* A {@link LoadManager} for use by the {@link FairScheduler} that allocates
* tasks evenly across nodes up to their per-node maximum, using the default
* load management algorithm in Hadoop.
*/
public class CapBasedLoadManager extends LoadManager {
+
+ float maxDiff = 0.0f;
+
+ public void setConf(Configuration conf) {
+ super.setConf(conf);
+ maxDiff = conf.getFloat("mapred.fairscheduler.load.max.diff", 0.0f);
+ }
+
/**
* Determine how many tasks of a given type we want to run on a TaskTracker.
* This cap is chosen based on how many tasks of that type are outstanding in
@@ -32,7 +43,7 @@
* machines sent out heartbeats earliest.
*/
int getCap(int totalRunnableTasks, int localMaxTasks, int totalSlots) {
- double load = ((double)totalRunnableTasks) / totalSlots;
+ double load = maxDiff + ((double)totalRunnableTasks) / totalSlots;
return (int) Math.ceil(localMaxTasks * Math.min(1.0, load));
}
@@ -49,4 +60,10 @@
return tracker.countReduceTasks() < getCap(totalRunnableReduces,
tracker.getMaxReduceSlots(), totalReduceSlots);
}
+
+ @Override
+ public boolean canLaunchTask(TaskTrackerStatus tracker,
+ JobInProgress job, TaskType type) {
+ return true;
+ }
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Sat Nov 28 20:26:01 2009
@@ -212,6 +212,13 @@
LOG.info("Successfully configured FairScheduler");
}
+ /**
+ * Returns the LoadManager object used by the Fair Share scheduler
+ */
+ LoadManager getLoadManager() {
+ return loadMgr;
+ }
+
@Override
public void terminate() throws IOException {
if (eventLog != null)
@@ -315,69 +322,116 @@
TaskTrackerStatus tts = tracker.getStatus();
- // Scan to see whether any job needs to run a map, then a reduce
+ int mapsAssigned = 0; // loop counter for map in the below while loop
+ int reducesAssigned = 0; // loop counter for reduce in the below while
+ int mapCapacity = maxTasksToAssign(TaskType.MAP, tts);
+ int reduceCapacity = maxTasksToAssign(TaskType.REDUCE, tts);
+ boolean mapRejected = false; // flag used for ending the loop
+ boolean reduceRejected = false; // flag used for ending the loop
+
+ // Keep track of which jobs were visited for map tasks and which had tasks
+ // launched, so that we can later mark skipped jobs for delay scheduling
+ Set<JobInProgress> visitedForMap = new HashSet<JobInProgress>();
+ Set<JobInProgress> visitedForReduce = new HashSet<JobInProgress>();
+ Set<JobInProgress> launchedMap = new HashSet<JobInProgress>();
+
ArrayList<Task> tasks = new ArrayList<Task>();
- for (TaskType taskType: MAP_AND_REDUCE) {
- // Keep track of which jobs were visited and which had tasks launched,
- // so that we can later mark skipped jobs for delay scheduling
- Set<JobInProgress> visited = new HashSet<JobInProgress>();
- Set<JobInProgress> launched = new HashSet<JobInProgress>();
- // Compute a maximum number of tasks to assign on this task tracker
- int cap = maxTasksToAssign(taskType, tts);
- // Assign up to cap tasks
- for (int i = 0; i < cap; i++) {
- // Break if all runnable tasks of this type are already running
- if (taskType == TaskType.MAP && runningMaps == runnableMaps ||
- taskType == TaskType.REDUCE && runningReduces == runnableReduces)
- break;
- // Break if the node can't support another task of this type
- boolean canAssign = (taskType == TaskType.MAP) ?
- loadMgr.canAssignMap(tts, runnableMaps, totalMapSlots) :
- loadMgr.canAssignReduce(tts, runnableReduces, totalReduceSlots);
- if (canAssign) {
- // Get the map or reduce schedulables and sort them by fair sharing
- List<PoolSchedulable> scheds = getPoolSchedulables(taskType);
- Collections.sort(scheds, new SchedulingAlgorithms.FairShareComparator());
- for (Schedulable sched: scheds) {
- eventLog.log("INFO", "Checking for " + taskType +
- " task in " + sched.getName());
- Task task = sched.assignTask(tts, currentTime, visited);
- if (task != null) {
- JobInProgress job = taskTrackerManager.getJob(task.getJobID());
- eventLog.log("ASSIGN", trackerName, taskType,
- job.getJobID(), task.getTaskID());
- launched.add(job);
- // Update running task counts, and the job's locality level
- if (taskType == TaskType.MAP) {
- runningMaps++;
- updateLastMapLocalityLevel(job, task, tts);
- } else {
- runningReduces++;
- }
- // Add task to the list of assignments
- tasks.add(task);
- break;
- } // end if(task != null)
- } // end for(Schedulable sched: scheds)
+ // Scan jobs to assign tasks until neither maps nor reduces can be assigned
+ while (true) {
+ // Computing the ending conditions for the loop
+ // Reject a task type if one of the following condition happens
+ // 1. number of assigned task reaches per heatbeat limit
+ // 2. number of running tasks reaches runnable tasks
+ // 3. task is rejected by the LoadManager.canAssign
+ if (!mapRejected) {
+ if (mapsAssigned == mapCapacity ||
+ runningMaps == runnableMaps ||
+ !loadMgr.canAssignMap(tts, runnableMaps, totalMapSlots)) {
+ eventLog.log("INFO", "Can't assign another MAP to " + trackerName);
+ mapRejected = true;
+ }
+ }
+ if (!reduceRejected) {
+ if (reducesAssigned == reduceCapacity ||
+ runningReduces == runnableReduces ||
+ !loadMgr.canAssignReduce(tts, runnableReduces, totalReduceSlots)) {
+ eventLog.log("INFO", "Can't assign another REDUCE to " + trackerName);
+ reduceRejected = true;
+ }
+ }
+ // Exit while (true) loop if
+ // 1. neither maps nor reduces can be assigned
+ // 2. assignMultiple is off and we already assigned one task
+ if (mapRejected && reduceRejected ||
+ !assignMultiple && tasks.size() > 0) {
+ break; // This is the only exit of the while (true) loop
+ }
+
+ // Determine which task type to assign this time
+ // First try choosing a task type which is not rejected
+ TaskType taskType;
+ if (mapRejected) {
+ taskType = TaskType.REDUCE;
+ } else if (reduceRejected) {
+ taskType = TaskType.MAP;
+ } else {
+ // If both types are available, choose the task type with fewer running
+ // tasks on the task tracker to prevent that task type from starving
+ if (tts.countMapTasks() <= tts.countReduceTasks()) {
+ taskType = TaskType.MAP;
} else {
- eventLog.log("INFO", "Can't assign another " + taskType +
- " to " + trackerName);
- break;
- }
- } // end for(i = 0; i < cap; i++)
- // If we were assigning maps, mark any jobs that were visited but
- // did not launch a task as skipped on this heartbeat
- if (taskType == TaskType.MAP) {
- for (JobInProgress job: visited) {
- if (!launched.contains(job)) {
- infos.get(job).skippedAtLastHeartbeat = true;
+ taskType = TaskType.REDUCE;
+ }
+ }
+
+ // Get the map or reduce schedulables and sort them by fair sharing
+ List<PoolSchedulable> scheds = getPoolSchedulables(taskType);
+ Collections.sort(scheds, new SchedulingAlgorithms.FairShareComparator());
+ boolean foundTask = false;
+ for (Schedulable sched: scheds) { // This loop will assign only one task
+ eventLog.log("INFO", "Checking for " + taskType +
+ " task in " + sched.getName());
+ Task task = taskType == TaskType.MAP ?
+ sched.assignTask(tts, currentTime, visitedForMap) :
+ sched.assignTask(tts, currentTime, visitedForReduce);
+ if (task != null) {
+ foundTask = true;
+ JobInProgress job = taskTrackerManager.getJob(task.getJobID());
+ eventLog.log("ASSIGN", trackerName, taskType,
+ job.getJobID(), task.getTaskID());
+ // Update running task counts, and the job's locality level
+ if (taskType == TaskType.MAP) {
+ launchedMap.add(job);
+ mapsAssigned++;
+ runningMaps++;
+ updateLastMapLocalityLevel(job, task, tts);
+ } else {
+ reducesAssigned++;
+ runningReduces++;
}
+ // Add task to the list of assignments
+ tasks.add(task);
+ break; // This break makes this loop assign only one task
+ } // end if(task != null)
+ } // end for(Schedulable sched: scheds)
+
+ // Reject the task type if we cannot find a task
+ if (!foundTask) {
+ if (taskType == TaskType.MAP) {
+ mapRejected = true;
+ } else {
+ reduceRejected = true;
}
}
- // Return if assignMultiple was disabled and we found a task
- if (!assignMultiple && tasks.size() > 0)
- return tasks;
- } // end for(TaskType taskType: MAP_AND_REDUCE)
+ } // end while (true)
+
+ // Mark any jobs that were visited for map tasks but did not launch a task
+ // as skipped on this heartbeat
+ for (JobInProgress job: visitedForMap) {
+ if (!launchedMap.contains(job)) {
+ infos.get(job).skippedAtLastHeartbeat = true;
+ }
+ }
// If no tasks were found, return null
return tasks.isEmpty() ? null : tasks;
@@ -824,7 +878,11 @@
List<TaskStatus> statuses = new ArrayList<TaskStatus>();
for (TaskInProgress tip: tips) {
for (TaskAttemptID id: tip.getActiveTasks().keySet()) {
- statuses.add(tip.getTaskStatus(id));
+ TaskStatus stat = tip.getTaskStatus(id);
+ // status is null when the task has been scheduled but not yet running
+ if (stat != null) {
+ statuses.add(stat);
+ }
}
}
return statuses;
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairSchedulerServlet.java Sat Nov 28 20:26:01 2009
@@ -18,7 +18,9 @@
package org.apache.hadoop.mapred;
+import java.io.ByteArrayOutputStream;
import java.io.IOException;
+import java.io.OutputStream;
import java.io.PrintWriter;
import java.text.DateFormat;
import java.text.SimpleDateFormat;
@@ -118,7 +120,12 @@
}
// Print out the normal response
response.setContentType("text/html");
- PrintWriter out = new PrintWriter(response.getOutputStream());
+
+ // Because the client may read arbitrarily slow, and we hold locks while
+ // the servlet outputs, we want to write to our own buffer which we know
+ // won't block.
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ PrintWriter out = new PrintWriter(baos);
String hostname = StringUtils.simpleHostname(
jobTracker.getJobTrackerMachine());
out.print("<html><head>");
@@ -132,6 +139,11 @@
showJobs(out, advancedView);
out.print("</body></html>\n");
out.close();
+
+ // Flush our buffer to the real servlet output
+ OutputStream servletOut = response.getOutputStream();
+ baos.writeTo(servletOut);
+ servletOut.close();
}
/**
@@ -202,55 +214,57 @@
out.print("<th>Finished</th><th>Running</th><th>Fair Share</th>" +
(advancedView ? "<th>Weight</th>" : ""));
out.print("</tr>\n");
- Collection<JobInProgress> runningJobs = jobTracker.getRunningJobs();
- synchronized (scheduler) {
- for (JobInProgress job: runningJobs) {
- JobProfile profile = job.getProfile();
- JobInfo info = scheduler.infos.get(job);
- if (info == null) { // Job finished, but let's show 0's for info
- info = new JobInfo(null, null);
- }
- out.print("<tr>\n");
- out.printf("<td>%s</td>\n", DATE_FORMAT.format(
- new Date(job.getStartTime())));
- out.printf("<td><a href=\"jobdetails.jsp?jobid=%s\">%s</a></td>",
- profile.getJobID(), profile.getJobID());
- out.printf("<td>%s</td>\n", profile.getUser());
- out.printf("<td>%s</td>\n", profile.getJobName());
- if (JSPUtil.privateActionsAllowed()) {
- out.printf("<td>%s</td>\n", generateSelect(scheduler
- .getPoolManager().getPoolNames(), scheduler.getPoolManager()
- .getPoolName(job), "/scheduler?setPool=<CHOICE>&jobid="
- + profile.getJobID() + (advancedView ? "&advanced" : "")));
- out.printf("<td>%s</td>\n", generateSelect(Arrays
- .asList(new String[] { "VERY_LOW", "LOW", "NORMAL", "HIGH",
- "VERY_HIGH" }), job.getPriority().toString(),
- "/scheduler?setPriority=<CHOICE>&jobid=" + profile.getJobID()
- + (advancedView ? "&advanced" : "")));
- } else {
- out.printf("<td>%s</td>\n", scheduler.getPoolManager().getPoolName(job));
- out.printf("<td>%s</td>\n", job.getPriority().toString());
- }
- Pool pool = scheduler.getPoolManager().getPool(job);
- String mapShare = (pool.getSchedulingMode() == SchedulingMode.FAIR) ?
- String.format("%.1f", info.mapSchedulable.getFairShare()) : "NA";
- out.printf("<td>%d / %d</td><td>%d</td><td>%s</td>\n",
- job.finishedMaps(), job.desiredMaps(),
- info.mapSchedulable.getRunningTasks(),
- mapShare);
- if (advancedView) {
- out.printf("<td>%.1f</td>\n", info.mapSchedulable.getWeight());
- }
- String reduceShare = (pool.getSchedulingMode() == SchedulingMode.FAIR) ?
- String.format("%.1f", info.reduceSchedulable.getFairShare()) : "NA";
- out.printf("<td>%d / %d</td><td>%d</td><td>%s</td>\n",
- job.finishedReduces(), job.desiredReduces(),
- info.reduceSchedulable.getRunningTasks(),
- reduceShare);
- if (advancedView) {
- out.printf("<td>%.1f</td>\n", info.reduceSchedulable.getWeight());
+ synchronized (jobTracker) {
+ Collection<JobInProgress> runningJobs = jobTracker.getRunningJobs();
+ synchronized (scheduler) {
+ for (JobInProgress job: runningJobs) {
+ JobProfile profile = job.getProfile();
+ JobInfo info = scheduler.infos.get(job);
+ if (info == null) { // Job finished, but let's show 0's for info
+ info = new JobInfo(null, null);
+ }
+ out.print("<tr>\n");
+ out.printf("<td>%s</td>\n", DATE_FORMAT.format(
+ new Date(job.getStartTime())));
+ out.printf("<td><a href=\"jobdetails.jsp?jobid=%s\">%s</a></td>",
+ profile.getJobID(), profile.getJobID());
+ out.printf("<td>%s</td>\n", profile.getUser());
+ out.printf("<td>%s</td>\n", profile.getJobName());
+ if (JSPUtil.privateActionsAllowed()) {
+ out.printf("<td>%s</td>\n", generateSelect(scheduler
+ .getPoolManager().getPoolNames(), scheduler.getPoolManager()
+ .getPoolName(job), "/scheduler?setPool=<CHOICE>&jobid="
+ + profile.getJobID() + (advancedView ? "&advanced" : "")));
+ out.printf("<td>%s</td>\n", generateSelect(Arrays
+ .asList(new String[] { "VERY_LOW", "LOW", "NORMAL", "HIGH",
+ "VERY_HIGH" }), job.getPriority().toString(),
+ "/scheduler?setPriority=<CHOICE>&jobid=" + profile.getJobID()
+ + (advancedView ? "&advanced" : "")));
+ } else {
+ out.printf("<td>%s</td>\n", scheduler.getPoolManager().getPoolName(job));
+ out.printf("<td>%s</td>\n", job.getPriority().toString());
+ }
+ Pool pool = scheduler.getPoolManager().getPool(job);
+ String mapShare = (pool.getSchedulingMode() == SchedulingMode.FAIR) ?
+ String.format("%.1f", info.mapSchedulable.getFairShare()) : "NA";
+ out.printf("<td>%d / %d</td><td>%d</td><td>%s</td>\n",
+ job.finishedMaps(), job.desiredMaps(),
+ info.mapSchedulable.getRunningTasks(),
+ mapShare);
+ if (advancedView) {
+ out.printf("<td>%.1f</td>\n", info.mapSchedulable.getWeight());
+ }
+ String reduceShare = (pool.getSchedulingMode() == SchedulingMode.FAIR) ?
+ String.format("%.1f", info.reduceSchedulable.getFairShare()) : "NA";
+ out.printf("<td>%d / %d</td><td>%d</td><td>%s</td>\n",
+ job.finishedReduces(), job.desiredReduces(),
+ info.reduceSchedulable.getRunningTasks(),
+ reduceShare);
+ if (advancedView) {
+ out.printf("<td>%.1f</td>\n", info.reduceSchedulable.getWeight());
+ }
+ out.print("</tr>\n");
}
- out.print("</tr>\n");
}
}
out.print("</table>\n");
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/JobSchedulable.java Sat Nov 28 20:26:01 2009
@@ -125,6 +125,13 @@
TaskTrackerManager ttm = scheduler.taskTrackerManager;
ClusterStatus clusterStatus = ttm.getClusterStatus();
int numTaskTrackers = clusterStatus.getTaskTrackers();
+
+ // check with the load manager whether it is safe to
+ // launch this task on this taskTracker.
+ LoadManager loadMgr = scheduler.getLoadManager();
+ if (!loadMgr.canLaunchTask(tts, job, taskType)) {
+ return null;
+ }
if (taskType == TaskType.MAP) {
LocalityLevel localityLevel = scheduler.getAllowedLocalityLevel(
job, currentTime);
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/LoadManager.java Sat Nov 28 20:26:01 2009
@@ -22,6 +22,7 @@
import org.apache.hadoop.conf.Configurable;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.mapreduce.TaskType;
/**
* A pluggable object that manages the load on each {@link TaskTracker}, telling
@@ -66,6 +67,8 @@
/**
* Can a given {@link TaskTracker} run another map task?
+ * This method may check whether the specified tracker has
+ * enough resources to run another map task.
* @param tracker The machine we wish to run a new map on
* @param totalRunnableMaps Set of running jobs in the cluster
* @param totalMapSlots The total number of map slots in the cluster
@@ -76,6 +79,8 @@
/**
* Can a given {@link TaskTracker} run another reduce task?
+ * This method may check whether the specified tracker has
+ * enough resources to run another reduce task.
* @param tracker The machine we wish to run a new map on
* @param totalRunnableReduces Set of running jobs in the cluster
* @param totalReduceSlots The total number of reduce slots in the cluster
@@ -83,4 +88,16 @@
*/
public abstract boolean canAssignReduce(TaskTrackerStatus tracker,
int totalRunnableReduces, int totalReduceSlots);
+
+ /**
+ * Can a given {@link TaskTracker} run another new task from a given job?
+ * This method is provided for use by LoadManagers that take into
+ * account jobs' individual resource needs when placing tasks.
+ * @param tracker The machine we wish to run a new map on
+ * @param job The job from which we want to run a task on this machine
+ * @param type The type of task that we want to run on
+ * @return true if this task can be launched on <code>tracker</code>
+ */
+ public abstract boolean canLaunchTask(TaskTrackerStatus tracker,
+ JobInProgress job, TaskType type);
}
Modified: hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
URL: http://svn.apache.org/viewvc/hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java?rev=885145&r1=885144&r2=885145&view=diff
==============================================================================
--- hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java (original)
+++ hadoop/mapreduce/branches/MAPREDUCE-233/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java Sat Nov 28 20:26:01 2009
@@ -61,6 +61,8 @@
*/
public static final long ALLOC_RELOAD_WAIT = 5 * 1000;
+ public static final String EXPLICIT_POOL_PROPERTY = "mapred.fairscheduler.pool";
+
private final FairScheduler scheduler;
// Map and reduce minimum allocations for each pool
@@ -99,7 +101,7 @@
// used) or a String to specify an absolute path (if
// mapred.fairscheduler.allocation.file is used).
private String poolNameProperty; // Jobconf property to use for determining a
- // job's pool name (default: user.name)
+ // job's pool name (default: mapreduce.job.user.name)
private Map<String, Pool> pools = new HashMap<String, Pool>();
@@ -115,7 +117,7 @@
AllocationConfigurationException, ParserConfigurationException {
Configuration conf = scheduler.getConf();
this.poolNameProperty = conf.get(
- "mapred.fairscheduler.poolnameproperty", "user.name");
+ "mapred.fairscheduler.poolnameproperty", JobContext.USER_NAME);
this.allocFile = conf.get("mapred.fairscheduler.allocation.file");
if (allocFile == null) {
// No allocation file specified in jobconf. Use the default allocation
@@ -391,7 +393,7 @@
*/
public synchronized void setPool(JobInProgress job, String pool) {
removeJob(job);
- job.getJobConf().set(poolNameProperty, pool);
+ job.getJobConf().set(EXPLICIT_POOL_PROPERTY, pool);
addJob(job);
}
@@ -403,13 +405,16 @@
}
/**
- * Get the pool name for a JobInProgress from its configuration. This uses
- * the "project" property in the jobconf by default, or the property set with
- * "mapred.fairscheduler.poolnameproperty".
+ * Get the pool name for a JobInProgress from its configuration. This uses
+ * the value of mapred.fairscheduler.pool if specified, otherwise the value
+ * of the property named in mapred.fairscheduler.poolnameproperty if that is
+ * specified. Otherwise if neither is specified it uses the "user.name" property
+ * in the jobconf by default.
*/
public String getPoolName(JobInProgress job) {
Configuration conf = job.getJobConf();
- return conf.get(poolNameProperty, Pool.DEFAULT_POOL_NAME).trim();
+ return conf.get(EXPLICIT_POOL_PROPERTY,
+ conf.get(poolNameProperty, Pool.DEFAULT_POOL_NAME)).trim();
}
/**