You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2011/09/17 00:20:54 UTC
svn commit: r1171829 - in /hadoop/common/branches/branch-0.20-security: ./
src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/
src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/
src/docs/src/documentation/content/xdocs/ src/mapred/or...
Author: todd
Date: Fri Sep 16 22:20:54 2011
New Revision: 1171829
URL: http://svn.apache.org/viewvc?rev=1171829&view=rev
Log:
MAPREDUCE-2836. Provide option to fail jobs when submitted to non-existent fair scheduler pools. Contributed by Ahmed Radwan.
Added:
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/UndeclaredPoolException.java
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairSchedulerPoolNames.java
Modified:
hadoop/common/branches/branch-0.20-security/CHANGES.txt
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
hadoop/common/branches/branch-0.20-security/src/docs/src/documentation/content/xdocs/fair_scheduler.xml
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobTracker.java
hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java
Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1171829&r1=1171828&r2=1171829&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Fri Sep 16 22:20:54 2011
@@ -10,6 +10,9 @@ Release 0.20.206.0 - unreleased
IMPROVEMENTS
+ MAPREDUCE-2836. Provide option to fail jobs when submitted to
+ non-existent fair scheduler pools. (Ahmed Radwan via todd)
+
Release 0.20.205.0 - unreleased
NEW FEATURES
Modified: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=1171829&r1=1171828&r2=1171829&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Fri Sep 16 22:20:54 2011
@@ -42,6 +42,7 @@ import org.apache.hadoop.metrics.Metrics
import org.apache.hadoop.metrics.MetricsUtil;
import org.apache.hadoop.metrics.Updater;
import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
/**
* A {@link TaskScheduler} that implements fair sharing.
@@ -97,7 +98,15 @@ public class FairScheduler extends TaskS
protected long lastDumpTime; // Time when we last dumped state to log
protected long lastHeartbeatTime; // Time we last ran assignTasks
private long lastPreemptCheckTime; // Time we last ran preemptTasksIfNecessary
-
+
+ /**
+ * A configuration property that controls whether jobs may be submitted to
+ * pools that are not declared in the scheduler allocation file.
+ */
+ public final static String ALLOW_UNDECLARED_POOLS_KEY =
+ "mapred.fairscheduler.allow.undeclared.pools";
+ private boolean allowUndeclaredPools = false;
+
/**
* A class for holding per-job scheduler variables. These always contain the
* values of the variables at the last update(), and are used along with a
@@ -195,6 +204,7 @@ public class FairScheduler extends TaskS
"mapred.fairscheduler.locality.delay.node", defaultDelay);
rackLocalityDelay = conf.getLong(
"mapred.fairscheduler.locality.delay.rack", defaultDelay);
+ allowUndeclaredPools = conf.getBoolean(ALLOW_UNDECLARED_POOLS_KEY, true);
if (defaultDelay == -1 &&
(nodeLocalityDelay == -1 || rackLocalityDelay == -1)) {
autoComputeLocalityDelay = true; // Compute from heartbeat interval
@@ -1098,4 +1108,24 @@ public class FairScheduler extends TaskS
long getLastPreemptionUpdateTime() {
return lastPreemptionUpdateTime;
}
+
+
+ /**
+ * Examines the job's pool name to determine if it is a declared pool name (in
+ * the scheduler allocation file).
+ */
+ @Override
+ public void checkJobSubmission(JobInProgress job)
+ throws UndeclaredPoolException {
+ Set<String> declaredPools = poolMgr.getDeclaredPools();
+ if (!this.allowUndeclaredPools
+ && !declaredPools.contains(poolMgr.getPoolName(job)))
+ throw new UndeclaredPoolException("Pool name: '"
+ + poolMgr.getPoolName(job)
+ + "' is invalid. Add pool name to the fair scheduler allocation "
+ + "file. Valid pools are: "
+ + StringUtils.join(", ", declaredPools));
+ }
+
+
}
Modified: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java?rev=1171829&r1=1171828&r2=1171829&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java Fri Sep 16 22:20:54 2011
@@ -28,6 +28,8 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
@@ -113,6 +115,7 @@ public class PoolManager {
private long lastReloadAttempt; // Last time we tried to reload the pools file
private long lastSuccessfulReload; // Last time we successfully reloaded pools
private boolean lastReloadAttemptFailed = false;
+ private Set<String> declaredPools = new TreeSet<String>();
public PoolManager(FairScheduler scheduler) {
this.scheduler = scheduler;
@@ -370,6 +373,8 @@ public class PoolManager {
this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout;
this.defaultSchedulingMode = defaultSchedulingMode;
+ this.declaredPools = Collections.unmodifiableSet(new TreeSet<String>(
+ poolNamesInAllocFile));
for (String name: poolNamesInAllocFile) {
Pool pool = getPool(name);
if (poolModes.containsKey(name)) {
@@ -543,4 +548,9 @@ public class PoolManager {
pool.updateMetrics();
}
}
+
+ public synchronized Set<String> getDeclaredPools() {
+ return declaredPools;
+ }
+
}
Added: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/UndeclaredPoolException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/UndeclaredPoolException.java?rev=1171829&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/UndeclaredPoolException.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/UndeclaredPoolException.java Fri Sep 16 22:20:54 2011
@@ -0,0 +1,32 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+/**
+ * Thrown when the pool is not declared in the fair scheduler allocation file.
+ */
+public class UndeclaredPoolException extends IOException {
+
+ private static final long serialVersionUID = -3559057276650280117L;
+
+ public UndeclaredPoolException(String message) {
+ super(message);
+ }
+}
Added: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairSchedulerPoolNames.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairSchedulerPoolNames.java?rev=1171829&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairSchedulerPoolNames.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairSchedulerPoolNames.java Fri Sep 16 22:20:54 2011
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestFairSchedulerPoolNames {
+
+ final static String TEST_DIR = new File(System.getProperty("test.build.data",
+ "build/contrib/streaming/test/data")).getAbsolutePath();
+ final static String ALLOC_FILE = new File(TEST_DIR, "test-pools")
+ .getAbsolutePath();
+
+ private static final String POOL_PROPERTY = "pool";
+ private String namenode;
+ private MiniDFSCluster miniDFSCluster = null;
+ private MiniMRCluster miniMRCluster = null;
+
+ /**
+ * Note that the FairScheduler.ALLOW_UNDECLARED_POOLS_KEY property is set to
+ * false. So, the default pool is not added, and only pool names in the
+ * scheduler allocation file are considered valid.
+ */
+ @Before
+ public void setUp() throws Exception {
+ new File(TEST_DIR).mkdirs(); // Make sure data directory exists
+ // Create an allocation file with only one pool defined.
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<allocations>");
+ out.println("<pool name=\"poolA\">");
+ out.println("<minMaps>1</minMaps>");
+ out.println("<minReduces>1</minReduces>");
+ out.println("</pool>");
+ out.println("</allocations>");
+ out.close();
+
+ namenode = "file:///";
+
+ JobConf clusterConf = new JobConf();
+ clusterConf.set("mapred.jobtracker.taskScheduler", FairScheduler.class
+ .getName());
+ clusterConf.set("mapred.fairscheduler.allocation.file", ALLOC_FILE);
+ clusterConf.set("mapred.fairscheduler.poolnameproperty", POOL_PROPERTY);
+ clusterConf.setBoolean(FairScheduler.ALLOW_UNDECLARED_POOLS_KEY, false);
+ miniMRCluster = new MiniMRCluster(1, namenode, 1, null, null, clusterConf);
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ if (miniDFSCluster != null) {
+ miniDFSCluster.shutdown();
+ }
+ if (miniMRCluster != null) {
+ miniMRCluster.shutdown();
+ }
+ }
+
+ private void submitJob(String pool) throws IOException {
+ JobConf conf = new JobConf();
+ final Path inDir = new Path("/tmp/testing/wc/input");
+ final Path outDir = new Path("/tmp/testing/wc/output");
+ FileSystem fs = FileSystem.get(URI.create(namenode), conf);
+ fs.delete(outDir, true);
+ if (!fs.mkdirs(inDir)) {
+ throw new IOException("Mkdirs failed to create " + inDir.toString());
+ }
+ DataOutputStream file = fs.create(new Path(inDir, "part-00000"));
+ file.writeBytes("Sample text");
+ file.close();
+
+ FileSystem.setDefaultUri(conf, namenode);
+ conf.set("mapred.job.tracker", "localhost:"
+ + miniMRCluster.getJobTrackerPort());
+ conf.setJobName("wordcount");
+ conf.setInputFormat(TextInputFormat.class);
+
+ // the keys are words (strings)
+ conf.setOutputKeyClass(Text.class);
+ // the values are counts (ints)
+ conf.setOutputValueClass(IntWritable.class);
+
+ conf.setMapperClass(WordCount.MapClass.class);
+ conf.setCombinerClass(WordCount.Reduce.class);
+ conf.setReducerClass(WordCount.Reduce.class);
+
+ FileInputFormat.setInputPaths(conf, inDir);
+ FileOutputFormat.setOutputPath(conf, outDir);
+ conf.setNumMapTasks(1);
+ conf.setNumReduceTasks(0);
+
+ if (pool != null) {
+ conf.set(POOL_PROPERTY, pool);
+ }
+ JobClient.runJob(conf);
+ }
+
+ /**
+ * Tests job submission using the default pool name.
+ */
+ @Test
+ public void testDefaultPoolName() {
+ Throwable t = null;
+ try {
+ submitJob(null);
+ } catch (Exception e) {
+ t = e;
+ }
+ assertNotNull("No exception during submission", t);
+ assertTrue("Incorrect exception message", t.getMessage().contains(
+ "Add pool name to the fair scheduler allocation file"));
+ }
+
+ /**
+ * Tests job submission using a valid pool name (i.e., name exists in the fair
+ * scheduler allocation file).
+ */
+ @Test
+ public void testValidPoolName() {
+ Throwable t = null;
+ try {
+ submitJob("poolA");
+ } catch (Exception e) {
+ t = e;
+ }
+ assertNull("Exception during submission", t);
+ }
+
+ /**
+ * Tests job submission using an invalid pool name (i.e., name doesn't exist
+ * in the fair scheduler allocation file).
+ */
+ @Test
+ public void testInvalidPoolName() {
+ Throwable t = null;
+ try {
+ submitJob("poolB");
+ } catch (Exception e) {
+ t = e;
+ }
+ assertNotNull("No exception during submission", t);
+ assertTrue("Incorrect exception message", t.getMessage().contains(
+ "Add pool name to the fair scheduler allocation file"));
+ }
+
+}
\ No newline at end of file
Modified: hadoop/common/branches/branch-0.20-security/src/docs/src/documentation/content/xdocs/fair_scheduler.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/docs/src/documentation/content/xdocs/fair_scheduler.xml?rev=1171829&r1=1171828&r2=1171829&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/docs/src/documentation/content/xdocs/fair_scheduler.xml (original)
+++ hadoop/common/branches/branch-0.20-security/src/docs/src/documentation/content/xdocs/fair_scheduler.xml Fri Sep 16 22:20:54 2011
@@ -192,6 +192,15 @@
</tr>
<tr>
<td>
+ mapred.fairscheduler.allow.undeclared.pools
+ </td>
+ <td>
+ Boolean property for enabling job submission to pools not declared
+ in the allocation file. Default: true.
+ </td>
+ </tr>
+ <tr>
+ <td>
mapred.fairscheduler.allocation.file
</td>
<td>
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1171829&r1=1171828&r2=1171829&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Sep 16 22:20:54 2011
@@ -3978,7 +3978,14 @@ public class JobTracker implements MRCon
jobInfo.write(out);
out.close();
}
-
+
+ try {
+ this.taskScheduler.checkJobSubmission(job);
+ } catch (IOException ioe){
+ LOG.error("Problem in submitting job " + jobId, ioe);
+ throw ioe;
+ }
+
// Submit the job
JobStatus status;
try {
Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java?rev=1171829&r1=1171828&r2=1171829&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java Fri Sep 16 22:20:54 2011
@@ -96,5 +96,15 @@ abstract class TaskScheduler implements
* Refresh the configuration of the scheduler.
*/
public void refresh() throws IOException {}
-
+
+
+ /**
+ * Subclasses can override to provide any scheduler-specific checking
+ * mechanism for job submission.
+ * @param job the job being submitted
+ * @throws IOException if the scheduler rejects the job submission
+ */
+ public void checkJobSubmission(JobInProgress job) throws IOException{
+ }
+
}