You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by tu...@apache.org on 2012/07/13 02:43:02 UTC

svn commit: r1361020 [2/3] - in /hadoop/common/trunk/hadoop-mapreduce-project: ./ hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/ hadoop-yarn/hadoop-yarn-server/had...

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java?rev=1361020&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java Fri Jul 13 00:43:01 2012
@@ -0,0 +1,111 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.io.File;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+
+/**
+ * Fair scheduler view over a Hadoop {@link Configuration}.
+ *
+ * Copies the configuration it is given, adds {@code fair-scheduler.xml} as an
+ * extra resource, and offers typed getters for the scheduler's settings.
+ */
+public class FairSchedulerConfiguration extends Configuration {
+  /** Name of the resource added on top of the wrapped configuration. */
+  public static final String FS_CONFIGURATION_FILE = "fair-scheduler.xml";
+
+  /** Common prefix of the fair scheduler property names declared below. */
+  private static final String CONF_PREFIX =  "yarn.scheduler.fair.";
+
+  /** Property read by {@link #getAllocationFile()}. */
+  protected static final String ALLOCATION_FILE = CONF_PREFIX + "allocation.file";
+
+  /**
+   * Property read by {@link #getEventlogDir()}.
+   * NOTE(review): unlike every other key in this class, this one is not built
+   * from CONF_PREFIX -- confirm whether that is intentional.
+   */
+  protected static final String EVENT_LOG_DIR = "eventlog.dir";
+
+  /** Whether to use the user name as the queue name (instead of "default") if
+   * the request does not specify a queue. */
+  protected static final String  USER_AS_DEFAULT_QUEUE = CONF_PREFIX + "user-as-default-queue";
+  protected static final boolean DEFAULT_USER_AS_DEFAULT_QUEUE = false;
+
+  /** General locality threshold; its default is reused for node and rack. */
+  protected static final String LOCALITY_THRESHOLD = CONF_PREFIX + "locality.threshold";
+  protected static final float  DEFAULT_LOCALITY_THRESHOLD = -1.0f;
+
+  /** Cluster threshold for node locality. */
+  protected static final String LOCALITY_THRESHOLD_NODE = CONF_PREFIX + "locality.threshold.node";
+  protected static final float  DEFAULT_LOCALITY_THRESHOLD_NODE = DEFAULT_LOCALITY_THRESHOLD;
+
+  /** Cluster threshold for rack locality. */
+  protected static final String LOCALITY_THRESHOLD_RACK = CONF_PREFIX + "locality.threshold.rack";
+  protected static final float  DEFAULT_LOCALITY_THRESHOLD_RACK = DEFAULT_LOCALITY_THRESHOLD;
+
+  /** Whether preemption is enabled. */
+  protected static final String  PREEMPTION = CONF_PREFIX + "preemption";
+  protected static final boolean DEFAULT_PREEMPTION = false;
+
+  /** Whether to assign multiple containers in one check-in. */
+  protected static final String  ASSIGN_MULTIPLE = CONF_PREFIX + "assignmultiple";
+  protected static final boolean DEFAULT_ASSIGN_MULTIPLE = true;
+
+  /** Whether to give more weight to apps requiring many resources. */
+  protected static final String  SIZE_BASED_WEIGHT = CONF_PREFIX + "sizebasedweight";
+  protected static final boolean DEFAULT_SIZE_BASED_WEIGHT = false;
+
+  /** Maximum number of containers to assign on each check-in. */
+  protected static final String MAX_ASSIGN = CONF_PREFIX + "max.assign";
+  protected static final int DEFAULT_MAX_ASSIGN = -1;
+
+  /**
+   * Builds a configuration from {@code conf} and adds
+   * {@link #FS_CONFIGURATION_FILE} as an additional resource.
+   *
+   * @param conf configuration to copy
+   */
+  public FairSchedulerConfiguration(Configuration conf) {
+    super(conf);
+    addResource(FS_CONFIGURATION_FILE);
+  }
+
+  /** @return a resource built from the scheduler's minimum allocation (MB) */
+  public Resource getMinimumMemoryAllocation() {
+    return Resources.createResource(getInt(
+        YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB));
+  }
+
+  /** @return a resource built from the scheduler's maximum allocation (MB) */
+  public Resource getMaximumMemoryAllocation() {
+    return Resources.createResource(getInt(
+        YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+        YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB));
+  }
+
+  /** @return value of {@link #USER_AS_DEFAULT_QUEUE}, false when unset */
+  public boolean getUserAsDefaultQueue() {
+    boolean userAsQueue =
+        getBoolean(USER_AS_DEFAULT_QUEUE, DEFAULT_USER_AS_DEFAULT_QUEUE);
+    return userAsQueue;
+  }
+
+  /** @return value of {@link #LOCALITY_THRESHOLD}, -1.0 when unset */
+  public float getLocalityThreshold() {
+    float threshold = getFloat(LOCALITY_THRESHOLD, DEFAULT_LOCALITY_THRESHOLD);
+    return threshold;
+  }
+
+  /** @return value of {@link #LOCALITY_THRESHOLD_NODE}, -1.0 when unset */
+  public float getLocalityThresholdNode() {
+    float threshold =
+        getFloat(LOCALITY_THRESHOLD_NODE, DEFAULT_LOCALITY_THRESHOLD_NODE);
+    return threshold;
+  }
+
+  /** @return value of {@link #LOCALITY_THRESHOLD_RACK}, -1.0 when unset */
+  public float getLocalityThresholdRack() {
+    float threshold =
+        getFloat(LOCALITY_THRESHOLD_RACK, DEFAULT_LOCALITY_THRESHOLD_RACK);
+    return threshold;
+  }
+
+  /** @return value of {@link #PREEMPTION}, false when unset */
+  public boolean getPreemptionEnabled() {
+    boolean preemption = getBoolean(PREEMPTION, DEFAULT_PREEMPTION);
+    return preemption;
+  }
+
+  /** @return value of {@link #ASSIGN_MULTIPLE}, true when unset */
+  public boolean getAssignMultiple() {
+    boolean assignMultiple = getBoolean(ASSIGN_MULTIPLE, DEFAULT_ASSIGN_MULTIPLE);
+    return assignMultiple;
+  }
+
+  /** @return value of {@link #MAX_ASSIGN}, -1 when unset */
+  public int getMaxAssign() {
+    int maxAssign = getInt(MAX_ASSIGN, DEFAULT_MAX_ASSIGN);
+    return maxAssign;
+  }
+
+  /** @return value of {@link #SIZE_BASED_WEIGHT}, false when unset */
+  public boolean getSizeBasedWeight() {
+    boolean sizeBased = getBoolean(SIZE_BASED_WEIGHT, DEFAULT_SIZE_BASED_WEIGHT);
+    return sizeBased;
+  }
+
+  /** @return value of {@link #ALLOCATION_FILE}, or null when unset */
+  public String getAllocationFile() {
+    return get(ALLOCATION_FILE);
+  }
+
+  /**
+   * @return value of {@link #EVENT_LOG_DIR}; when unset, the "fairscheduler"
+   *         subdirectory of the absolute path of the {@code hadoop.log.dir}
+   *         system property (or of "/tmp/" if that property is missing)
+   */
+  public String getEventlogDir() {
+    String baseLogDir = System.getProperty("hadoop.log.dir", "/tmp/");
+    String fallbackDir =
+        new File(baseLogDir).getAbsolutePath() + File.separator + "fairscheduler";
+    return get(EVENT_LOG_DIR, fallbackDir);
+  }
+}

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java?rev=1361020&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java Fri Jul 13 00:43:01 2012
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.DailyRollingFileAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.apache.log4j.spi.LoggingEvent;
+
+/**
+ * Event log used by the fair scheduler for machine-readable debug info.
+ * This class uses a log4j rolling file appender to write the log, but uses
+ * a custom tab-separated event format of the form:
+ * <pre>
+ * DATE    EVENT_TYPE   PARAM_1   PARAM_2   ...
+ * </pre>
+ * Various event types are used by the fair scheduler. The purpose of logging
+ * in this format is to enable tools to parse the history log easily and read
+ * internal scheduler variables, rather than trying to make the log human
+ * readable. Problems with the event log itself are reported through this
+ * class's regular commons-logging logger.
+ *
+ * Constructing this class creates a disabled log. It must be initialized
+ * using {@link #init(FairSchedulerConfiguration)} to begin writing to the
+ * file. All methods synchronize on the instance, because they share the
+ * appender and the enabled/disabled flag.
+ */
+@Private
+@Unstable
+class FairSchedulerEventLog {
+  private static final Log LOG = LogFactory.getLog(FairSchedulerEventLog.class.getName());
+
+  /**
+   * True whenever the log cannot be written: before a successful init, after
+   * an initialization or append error, and after shutdown.
+   */
+  private boolean logDisabled = true;
+
+  /**
+   * Log directory, taken from
+   * {@link FairSchedulerConfiguration#getEventlogDir()} during init.
+   */
+  private String logDir;
+
+  /**
+   * Active log file, which is {logDir}/hadoop-{user}-fairscheduler.log.
+   * Older files are also stored as {logFile}.date (date format yyyy-MM-dd).
+   */
+  private String logFile;
+
+  /** Log4j appender used to write to the log file */
+  private DailyRollingFileAppender appender;
+
+  /**
+   * Creates the log directory if needed, opens the daily rolling appender and
+   * enables the log. On an I/O failure the error is logged and the event log
+   * stays disabled.
+   *
+   * @param conf configuration supplying the event log directory
+   * @return true if the event log is enabled after this call
+   */
+  synchronized boolean init(FairSchedulerConfiguration conf) {
+    try {
+      logDir = conf.getEventlogDir();
+      Path logDirPath = new Path(logDir);
+      FileSystem fs = logDirPath.getFileSystem(conf);
+      if (!fs.exists(logDirPath) && !fs.mkdirs(logDirPath)) {
+        throw new IOException(
+            "Mkdirs failed to create " + logDirPath.toString());
+      }
+      String username = System.getProperty("user.name");
+      logFile = String.format("%s%shadoop-%s-fairscheduler.log",
+          logDir, File.separator, username);
+      PatternLayout layout = new PatternLayout("%d{ISO8601}\t%m%n");
+      appender = new DailyRollingFileAppender(layout, logFile, "'.'yyyy-MM-dd");
+      appender.activateOptions();
+      // Enable only once the appender is fully set up, so log() can never
+      // observe an enabled log without a usable appender.
+      logDisabled = false;
+      LOG.info("Initialized fair scheduler event log, logging to " + logFile);
+    } catch (IOException e) {
+      LOG.error(
+          "Failed to initialize fair scheduler event log. Disabling it.", e);
+      logDisabled = true;
+    }
+    return !(logDisabled);
+  }
+
+  /**
+   * Log an event, writing a line in the log file of the form
+   * <pre>
+   * DATE    EVENT_TYPE   PARAM_1   PARAM_2   ...
+   * </pre>
+   * Does nothing while the log is disabled. Any failure while appending is
+   * logged and disables the event log.
+   *
+   * @param eventType first column of the event line
+   * @param params remaining columns, each appended after a tab
+   */
+  synchronized void log(String eventType, Object... params) {
+    try {
+      if (logDisabled) {
+        return;
+      }
+      // The method is synchronized, so the unsynchronized builder is enough.
+      StringBuilder buffer = new StringBuilder();
+      buffer.append(eventType);
+      for (Object param : params) {
+        buffer.append("\t");
+        buffer.append(param);
+      }
+      String message = buffer.toString();
+      Logger logger = Logger.getLogger(getClass());
+      appender.append(new LoggingEvent("", logger, Level.INFO, message, null));
+    } catch (Exception e) {
+      LOG.error("Failed to append to fair scheduler event log", e);
+      logDisabled = true;
+    }
+  }
+
+  /**
+   * Flush and close the log. The log is disabled afterwards even if closing
+   * the appender fails; such a failure is reported but not rethrown.
+   */
+  synchronized void shutdown() {
+    try {
+      if (appender != null) {
+        appender.close();
+      }
+    } catch (Exception e) {
+      // Best effort: shutting down must not fail because of the event log.
+      LOG.warn("Failed to close fair scheduler event log", e);
+    }
+    logDisabled = true;
+  }
+
+  /** @return true if events passed to log() are currently being written */
+  synchronized boolean isEnabled() {
+    return !logDisabled;
+  }
+}

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FifoAppComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FifoAppComparator.java?rev=1361020&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FifoAppComparator.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FifoAppComparator.java Fri Jul 13 00:43:01 2012
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Compares {@link AppSchedulable} objects first by priority, then by start
+ * time (earlier first), and finally by application ID so that the resulting
+ * order is deterministic.
+ */
+@Private
+@Unstable
+public class FifoAppComparator implements Comparator<AppSchedulable>, Serializable {
+  private static final long serialVersionUID = 3428835083489547918L;
+
+  /**
+   * @param a1 first app
+   * @param a2 second app
+   * @return the priority comparison if the priorities differ; otherwise -1 or
+   *         1 according to start time; otherwise the application ID comparison
+   */
+  public int compare(AppSchedulable a1, AppSchedulable a2) {
+    int byPriority = a1.getPriority().compareTo(a2.getPriority());
+    if (byPriority != 0) {
+      return byPriority;
+    }
+
+    long firstStart = a1.getStartTime();
+    long secondStart = a2.getStartTime();
+    if (firstStart < secondStart) {
+      return -1;
+    }
+    if (firstStart > secondStart) {
+      return 1;
+    }
+
+    // Same priority and start time: fall back to the app ID.
+    return a1.getApp().getApplicationId().compareTo(
+        a2.getApp().getApplicationId());
+  }
+}

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewJobWeightBooster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewJobWeightBooster.java?rev=1361020&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewJobWeightBooster.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewJobWeightBooster.java Fri Jul 13 00:43:01 2012
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+
+/**
+ * A {@link WeightAdjuster} implementation that gives a weight boost to new jobs
+ * for a certain amount of time -- by default, a 3x weight boost for 5 minutes.
+ * This can be used to make shorter jobs finish faster, emulating Shortest Job
+ * First scheduling while not starving long jobs.
+ *
+ * The factor and the duration (in milliseconds) are read from
+ * {@code mapred.newjobweightbooster.factor} and
+ * {@code mapred.newjobweightbooster.duration}.
+ */
+@Private
+@Unstable
+public class NewJobWeightBooster extends Configured implements WeightAdjuster {
+  private static final float DEFAULT_FACTOR = 3;
+  private static final long DEFAULT_DURATION = 5 * 60 * 1000;
+
+  private float factor;
+  private long duration;
+
+  /**
+   * Reads the boost factor and duration from {@code conf}. A null
+   * configuration selects the defaults, so the booster never runs with a
+   * zero factor or zero duration.
+   *
+   * @param conf configuration to read, may be null
+   */
+  public void setConf(Configuration conf) {
+    if (conf == null) {
+      factor = DEFAULT_FACTOR;
+      duration = DEFAULT_DURATION;
+    } else {
+      factor = conf.getFloat("mapred.newjobweightbooster.factor",
+          DEFAULT_FACTOR);
+      duration = conf.getLong("mapred.newjobweightbooster.duration",
+          DEFAULT_DURATION);
+    }
+    super.setConf(conf);
+  }
+
+  /**
+   * @param app app whose weight is being adjusted
+   * @param curWeight weight before adjustment
+   * @return {@code curWeight} multiplied by the boost factor while less than
+   *         the boost duration has passed since the app's start time,
+   *         otherwise {@code curWeight} unchanged
+   */
+  public double adjustWeight(AppSchedulable app, double curWeight) {
+    long start = app.getStartTime();
+    long now = System.currentTimeMillis();
+    if (now - start < duration) {
+      return curWeight * factor;
+    } else {
+      return curWeight;
+    }
+  }
+}

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1361020&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Fri Jul 13 00:43:01 2012
@@ -0,0 +1,513 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.w3c.dom.Text;
+import org.xml.sax.SAXException;
+
+/**
+ * Maintains a list of queues as well as scheduling parameters for each queue,
+ * such as guaranteed share allocations, from the fair scheduler config file.
+ */
+@Private
+@Unstable
+public class QueueManager {
+  public static final Log LOG = LogFactory.getLog(
+    QueueManager.class.getName());
+
+  /**
+   * Time to wait between checks of the allocation file. Compared against the
+   * scheduler clock in reloadAllocsIfNecessary (presumably milliseconds).
+   */
+  public static final long ALLOC_RELOAD_INTERVAL = 10 * 1000;
+
+  /**
+   * Time to wait after the allocation has been modified before reloading it
+   * (this is done to prevent loading a file that hasn't been fully written).
+   */
+  public static final long ALLOC_RELOAD_WAIT = 5 * 1000;
+
+  private final FairScheduler scheduler;
+
+  // Minimum resource allocation for each queue
+  private Map<String, Resource> minQueueResources = new HashMap<String, Resource>();
+  // Maximum amount of resources per queue
+  private Map<String, Resource> maxQueueResources = new HashMap<String, Resource>();
+  // Sharing weights for each queue
+  private Map<String, Double> queueWeights = new HashMap<String, Double>();
+
+  // Max concurrent running applications for each queue and for each user; in addition,
+  // for users that have no max specified, we use the userMaxAppsDefault.
+  private Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
+  private Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
+  private int userMaxAppsDefault = Integer.MAX_VALUE;
+  private int queueMaxAppsDefault = Integer.MAX_VALUE;
+
+  // ACL's for each queue. Only specifies non-default ACL's from configuration.
+  private Map<String, Map<QueueACL, AccessControlList>> queueAcls =
+      new HashMap<String, Map<QueueACL, AccessControlList>>();
+
+  // Min share preemption timeout for each queue. If a job in the queue
+  // waits this long without receiving its guaranteed share, it is allowed to
+  // preempt other jobs' tasks.
+  // NOTE(review): the allocation file gives this value in seconds, but
+  // reloadAllocs multiplies it by 1000 while parsing, so the stored values
+  // are presumably milliseconds -- confirm.
+  private Map<String, Long> minSharePreemptionTimeouts =
+    new HashMap<String, Long>();
+
+  // Default min share preemption timeout for queues where it is not set
+  // explicitly.
+  private long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
+
+  // Preemption timeout for jobs below fair share. If a job remains
+  // below half its fair share for this long, it is allowed to preempt tasks.
+  // NOTE(review): unit (seconds vs. milliseconds) should match the min share
+  // timeouts above -- confirm where this field is assigned.
+  private long fairSharePreemptionTimeout = Long.MAX_VALUE;
+
+  SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR;
+
+  private Object allocFile; // Location of the XML file containing allocations.
+                            // This is either a String holding the path
+                            // returned by
+                            // FairSchedulerConfiguration.getAllocationFile(),
+                            // or, when that is unset, a URL for the
+                            // fair-scheduler.xml resource found on the
+                            // classpath. It stays null if neither exists.
+
+  private Map<String, FSQueue> queues = new HashMap<String, FSQueue>();
+
+  private long lastReloadAttempt; // Last time we tried to reload the queues file
+  private long lastSuccessfulReload; // Last time we successfully reloaded queues
+  private boolean lastReloadAttemptFailed = false;
+
+  /**
+   * Creates a queue manager bound to the given scheduler. No allocations are
+   * loaded until initialize() is called.
+   *
+   * @param scheduler scheduler supplying the configuration and the clock
+   */
+  public QueueManager(FairScheduler scheduler) {
+    this.scheduler = scheduler;
+  }
+
+  /**
+   * Locates the allocation file, loads it, and creates the default queue.
+   *
+   * The file configured through the scheduler configuration is used if
+   * present; otherwise fair-scheduler.xml is looked up on the classpath. If
+   * neither is found an error is logged and no allocations are loaded.
+   *
+   * @throws IOException if the config file cannot be read.
+   * @throws SAXException if config file is malformed.
+   * @throws AllocationConfigurationException if allocations are invalid.
+   * @throws ParserConfigurationException if XML parser is misconfigured.
+   */
+  public void initialize() throws IOException, SAXException,
+      AllocationConfigurationException, ParserConfigurationException {
+    FairSchedulerConfiguration conf = scheduler.getConf();
+    this.allocFile = conf.getAllocationFile();
+    if (allocFile == null) {
+      // No allocation file specified in the configuration. Use the default
+      // allocation file, looking for it on the classpath.
+      allocFile = new Configuration().getResource(
+          FairSchedulerConfiguration.FS_CONFIGURATION_FILE);
+      if (allocFile == null) {
+        // Name the property that is actually read, so the message tells the
+        // operator which setting to change.
+        LOG.error("The fair scheduler allocation file "
+            + FairSchedulerConfiguration.FS_CONFIGURATION_FILE + " was "
+            + "not found on the classpath, and no other config file is given "
+            + "through " + FairSchedulerConfiguration.ALLOCATION_FILE + ".");
+      }
+    }
+    reloadAllocs();
+    long now = scheduler.getClock().getTime();
+    lastSuccessfulReload = now;
+    lastReloadAttempt = now;
+    // Create the default queue
+    getQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
+  }
+
+  /**
+   * Looks up a queue by name. An unknown name gets a new queue, which is
+   * given the default scheduling mode and registered before being returned.
+   *
+   * @param name queue name
+   * @return the existing or newly created queue
+   */
+  public synchronized FSQueue getQueue(String name) {
+    FSQueue existing = queues.get(name);
+    if (existing != null) {
+      return existing;
+    }
+    FSQueue created = new FSQueue(scheduler, name);
+    created.setSchedulingMode(defaultSchedulingMode);
+    queues.put(name, created);
+    return created;
+  }
+
+  /**
+   * Tells whether a queue with this name has already been registered.
+   *
+   * @param name queue name
+   * @return true if the name is a key of the queue map
+   */
+  public synchronized boolean exists(String name) {
+    boolean registered = queues.containsKey(name);
+    return registered;
+  }
+
+  /**
+   * Resolves the queue named by the application behind an AppSchedulable,
+   * creating that queue if it does not exist yet.
+   *
+   * @param app schedulable whose application names the queue
+   * @return the queue for that application
+   */
+  public FSQueue getQueueForApp(AppSchedulable app) {
+    String queueName = app.getApp().getQueueName();
+    return getQueue(queueName);
+  }
+
+  /**
+   * Reload allocations file if it hasn't been loaded in a while.
+   *
+   * Does nothing unless more than ALLOC_RELOAD_INTERVAL has passed since the
+   * last attempt. The file is reloaded only if it was modified after the last
+   * successful reload and has been left untouched for more than
+   * ALLOC_RELOAD_WAIT. Failures are logged (first failure only) and the
+   * existing allocations stay in effect.
+   */
+  public void reloadAllocsIfNecessary() {
+    long time = scheduler.getClock().getTime();
+    if (time <= lastReloadAttempt + ALLOC_RELOAD_INTERVAL) {
+      return;
+    }
+    lastReloadAttempt = time;
+    if (null == allocFile) {
+      return;
+    }
+    try {
+      // Get last modified time of alloc file depending whether it's a String
+      // (for a path name) or an URL (for a classloader resource)
+      long lastModified;
+      if (allocFile instanceof String) {
+        File file = new File((String) allocFile);
+        lastModified = file.lastModified();
+      } else { // allocFile is an URL
+        URLConnection conn = ((URL) allocFile).openConnection();
+        try {
+          lastModified = conn.getLastModified();
+        } finally {
+          // Reading the header may leave a stream to the resource open.
+          // This method runs periodically, so release it explicitly
+          // instead of waiting for garbage collection.
+          try {
+            conn.getInputStream().close();
+          } catch (IOException ignored) {
+            // Nothing was opened or the resource is gone; nothing to release.
+          }
+        }
+      }
+      if (lastModified > lastSuccessfulReload &&
+          time > lastModified + ALLOC_RELOAD_WAIT) {
+        reloadAllocs();
+        lastSuccessfulReload = time;
+        lastReloadAttemptFailed = false;
+      }
+    } catch (Exception e) {
+      // Throwing the error further out here won't help - the RPC thread
+      // will catch it and report it in a loop. Instead, just log it and
+      // hope somebody will notice from the log.
+      // We log the error only on the first failure so we don't fill up the
+      // log with these messages.
+      if (!lastReloadAttemptFailed) {
+        LOG.error("Failed to reload fair scheduler config file - " +
+            "will use existing allocations.", e);
+      }
+      lastReloadAttemptFailed = true;
+    }
+  }
+
+  /**
+   * Updates the allocation list from the allocation config file. This file is
+   * expected to be in the XML format specified in the design doc.
+   *
+   * @throws IOException if the config file cannot be read.
+   * @throws AllocationConfigurationException if allocations are invalid.
+   * @throws ParserConfigurationException if XML parser is misconfigured.
+   * @throws SAXException if config file is malformed.
+   */
+  public void reloadAllocs() throws IOException, ParserConfigurationException,
+      SAXException, AllocationConfigurationException {
+    if (allocFile == null) return;
+    // Create some temporary hashmaps to hold the new allocs, and we only save
+    // them in our fields if we have parsed the entire allocs file successfully.
+    Map<String, Resource> minQueueResources = new HashMap<String, Resource>();
+    Map<String, Resource> maxQueueResources = new HashMap<String, Resource>();
+    Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
+    Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
+    Map<String, Double> queueWeights = new HashMap<String, Double>();
+    Map<String, SchedulingMode> queueModes = new HashMap<String, SchedulingMode>();
+    Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
+    Map<String, Map<QueueACL, AccessControlList>> queueAcls =
+        new HashMap<String, Map<QueueACL, AccessControlList>>();
+    int userMaxAppsDefault = Integer.MAX_VALUE;
+    int queueMaxAppsDefault = Integer.MAX_VALUE;
+    SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR;
+
+    // Remember all queue names so we can display them on web UI, etc.
+    List<String> queueNamesInAllocFile = new ArrayList<String>();
+
+    // Read and parse the allocations file.
+    DocumentBuilderFactory docBuilderFactory =
+      DocumentBuilderFactory.newInstance();
+    docBuilderFactory.setIgnoringComments(true);
+    DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
+    Document doc;
+    if (allocFile instanceof String) {
+      doc = builder.parse(new File((String) allocFile));
+    } else {
+      doc = builder.parse(allocFile.toString());
+    }
+    Element root = doc.getDocumentElement();
+    if (!"allocations".equals(root.getTagName()))
+      throw new AllocationConfigurationException("Bad fair scheduler config " +
+          "file: top-level element not <allocations>");
+    NodeList elements = root.getChildNodes();
+    for (int i = 0; i < elements.getLength(); i++) {
+      Node node = elements.item(i);
+      if (!(node instanceof Element))
+        continue;
+      Element element = (Element)node;
+      if ("queue".equals(element.getTagName()) ||
+    	  "pool".equals(element.getTagName())) {
+        String queueName = element.getAttribute("name");
+        Map<QueueACL, AccessControlList> acls =
+            new HashMap<QueueACL, AccessControlList>();
+        queueNamesInAllocFile.add(queueName);
+        NodeList fields = element.getChildNodes();
+        for (int j = 0; j < fields.getLength(); j++) {
+          Node fieldNode = fields.item(j);
+          if (!(fieldNode instanceof Element))
+            continue;
+          Element field = (Element) fieldNode;
+          if ("minResources".equals(field.getTagName())) {
+            String text = ((Text)field.getFirstChild()).getData().trim();
+            int val = Integer.parseInt(text);
+            minQueueResources.put(queueName, Resources.createResource(val));
+          } else if ("maxResources".equals(field.getTagName())) {
+            String text = ((Text)field.getFirstChild()).getData().trim();
+            int val = Integer.parseInt(text);
+            maxQueueResources.put(queueName, Resources.createResource(val));
+          } else if ("maxRunningApps".equals(field.getTagName())) {
+            String text = ((Text)field.getFirstChild()).getData().trim();
+            int val = Integer.parseInt(text);
+            queueMaxApps.put(queueName, val);
+          } else if ("weight".equals(field.getTagName())) {
+            String text = ((Text)field.getFirstChild()).getData().trim();
+            double val = Double.parseDouble(text);
+            queueWeights.put(queueName, val);
+          } else if ("minSharePreemptionTimeout".equals(field.getTagName())) {
+            String text = ((Text)field.getFirstChild()).getData().trim();
+            long val = Long.parseLong(text) * 1000L;
+            minSharePreemptionTimeouts.put(queueName, val);
+          } else if ("schedulingMode".equals(field.getTagName())) {
+            String text = ((Text)field.getFirstChild()).getData().trim();
+            queueModes.put(queueName, parseSchedulingMode(text));
+          } else if ("aclSubmitApps".equals(field.getTagName())) {
+            String text = ((Text)field.getFirstChild()).getData().trim();
+            acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
+          } else if ("aclAdministerApps".equals(field.getTagName())) {
+            String text = ((Text)field.getFirstChild()).getData().trim();
+            acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
+          }
+        }
+        queueAcls.put(queueName, acls);
+        if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName)
+            && Resources.lessThan(maxQueueResources.get(queueName),
+                minQueueResources.get(queueName))) {
+          LOG.warn(String.format("Queue %s has max resources %d less than min resources %d",
+              queueName, maxQueueResources.get(queueName), minQueueResources.get(queueName)));
+        }
+      } else if ("user".equals(element.getTagName())) {
+        String userName = element.getAttribute("name");
+        NodeList fields = element.getChildNodes();
+        for (int j = 0; j < fields.getLength(); j++) {
+          Node fieldNode = fields.item(j);
+          if (!(fieldNode instanceof Element))
+            continue;
+          Element field = (Element) fieldNode;
+          if ("maxRunningApps".equals(field.getTagName())) {
+            String text = ((Text)field.getFirstChild()).getData().trim();
+            int val = Integer.parseInt(text);
+            userMaxApps.put(userName, val);
+          }
+        }
+      } else if ("userMaxAppsDefault".equals(element.getTagName())) {
+        String text = ((Text)element.getFirstChild()).getData().trim();
+        int val = Integer.parseInt(text);
+        userMaxAppsDefault = val;
+      } else if ("fairSharePreemptionTimeout".equals(element.getTagName())) {
+        String text = ((Text)element.getFirstChild()).getData().trim();
+        long val = Long.parseLong(text) * 1000L;
+        fairSharePreemptionTimeout = val;
+      } else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) {
+        String text = ((Text)element.getFirstChild()).getData().trim();
+        long val = Long.parseLong(text) * 1000L;
+        defaultMinSharePreemptionTimeout = val;
+      } else if ("queueMaxAppsDefault".equals(element.getTagName())) {
+        String text = ((Text)element.getFirstChild()).getData().trim();
+        int val = Integer.parseInt(text);
+        queueMaxAppsDefault = val;}
+      else if ("defaultQueueSchedulingMode".equals(element.getTagName())) {
+        String text = ((Text)element.getFirstChild()).getData().trim();
+        defaultSchedulingMode = parseSchedulingMode(text);
+      } else {
+        LOG.warn("Bad element in allocations file: " + element.getTagName());
+      }
+    }
+
+    // Commit the reload; also create any queue defined in the alloc file
+    // if it does not already exist, so it can be displayed on the web UI.
+    synchronized(this) {
+      this.minQueueResources = minQueueResources;
+      this.maxQueueResources = maxQueueResources;
+      this.queueMaxApps = queueMaxApps;
+      this.userMaxApps = userMaxApps;
+      this.queueWeights = queueWeights;
+      this.userMaxAppsDefault = userMaxAppsDefault;
+      this.queueMaxAppsDefault = queueMaxAppsDefault;
+      this.defaultSchedulingMode = defaultSchedulingMode;
+      this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
+      this.queueAcls = queueAcls;
+      for (String name: queueNamesInAllocFile) {
+        FSQueue queue = getQueue(name);
+        if (queueModes.containsKey(name)) {
+          queue.setSchedulingMode(queueModes.get(name));
+        } else {
+          queue.setSchedulingMode(defaultSchedulingMode);
+        }
+      }
+    }
+  }
+
+  /**
+   * Translate the textual scheduling mode found in the allocation file into
+   * a {@link SchedulingMode}. Matching is case-insensitive.
+   *
+   * @param text the mode as written in the file; expected "fair" or "fifo"
+   * @return the matching scheduling mode
+   * @throws AllocationConfigurationException if the text names neither mode
+   */
+  private SchedulingMode parseSchedulingMode(String text)
+      throws AllocationConfigurationException {
+    // Lower-case with a fixed locale: under e.g. a Turkish default locale
+    // "FIFO".toLowerCase() yields a dotless i and would never equal "fifo".
+    String mode = text.toLowerCase(java.util.Locale.ENGLISH);
+    if (mode.equals("fair")) {
+      return SchedulingMode.FAIR;
+    }
+    if (mode.equals("fifo")) {
+      return SchedulingMode.FIFO;
+    }
+    throw new AllocationConfigurationException(
+        "Unknown scheduling mode : " + mode + "; expected 'fifo' or 'fair'");
+  }
+
+  /**
+   * Get the minimum resource allocation for the given queue.
+   * @return the cap set on this queue, or 0 if not set.
+   */
+  public Resource getMinResources(String queue) {
+    if (minQueueResources.containsKey(queue)) {
+      return minQueueResources.get(queue);
+    } else{
+      return Resources.createResource(0);
+    }
+  }
+
+  /**
+   * Get the maximum resource allocation for the given queue.
+   *
+   * @param queueName name of the queue to look up
+   * @return the cap recorded for this queue, or a resource created with value
+   *         Integer.MAX_VALUE if none is set.
+   */
+  Resource getMaxResources(String queueName) {
+    // Look the queue up exactly once. An allocation reload replaces the map
+    // reference, so a containsKey()/get() pair could consult two different
+    // maps and hand back null.
+    Resource configured = maxQueueResources.get(queueName);
+    if (configured == null) {
+      return Resources.createResource(Integer.MAX_VALUE);
+    }
+    return configured;
+  }
+
+  /**
+   * Add an app in the appropriate queue
+   */
+  public synchronized void addApp(FSSchedulerApp app) {
+    getQueue(app.getQueueName()).addApp(app);
+  }
+
+  /**
+   * Remove an app
+   */
+  public synchronized void removeJob(SchedulerApp app) {
+    getQueue(app.getQueueName()).removeJob(app);
+  }
+
+  /**
+   * Get a collection of all queues.
+   *
+   * @return the value collection of the internal queue map, returned as-is
+   *         (no copy is made here)
+   */
+  public synchronized Collection<FSQueue> getQueues() {
+    Collection<FSQueue> allQueues = queues.values();
+    return allQueues;
+  }
+
+
+  /**
+   * Get all queue names that have been seen either in the allocation file or in
+   * a submitted app.
+   */
+  public synchronized Collection<String> getQueueNames() {
+    List<String> list = new ArrayList<String>();
+    for (FSQueue queue: getQueues()) {
+      list.add(queue.getName());
+    }
+    Collections.sort(list);
+    return list;
+  }
+
+  public int getUserMaxApps(String user) {
+    if (userMaxApps.containsKey(user)) {
+      return userMaxApps.get(user);
+    } else {
+      return userMaxAppsDefault;
+    }
+  }
+
+  public int getQueueMaxApps(String queue) {
+    if (queueMaxApps.containsKey(queue)) {
+      return queueMaxApps.get(queue);
+    } else {
+      return queueMaxAppsDefault;
+    }
+  }
+
+  public double getQueueWeight(String queue) {
+    if (queueWeights.containsKey(queue)) {
+      return queueWeights.get(queue);
+    } else {
+      return 1.0;
+    }
+  }
+
+  /**
+  * Get a queue's min share preemption timeout, in milliseconds. This is the
+  * time after which jobs in the queue may kill other queues' tasks if they
+  * are below their min share.
+  */
+  public long getMinSharePreemptionTimeout(String queueName) {
+    if (minSharePreemptionTimeouts.containsKey(queueName)) {
+      return minSharePreemptionTimeouts.get(queueName);
+    }
+    return defaultMinSharePreemptionTimeout;
+  }
+
+  /**
+   * Get the fair share preemption timeout, in milliseconds. This is the time
+   * after which any job may kill other jobs' tasks if it is below half
+   * its fair share.
+   *
+   * @return the currently stored fair share preemption timeout
+   */
+  public long getFairSharePreemptionTimeout() {
+    return this.fairSharePreemptionTimeout;
+  }
+
+  /**
+   * Get the ACLs associated with this queue. If a given ACL is not explicitly
+   * configured, include the default value for that ACL, which is the
+   * wildcard access list "*".
+   *
+   * @param queue name of the queue to look up
+   * @return a newly built map holding an entry for both
+   *         {@code ADMINISTER_QUEUE} and {@code SUBMIT_APPLICATIONS}
+   */
+  public Map<QueueACL, AccessControlList> getQueueAcls(String queue) {
+    Map<QueueACL, AccessControlList> out =
+        new HashMap<QueueACL, AccessControlList>();
+
+    // Look the queue up exactly once. An allocation reload replaces the map
+    // reference, so a containsKey()/get() pair could consult two different
+    // maps and pass null to putAll().
+    Map<QueueACL, AccessControlList> configured = queueAcls.get(queue);
+    if (configured != null) {
+      out.putAll(configured);
+    }
+    if (!out.containsKey(QueueACL.ADMINISTER_QUEUE)) {
+      out.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList("*"));
+    }
+    if (!out.containsKey(QueueACL.SUBMIT_APPLICATIONS)) {
+      out.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList("*"));
+    }
+    return out;
+  }
+}

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java?rev=1361020&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java Fri Jul 13 00:43:01 2012
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+
+/**
+ * A Schedulable represents an entity that can launch tasks, such as a job
+ * or a queue. It provides a common interface so that algorithms such as fair
+ * sharing can be applied both within a queue and across queues. There are
+ * currently two types of Schedulables: JobSchedulables, which represent a
+ * single job, and QueueSchedulables, which allocate among jobs in their queue.
+ *
+ * Separate sets of Schedulables are used for maps and reduces. Each queue has
+ * both a mapSchedulable and a reduceSchedulable, and so does each job.
+ *
+ * A Schedulable is responsible for three roles:
+ * 1) It can launch tasks through assignContainer().
+ * 2) It provides information about the job/queue to the scheduler, including:
+ *    - Demand (maximum number of tasks required)
+ *    - Number of currently running tasks
+ *    - Minimum share (for queues)
+ *    - Job/queue weight (for fair sharing)
+ *    - Start time and priority (for FIFO)
+ * 3) It can be assigned a fair share, for use with fair scheduling.
+ *
+ * Schedulable also contains two methods for performing scheduling computations:
+ * - updateDemand() is called periodically to compute the demand of the various
+ *   jobs and queues, which may be expensive (e.g. jobs must iterate through all
+ *   their tasks to count failed tasks, tasks that can be speculated, etc).
+ * - redistributeShare() is called after demands are updated and a Schedulable's
+ *   fair share has been set by its parent to let it distribute its share among
+ *   the other Schedulables within it (e.g. for queues that want to perform fair
+ *   sharing among their jobs).
+ */
+@Private
+@Unstable
+abstract class Schedulable {
+  /** Fair share assigned to this Schedulable; starts as a resource of 0. */
+  private Resource fairShare = Resources.createResource(0);
+
+  /**
+   * Name of job/queue, used for debugging as well as for breaking ties in
+   * scheduling order deterministically.
+   */
+  public abstract String getName();
+
+  /**
+   * Maximum number of resources required by this Schedulable. This is defined as
+   * number of currently utilized resources + number of unlaunched resources (that
+   * are either not yet launched or need to be speculated).
+   */
+  public abstract Resource getDemand();
+
+  /** Get the aggregate amount of resources consumed by the schedulable. */
+  public abstract Resource getResourceUsage();
+
+  /** Minimum Resource share assigned to the schedulable. */
+  public abstract Resource getMinShare();
+
+
+  /** Job/queue weight in fair sharing. */
+  public abstract double getWeight();
+
+  /** Start time for jobs in FIFO queues; meaningless for QueueSchedulables.*/
+  public abstract long getStartTime();
+
+  /** Job priority for jobs in FIFO queues; meaningless for QueueSchedulables. */
+  public abstract Priority getPriority();
+
+  /** Refresh the Schedulable's demand and those of its children if any. */
+  public abstract void updateDemand();
+
+  /**
+   * Distribute the fair share assigned to this Schedulable among its
+   * children (used in queues where the internal scheduler is fair sharing).
+   */
+  public abstract void redistributeShare();
+
+  /**
+   * Assign a container on this node if possible, and return the amount of
+   * resources assigned. If {@code reserved} is true, it means a reservation
+   * already exists on this node, and the schedulable should fulfill that
+   * reservation if possible.
+   *
+   * @param node the node on which to try to place a container
+   * @param reserved whether a reservation already exists on {@code node}
+   * @return the amount of resources assigned
+   */
+  public abstract Resource assignContainer(SchedulerNode node, boolean reserved);
+
+  /**
+   * Assign a fair share to this Schedulable.
+   *
+   * @param fairShare the share to store; the reference is kept as-is
+   */
+  public void setFairShare(Resource fairShare) {
+    this.fairShare = fairShare;
+  }
+
+  /** Get the fair share assigned to this Schedulable. */
+  public Resource getFairShare() {
+    return fairShare;
+  }
+
+  /**
+   * Convenient toString implementation for debugging. Renders the name,
+   * demand, resource usage, fair share and weight in one bracketed list.
+   */
+  @Override
+  public String toString() {
+    // The format previously contained a stray ",]" after the share, which
+    // produced unbalanced brackets in the output.
+    return String.format("[%s, demand=%s, running=%s, share=%s, w=%.1f]",
+        getName(), getDemand(), getResourceUsage(), fairShare, getWeight());
+  }
+}

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingAlgorithms.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingAlgorithms.java?rev=1361020&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingAlgorithms.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingAlgorithms.java Fri Jul 13 00:43:01 2012
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Comparator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+
+/**
+ * Utility class containing scheduling algorithms used in the fair scheduler.
+ */
+@Private
+@Unstable
+class SchedulingAlgorithms {
+  public static final Log LOG = LogFactory.getLog(
+      SchedulingAlgorithms.class.getName());
+
+  /**
+   * Three-way comparison of two longs that cannot overflow, unlike taking
+   * the sign of their difference.
+   *
+   * @return -1, 0 or 1 as {@code a} is less than, equal to or greater than
+   *         {@code b}
+   */
+  private static int compareLongs(long a, long b) {
+    if (a < b) {
+      return -1;
+    }
+    return (a > b) ? 1 : 0;
+  }
+
+  /**
+   * Compare Schedulables in order of priority and then submission time, as in
+   * the default FIFO scheduler in Hadoop.
+   */
+  public static class FifoComparator implements Comparator<Schedulable>, Serializable {
+    private static final long serialVersionUID = -5905036205491177060L;
+
+    @Override
+    public int compare(Schedulable s1, Schedulable s2) {
+      int res = s1.getPriority().compareTo(s2.getPriority());
+      if (res == 0) {
+        res = compareLongs(s1.getStartTime(), s2.getStartTime());
+      }
+      if (res == 0) {
+        // In the rare case where jobs were submitted at the exact same time,
+        // compare them by name (which will be the JobID) to get a deterministic
+        // ordering, so we don't alternately launch tasks from different jobs.
+        res = s1.getName().compareTo(s2.getName());
+      }
+      return res;
+    }
+  }
+
+  /**
+   * Compare Schedulables via weighted fair sharing. In addition, Schedulables
+   * below their min share get priority over those whose min share is met.
+   *
+   * Schedulables below their min share are compared by how far below it they
+   * are as a ratio. For example, if job A has 8 out of a min share of 10 tasks
+   * and job B has 50 out of a min share of 100, then job B is scheduled next,
+   * because B is at 50% of its min share and A is at 80% of its min share.
+   *
+   * Schedulables above their min share are compared by (runningTasks / weight).
+   * If all weights are equal, slots are given to the job with the fewest tasks;
+   * otherwise, jobs with more weight get proportionally more slots.
+   *
+   * In this implementation the ratios are computed from the memory value of
+   * each Schedulable's resource usage.
+   */
+  public static class FairShareComparator implements Comparator<Schedulable>, Serializable {
+    private static final long serialVersionUID = 5564969375856699313L;
+
+    @Override
+    public int compare(Schedulable s1, Schedulable s2) {
+      double minShareRatio1, minShareRatio2;
+      double useToWeightRatio1, useToWeightRatio2;
+      // A Schedulable's effective min share never exceeds its demand.
+      Resource minShare1 = Resources.min(s1.getMinShare(), s1.getDemand());
+      Resource minShare2 = Resources.min(s2.getMinShare(), s2.getDemand());
+      boolean s1Needy = Resources.lessThan(s1.getResourceUsage(), minShare1);
+      boolean s2Needy = Resources.lessThan(s2.getResourceUsage(), minShare2);
+      // Divide by at least 1 so that a zero min share cannot divide by zero.
+      Resource one = Resources.createResource(1);
+      minShareRatio1 = (double) s1.getResourceUsage().getMemory() /
+          Resources.max(minShare1, one).getMemory();
+      minShareRatio2 = (double) s2.getResourceUsage().getMemory() /
+          Resources.max(minShare2, one).getMemory();
+      useToWeightRatio1 = s1.getResourceUsage().getMemory() / s1.getWeight();
+      useToWeightRatio2 = s2.getResourceUsage().getMemory() / s2.getWeight();
+      int res = 0;
+      if (s1Needy && !s2Needy)
+        res = -1;
+      else if (s2Needy && !s1Needy)
+        res = 1;
+      else if (s1Needy && s2Needy)
+        res = (int) Math.signum(minShareRatio1 - minShareRatio2);
+      else // Neither schedulable is needy
+        res = (int) Math.signum(useToWeightRatio1 - useToWeightRatio2);
+      if (res == 0) {
+        // Apps are tied in fairness ratio. Break the tie by submit time and job
+        // name to get a deterministic ordering, which is useful for unit tests.
+        res = compareLongs(s1.getStartTime(), s2.getStartTime());
+        if (res == 0)
+          res = s1.getName().compareTo(s2.getName());
+      }
+      return res;
+    }
+  }
+
+  /**
+   * Number of iterations for the binary search in computeFairShares. This is
+   * equivalent to the number of bits of precision in the output. 25 iterations
+   * gives precision better than 0.1 slots in clusters with one million slots.
+   */
+  private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25;
+
+  /**
+   * Largest value the upper bound on R is allowed to reach while it is being
+   * doubled in computeFairShares. Keeping the bound finite guarantees the
+   * doubling loop ends even when no ratio can use up the available resources.
+   */
+  private static final double MAX_WEIGHT_TO_RES_RATIO = Double.MAX_VALUE / 2.0;
+
+  /**
+   * Given a set of Schedulables and a number of slots, compute their weighted
+   * fair shares. The min shares and demands of the Schedulables are assumed to
+   * be set beforehand. We compute the fairest possible allocation of shares
+   * to the Schedulables that respects their min shares and demands.
+   *
+   * To understand what this method does, we must first define what weighted
+   * fair sharing means in the presence of minimum shares and demands. If there
+   * were no minimum shares and every Schedulable had an infinite demand (i.e.
+   * could launch infinitely many tasks), then weighted fair sharing would be
+   * achieved if the ratio of slotsAssigned / weight was equal for each
+   * Schedulable and all slots were assigned. Minimum shares and demands add
+   * two further twists:
+   * - Some Schedulables may not have enough tasks to fill all their share.
+   * - Some Schedulables may have a min share higher than their assigned share.
+   *
+   * To deal with these possibilities, we define an assignment of slots as
+   * being fair if there exists a ratio R such that:
+   * - Schedulables S where S.demand < R * S.weight are assigned share S.demand
+   * - Schedulables S where S.minShare > R * S.weight are given share S.minShare
+   * - All other Schedulables S are assigned share R * S.weight
+   * - The sum of all the shares is totalSlots.
+   *
+   * We call R the weight-to-slots ratio because it converts a Schedulable's
+   * weight to the number of slots it is assigned.
+   *
+   * We compute a fair allocation by finding a suitable weight-to-slot ratio R.
+   * To do this, we use binary search. Given a ratio R, we compute the number
+   * of slots that would be used in total with this ratio (the sum of the shares
+   * computed using the conditions above). If this number of slots is less than
+   * totalSlots, then R is too small and more slots could be assigned. If the
+   * number of slots is more than totalSlots, then R is too large.
+   *
+   * We begin the binary search with a lower bound on R of 0 (which means that
+   * all Schedulables are only given their minShare) and an upper bound computed
+   * to be large enough that too many slots are given (by doubling R until we
+   * either use more than totalSlots slots or we fulfill all jobs' demands).
+   * The doubling also stops once R reaches a fixed finite maximum, so that a
+   * Schedulable whose weight is 0 (and whose share therefore never grows with
+   * R) cannot keep the search for an upper bound running forever.
+   * The helper method resUsedWithWeightToResRatio computes the total number
+   * of slots used with a given value of R.
+   *
+   * The running time of this algorithm is linear in the number of Schedulables,
+   * because resUsedWithWeightToResRatio is linear-time and the number of
+   * iterations of binary search is a constant (dependent on desired precision).
+   *
+   * @param schedulables the Schedulables whose fair shares are to be set
+   * @param totalResources the resources available to share among them
+   */
+  public static void computeFairShares(
+      Collection<? extends Schedulable> schedulables, Resource totalResources) {
+    // Find an upper bound on R that we can use in our binary search. We start
+    // at R = 1 and double it until we have either used totalSlots slots or we
+    // have met all Schedulables' demands (if total demand < totalSlots).
+    Resource totalDemand = Resources.createResource(0);
+    for (Schedulable sched: schedulables) {
+      Resources.addTo(totalDemand, sched.getDemand());
+    }
+    Resource cap = Resources.min(totalDemand, totalResources);
+    double rMax = 1.0;
+    // The finite limit on rMax is what terminates this loop when the cap is
+    // unreachable, e.g. a zero-weight Schedulable stuck at its min share.
+    while (rMax < MAX_WEIGHT_TO_RES_RATIO
+        && Resources.lessThan(resUsedWithWeightToResRatio(rMax, schedulables), cap)) {
+      rMax *= 2.0;
+    }
+    // Perform the binary search for up to COMPUTE_FAIR_SHARES_ITERATIONS steps
+    double left = 0;
+    double right = rMax;
+    for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
+      double mid = (left + right) / 2.0;
+      if (Resources.lessThan(resUsedWithWeightToResRatio(mid, schedulables), cap)) {
+        left = mid;
+      } else {
+        right = mid;
+      }
+    }
+    // Set the fair shares based on the value of R we've converged to
+    for (Schedulable sched: schedulables) {
+      sched.setFairShare(computeShare(sched, right));
+    }
+  }
+
+  /**
+   * Compute the number of slots that would be used given a weight-to-slot
+   * ratio w2sRatio, for use in the computeFairShares algorithm as described
+   * in {@link #computeFairShares(Collection, Resource)}.
+   *
+   * @param w2sRatio the candidate ratio R
+   * @param schedulables the Schedulables whose shares are summed
+   * @return the sum of the shares each Schedulable would get at this ratio
+   */
+  private static Resource resUsedWithWeightToResRatio(double w2sRatio,
+      Collection<? extends Schedulable> schedulables) {
+    Resource slotsTaken = Resources.createResource(0);
+    for (Schedulable sched: schedulables) {
+      Resource share = computeShare(sched, w2sRatio);
+      Resources.addTo(slotsTaken, share);
+    }
+    return slotsTaken;
+  }
+
+  /**
+   * Compute the resources assigned to a Schedulable given a particular
+   * weight-to-resource ratio r2sRatio, for use in computeFairShares as
+   * described in {@link #computeFairShares(Collection, Resource)}.
+   *
+   * @param sched the Schedulable to compute a share for
+   * @param r2sRatio the candidate ratio R
+   * @return weight * R, raised to at least the min share's memory, then
+   *         capped at the demand's memory and truncated to an int
+   */
+  private static Resource computeShare(Schedulable sched, double r2sRatio) {
+    double share = sched.getWeight() * r2sRatio;
+    share = Math.max(share, sched.getMinShare().getMemory());
+    share = Math.min(share, sched.getDemand().getMemory());
+    return Resources.createResource((int) share);
+  }
+}

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java?rev=1361020&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java Fri Jul 13 00:43:01 2012
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Internal scheduling modes for queues.
+ */
+@Private
+@Unstable
+public enum SchedulingMode {
+  /** Fair mode. */
+  FAIR,
+
+  /** FIFO mode. */
+  FIFO
+}

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java?rev=1361020&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java Fri Jul 13 00:43:01 2012
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configurable;
+
+/**
+ * A pluggable object for altering the weights of apps in the fair scheduler,
+ * which is used for example by {@link NewJobWeightBooster} to give higher
+ * weight to new jobs so that short jobs finish faster.
+ *
+ * May implement {@link Configurable} to access configuration parameters.
+ */
+@Private
+@Unstable
+public interface WeightAdjuster {
+  /**
+   * Adjust the weight of an app.
+   *
+   * @param app the app whose weight is being adjusted
+   * @param curWeight the app's current weight
+   * @return the adjusted weight
+   */
+  double adjustWeight(AppSchedulable app, double curWeight);
+}

Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java?rev=1361020&r1=1361019&r2=1361020&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java Fri Jul 13 00:43:01 2012
@@ -18,14 +18,17 @@
 
 package org.apache.hadoop.yarn.server.resourcemanager.webapp;
 
