You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by to...@apache.org on 2011/09/17 00:20:54 UTC

svn commit: r1171829 - in /hadoop/common/branches/branch-0.20-security: ./ src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/ src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/ src/docs/src/documentation/content/xdocs/ src/mapred/or...

Author: todd
Date: Fri Sep 16 22:20:54 2011
New Revision: 1171829

URL: http://svn.apache.org/viewvc?rev=1171829&view=rev
Log:
MAPREDUCE-2836. Provide option to fail jobs when submitted to non-existent fair scheduler pools. Contributed by Ahmed Radwan.

Added:
    hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/UndeclaredPoolException.java
    hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairSchedulerPoolNames.java
Modified:
    hadoop/common/branches/branch-0.20-security/CHANGES.txt
    hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
    hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
    hadoop/common/branches/branch-0.20-security/src/docs/src/documentation/content/xdocs/fair_scheduler.xml
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobTracker.java
    hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java

Modified: hadoop/common/branches/branch-0.20-security/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/CHANGES.txt?rev=1171829&r1=1171828&r2=1171829&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/CHANGES.txt (original)
+++ hadoop/common/branches/branch-0.20-security/CHANGES.txt Fri Sep 16 22:20:54 2011
@@ -10,6 +10,9 @@ Release 0.20.206.0 - unreleased
 
   IMPROVEMENTS
 
+    MAPREDUCE-2836. Provide option to fail jobs when submitted to
+    non-existent fair scheduler pools. (Ahmed Radwan via todd)
+
 Release 0.20.205.0 - unreleased
 
   NEW FEATURES

Modified: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java?rev=1171829&r1=1171828&r2=1171829&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/FairScheduler.java Fri Sep 16 22:20:54 2011
@@ -42,6 +42,7 @@ import org.apache.hadoop.metrics.Metrics
 import org.apache.hadoop.metrics.MetricsUtil;
 import org.apache.hadoop.metrics.Updater;
 import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.hadoop.util.StringUtils;
 
 /**
  * A {@link TaskScheduler} that implements fair sharing.
@@ -97,7 +98,15 @@ public class FairScheduler extends TaskS
   protected long lastDumpTime;       // Time when we last dumped state to log
   protected long lastHeartbeatTime;  // Time we last ran assignTasks 
   private long lastPreemptCheckTime; // Time we last ran preemptTasksIfNecessary
-  
+
+  /**
+   * Configuration key controlling whether jobs may be submitted to pools that
+   * are not declared in the scheduler allocation file (default: true).
+   */
+  public static final String ALLOW_UNDECLARED_POOLS_KEY =
+    "mapred.fairscheduler.allow.undeclared.pools";
+  private boolean allowUndeclaredPools = true; // same default as conf lookup
+
   /**
    * A class for holding per-job scheduler variables. These always contain the
    * values of the variables at the last update(), and are used along with a
@@ -195,6 +204,7 @@ public class FairScheduler extends TaskS
           "mapred.fairscheduler.locality.delay.node", defaultDelay);
       rackLocalityDelay = conf.getLong(
           "mapred.fairscheduler.locality.delay.rack", defaultDelay);
+      allowUndeclaredPools = conf.getBoolean(ALLOW_UNDECLARED_POOLS_KEY, true);
       if (defaultDelay == -1 && 
           (nodeLocalityDelay == -1 || rackLocalityDelay == -1)) {
         autoComputeLocalityDelay = true; // Compute from heartbeat interval
@@ -1098,4 +1108,24 @@ public class FairScheduler extends TaskS
   long getLastPreemptionUpdateTime() {
     return lastPreemptionUpdateTime;
   }
+
+  /**
+   * Rejects the job when its pool is undeclared and such pools are disallowed.
+   * @param job the job being submitted
+   * @throws UndeclaredPoolException if the pool is undeclared and disallowed
+   */
+  @Override
+  public void checkJobSubmission(JobInProgress job)
+      throws UndeclaredPoolException {
+    if (allowUndeclaredPools) {
+      return; // any pool name is acceptable; skip the pool lookups
+    }
+    String poolName = poolMgr.getPoolName(job); // resolve once, reuse below
+    Set<String> declaredPools = poolMgr.getDeclaredPools();
+    if (!declaredPools.contains(poolName)) {
+      throw new UndeclaredPoolException("Pool name: '" + poolName
+          + "' is invalid. Add pool name to the fair scheduler allocation "
+          + "file. Valid pools are: " + StringUtils.join(", ", declaredPools));
+    }
+  }
 }

Modified: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java?rev=1171829&r1=1171828&r2=1171829&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/PoolManager.java Fri Sep 16 22:20:54 2011
@@ -28,6 +28,8 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.TreeSet;
 
 import javax.xml.parsers.DocumentBuilder;
 import javax.xml.parsers.DocumentBuilderFactory;
@@ -113,6 +115,7 @@ public class PoolManager {
   private long lastReloadAttempt; // Last time we tried to reload the pools file
   private long lastSuccessfulReload; // Last time we successfully reloaded pools
   private boolean lastReloadAttemptFailed = false;
+  private Set<String> declaredPools = new TreeSet<String>();
 
   public PoolManager(FairScheduler scheduler) {
     this.scheduler = scheduler;
@@ -370,6 +373,8 @@ public class PoolManager {
       this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
       this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout;
       this.defaultSchedulingMode = defaultSchedulingMode;
+      this.declaredPools = Collections.unmodifiableSet(new TreeSet<String>(
+          poolNamesInAllocFile));
       for (String name: poolNamesInAllocFile) {
         Pool pool = getPool(name);
         if (poolModes.containsKey(name)) {
@@ -543,4 +548,9 @@ public class PoolManager {
       pool.updateMetrics();
     }
   }
+
+  /** @return a read-only view of the pool names from the allocation file */
+  public synchronized Set<String> getDeclaredPools() {
+    return Collections.unmodifiableSet(declaredPools);
+  }
 }

Added: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/UndeclaredPoolException.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/UndeclaredPoolException.java?rev=1171829&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/UndeclaredPoolException.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/java/org/apache/hadoop/mapred/UndeclaredPoolException.java Fri Sep 16 22:20:54 2011
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.IOException;
+
+/**
+ * Signals that a job was submitted to a pool that is not declared in the
+ * fair scheduler allocation file.
+ */
+public class UndeclaredPoolException extends IOException {
+  private static final long serialVersionUID = -3559057276650280117L;
+
+  public UndeclaredPoolException(String message) {
+    super(message); // describes the rejected pool and the valid alternatives
+  }
+}

Added: hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairSchedulerPoolNames.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairSchedulerPoolNames.java?rev=1171829&view=auto
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairSchedulerPoolNames.java (added)
+++ hadoop/common/branches/branch-0.20-security/src/contrib/fairscheduler/src/test/org/apache/hadoop/mapred/TestFairSchedulerPoolNames.java Fri Sep 16 22:20:54 2011
@@ -0,0 +1,178 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.mapred;
+
+import java.io.DataOutputStream;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.io.PrintWriter;
+import java.net.URI;
+
+import static org.junit.Assert.*;
+
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hdfs.MiniDFSCluster;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+
+public class TestFairSchedulerPoolNames {
+
+  final static String TEST_DIR = new File(System.getProperty("test.build.data",
+      "build/contrib/fairscheduler/test/data")).getAbsolutePath();
+  final static String ALLOC_FILE = new File(TEST_DIR, "test-pools")
+      .getAbsolutePath();
+
+  private static final String POOL_PROPERTY = "pool";
+  private String namenode;
+  private MiniDFSCluster miniDFSCluster = null;
+  private MiniMRCluster miniMRCluster = null;
+
+  /**
+   * Note that FairScheduler.ALLOW_UNDECLARED_POOLS_KEY is set to false, so
+   * only pool names declared in the scheduler allocation file are accepted;
+   * even the default pool is rejected because the file does not declare it.
+   */
+  @Before
+  public void setUp() throws Exception {
+    new File(TEST_DIR).mkdirs(); // Make sure data directory exists
+    // Create an allocation file with only one pool defined.
+    PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+    out.println("<?xml version=\"1.0\"?>");
+    out.println("<allocations>");
+    out.println("<pool name=\"poolA\">");
+    out.println("<minMaps>1</minMaps>");
+    out.println("<minReduces>1</minReduces>");
+    out.println("</pool>");
+    out.println("</allocations>");
+    out.close();
+    if (out.checkError()) { // PrintWriter swallows I/O errors; surface them
+      throw new IOException("Failed to write allocation file " + ALLOC_FILE);
+    }
+
+    namenode = "file:///";
+
+    JobConf clusterConf = new JobConf();
+    clusterConf.set("mapred.jobtracker.taskScheduler", FairScheduler.class
+        .getName());
+    clusterConf.set("mapred.fairscheduler.allocation.file", ALLOC_FILE);
+    clusterConf.set("mapred.fairscheduler.poolnameproperty", POOL_PROPERTY);
+    clusterConf.setBoolean(FairScheduler.ALLOW_UNDECLARED_POOLS_KEY, false);
+    miniMRCluster = new MiniMRCluster(1, namenode, 1, null, null, clusterConf);
+  }
+
+  @After
+  public void tearDown() throws Exception {
+    if (miniMRCluster != null) { // stop MapReduce before any DFS beneath it
+      miniMRCluster.shutdown();
+    }
+    if (miniDFSCluster != null) {
+      miniDFSCluster.shutdown();
+    }
+  }
+
+  private void submitJob(String pool) throws IOException {
+    JobConf conf = new JobConf();
+    final Path inDir = new Path("/tmp/testing/wc/input");
+    final Path outDir = new Path("/tmp/testing/wc/output");
+    FileSystem fs = FileSystem.get(URI.create(namenode), conf);
+    fs.delete(outDir, true);
+    if (!fs.mkdirs(inDir)) {
+      throw new IOException("Mkdirs failed to create " + inDir.toString());
+    }
+    DataOutputStream file = fs.create(new Path(inDir, "part-00000"));
+    file.writeBytes("Sample text");
+    file.close();
+
+    FileSystem.setDefaultUri(conf, namenode);
+    conf.set("mapred.job.tracker", "localhost:"
+        + miniMRCluster.getJobTrackerPort());
+    conf.setJobName("wordcount");
+    conf.setInputFormat(TextInputFormat.class);
+
+    // the keys are words (strings)
+    conf.setOutputKeyClass(Text.class);
+    // the values are counts (ints)
+    conf.setOutputValueClass(IntWritable.class);
+
+    conf.setMapperClass(WordCount.MapClass.class);
+    conf.setCombinerClass(WordCount.Reduce.class);
+    conf.setReducerClass(WordCount.Reduce.class);
+
+    FileInputFormat.setInputPaths(conf, inDir);
+    FileOutputFormat.setOutputPath(conf, outDir);
+    conf.setNumMapTasks(1);
+    conf.setNumReduceTasks(0);
+
+    if (pool != null) {
+      conf.set(POOL_PROPERTY, pool);
+    }
+    JobClient.runJob(conf);
+  }
+
+  /**
+   * Tests job submission using the default pool name.
+   */
+  @Test
+  public void testDefaultPoolName() {
+    Throwable t = null;
+    try {
+      // POOL_PROPERTY is left unset, so the job is placed in the default
+      // pool, which the allocation file does not declare.
+      submitJob(null);
+    } catch (Exception e) {
+      t = e;
+    }
+    assertNotNull("No exception during submission", t);
+    assertTrue("Incorrect exception message", t.getMessage().contains(
+        "Add pool name to the fair scheduler allocation file"));
+  }
+
+  /**
+   * Tests job submission using a valid pool name (i.e., name exists in the fair
+   * scheduler allocation file). Failures are not caught, so JUnit reports the
+   * actual exception and stack trace instead of a bare assertion failure.
+   */
+  @Test
+  public void testValidPoolName() throws Exception {
+    submitJob("poolA");
+  }
+
+  /**
+   * Tests job submission using an invalid pool name (i.e., name doesn't exist
+   * in the fair scheduler allocation file).
+   */
+  @Test
+  public void testInvalidPoolName() {
+    Throwable t = null;
+    try {
+      submitJob("poolB");
+    } catch (Exception e) {
+      t = e;
+    }
+    assertNotNull("No exception during submission", t);
+    assertTrue("Incorrect exception message", t.getMessage().contains(
+        "Add pool name to the fair scheduler allocation file"));
+  }
+
+}
\ No newline at end of file

Modified: hadoop/common/branches/branch-0.20-security/src/docs/src/documentation/content/xdocs/fair_scheduler.xml
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/docs/src/documentation/content/xdocs/fair_scheduler.xml?rev=1171829&r1=1171828&r2=1171829&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/docs/src/documentation/content/xdocs/fair_scheduler.xml (original)
+++ hadoop/common/branches/branch-0.20-security/src/docs/src/documentation/content/xdocs/fair_scheduler.xml Fri Sep 16 22:20:54 2011
@@ -192,6 +192,15 @@
           </tr>
           <tr>
           <td>
+            mapred.fairscheduler.allow.undeclared.pools
+          </td>
+          <td>
+            Whether jobs may be submitted to pools that are not declared in the
+            allocation file; when false, such jobs are rejected. Default: true.
+          </td>
+          </tr>
+          <tr>
+          <td>
             mapred.fairscheduler.allocation.file
           </td>
           <td>

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobTracker.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobTracker.java?rev=1171829&r1=1171828&r2=1171829&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobTracker.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/JobTracker.java Fri Sep 16 22:20:54 2011
@@ -3978,7 +3978,14 @@ public class JobTracker implements MRCon
         jobInfo.write(out);
         out.close();
       }
-      
+
+      try {
+        this.taskScheduler.checkJobSubmission(job);
+      } catch (IOException ioe){
+        LOG.error("Problem in submitting job " + jobId, ioe);
+        throw ioe;
+      }
+
       // Submit the job
       JobStatus status;
       try {

Modified: hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java?rev=1171829&r1=1171828&r2=1171829&view=diff
==============================================================================
--- hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java (original)
+++ hadoop/common/branches/branch-0.20-security/src/mapred/org/apache/hadoop/mapred/TaskScheduler.java Fri Sep 16 22:20:54 2011
@@ -96,5 +96,15 @@ abstract class TaskScheduler implements 
    * Refresh the configuration of the scheduler.
    */
   public void refresh() throws IOException {}
-  
+
+
+  /**
+   * Lets a scheduler veto a job at submission time; this default accepts all.
+   * @param job the job being submitted
+   * @throws IOException to reject the job, which aborts its submission
+   */
+  public void checkJobSubmission(JobInProgress job) throws IOException {
+    // Intentionally empty: no scheduler-specific constraints by default.
+  }
+
 }