-import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp.QUEUE_NAME;
 import static org.apache.hadoop.yarn.util.StringHelper.join;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.QUEUE_NAME;
+
 import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
 import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
 import org.apache.hadoop.yarn.util.StringHelper;
 import org.apache.hadoop.yarn.webapp.Controller;
+import org.apache.hadoop.yarn.webapp.WebAppException;
 import org.apache.hadoop.yarn.webapp.YarnWebParams;
 
 import com.google.inject.Inject;
@@ -71,6 +74,12 @@ public class RmController extends Contro
       render(CapacitySchedulerPage.class);
       return;
     }
+    
+    if (rs instanceof FairScheduler) {
+      context().setStatus(404);
+      throw new WebAppException("Fair Scheduler UI not yet supported");
+    }
+    
     setTitle("Default Scheduler");
     render(DefaultSchedulerPage.class);
   }

Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java?rev=1361020&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java Fri Jul 13 00:43:01 2012
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerApp;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+public class TestFSSchedulerApp {
+  private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+
+  private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
+    ApplicationAttemptId attId = recordFactory.newRecordInstance(ApplicationAttemptId.class);
+    ApplicationId appIdImpl = recordFactory.newRecordInstance(ApplicationId.class);
+    appIdImpl.setId(appId);
+    attId.setAttemptId(attemptId);
+    attId.setApplicationId(appIdImpl);
+    return attId;
+  }
+
+  @Test
+  public void testDelayScheduling() {
+    Queue queue = Mockito.mock(Queue.class);
+    Priority prio = Mockito.mock(Priority.class);
+    Mockito.when(prio.getPriority()).thenReturn(1);
+    double nodeLocalityThreshold = .5;
+    double rackLocalityThreshold = .6;
+
+    ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
+    FSSchedulerApp schedulerApp =
+        new FSSchedulerApp(applicationAttemptId, "user1", queue , null, null, null);
+
+    // Default level should be node-local
+    assertEquals(NodeType.NODE_LOCAL, schedulerApp.getAllowedLocalityLevel(
+        prio, 10, nodeLocalityThreshold, rackLocalityThreshold));
+
+    // First five scheduling opportunities should remain node local
+    for (int i = 0; i < 5; i++) {
+      schedulerApp.addSchedulingOpportunity(prio);
+      assertEquals(NodeType.NODE_LOCAL, schedulerApp.getAllowedLocalityLevel(
+          prio, 10, nodeLocalityThreshold, rackLocalityThreshold));
+    }
+
+    // The sixth opportunity should switch the level to rack local
+    schedulerApp.addSchedulingOpportunity(prio);
+    assertEquals(NodeType.RACK_LOCAL, schedulerApp.getAllowedLocalityLevel(
+        prio, 10, nodeLocalityThreshold, rackLocalityThreshold));
+
+    // Manually set back to node local
+    schedulerApp.resetAllowedLocalityLevel(prio, NodeType.NODE_LOCAL);
+    schedulerApp.resetSchedulingOpportunities(prio);
+    assertEquals(NodeType.NODE_LOCAL, schedulerApp.getAllowedLocalityLevel(
+        prio, 10, nodeLocalityThreshold, rackLocalityThreshold));
+
+    // Now escalate again to rack-local, then to off-switch
+    for (int i = 0; i < 5; i++) {
+      schedulerApp.addSchedulingOpportunity(prio);
+      assertEquals(NodeType.NODE_LOCAL, schedulerApp.getAllowedLocalityLevel(
+          prio, 10, nodeLocalityThreshold, rackLocalityThreshold));
+    }
+
+    schedulerApp.addSchedulingOpportunity(prio);
+    assertEquals(NodeType.RACK_LOCAL, schedulerApp.getAllowedLocalityLevel(
+        prio, 10, nodeLocalityThreshold, rackLocalityThreshold));
+
+    for (int i = 0; i < 6; i++) { // six more opportunities must stay rack local
+      schedulerApp.addSchedulingOpportunity(prio);
+      assertEquals(NodeType.RACK_LOCAL, schedulerApp.getAllowedLocalityLevel(
+          prio, 10, nodeLocalityThreshold, rackLocalityThreshold));
+    }
+
+    schedulerApp.addSchedulingOpportunity(prio); // one more relaxes to off-switch
+    assertEquals(NodeType.OFF_SWITCH, schedulerApp.getAllowedLocalityLevel(
+        prio, 10, nodeLocalityThreshold, rackLocalityThreshold));
+  }
+
+  @Test
+  /**
+   * Ensure that when negative parameters are given (signaling delay scheduling
+   * not in use), the least restrictive locality level is returned.
+   */
+  public void testLocalityLevelWithoutDelays() {
+    Queue queue = Mockito.mock(Queue.class);
+    Priority prio = Mockito.mock(Priority.class);
+    Mockito.when(prio.getPriority()).thenReturn(1);
+
+    ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
+    FSSchedulerApp schedulerApp =
+        new FSSchedulerApp(applicationAttemptId, "user1", queue , null, null, null);
+    assertEquals(NodeType.OFF_SWITCH, schedulerApp.getAllowedLocalityLevel(
+        prio, 10, -1.0, -1.0));
+  }
+}