You are viewing a plain text version of this content. The canonical link for it is here.
Posted to mapreduce-commits@hadoop.apache.org by tu...@apache.org on 2012/07/13 02:43:02 UTC
svn commit: r1361020 [2/3] - in
/hadoop/common/trunk/hadoop-mapreduce-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/resource/
hadoop-yarn/hadoop-yarn-server/had...
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java?rev=1361020&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java Fri Jul 13 00:43:01 2012
@@ -0,0 +1,111 @@
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.io.File;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+
+public class FairSchedulerConfiguration extends Configuration { // Typed accessors for fair scheduler settings.
+ public static final String FS_CONFIGURATION_FILE = "fair-scheduler.xml"; // Added as a resource by the constructor.
+
+ private static final String CONF_PREFIX = "yarn.scheduler.fair."; // Prefix shared by the keys built below.
+
+ protected static final String ALLOCATION_FILE = CONF_PREFIX + "allocation.file"; // Read by getAllocationFile(); no default.
+ protected static final String EVENT_LOG_DIR = "eventlog.dir"; // NOTE(review): the only key without CONF_PREFIX -- confirm intended.
+
+ /** Whether to use the user name as the queue name (instead of "default") if
+ * the request does not specify a queue. */
+ protected static final String USER_AS_DEFAULT_QUEUE = CONF_PREFIX + "user-as-default-queue";
+ protected static final boolean DEFAULT_USER_AS_DEFAULT_QUEUE = false;
+
+ protected static final String LOCALITY_THRESHOLD = CONF_PREFIX + "locality.threshold"; // General threshold; its default seeds the node and rack defaults.
+ protected static final float DEFAULT_LOCALITY_THRESHOLD = -1.0f; // NOTE(review): presumably "not set" -- confirm how callers treat negatives.
+
+ /** Cluster threshold for node locality. */
+ protected static final String LOCALITY_THRESHOLD_NODE = CONF_PREFIX + "locality.threshold.node";
+ protected static final float DEFAULT_LOCALITY_THRESHOLD_NODE =
+ DEFAULT_LOCALITY_THRESHOLD;
+
+ /** Cluster threshold for rack locality. */
+ protected static final String LOCALITY_THRESHOLD_RACK = CONF_PREFIX + "locality.threshold.rack";
+ protected static final float DEFAULT_LOCALITY_THRESHOLD_RACK =
+ DEFAULT_LOCALITY_THRESHOLD;
+
+ /** Whether preemption is enabled. */
+ protected static final String PREEMPTION = CONF_PREFIX + "preemption";
+ protected static final boolean DEFAULT_PREEMPTION = false;
+
+ /** Whether to assign multiple containers in one check-in. */
+ protected static final String ASSIGN_MULTIPLE = CONF_PREFIX + "assignmultiple";
+ protected static final boolean DEFAULT_ASSIGN_MULTIPLE = true;
+
+ /** Whether to give more weight to apps requiring many resources. */
+ protected static final String SIZE_BASED_WEIGHT = CONF_PREFIX + "sizebasedweight";
+ protected static final boolean DEFAULT_SIZE_BASED_WEIGHT = false;
+
+ /** Maximum number of containers to assign on each check-in. */
+ protected static final String MAX_ASSIGN = CONF_PREFIX + "max.assign";
+ protected static final int DEFAULT_MAX_ASSIGN = -1; // NOTE(review): presumably "no limit" -- confirm against the scheduler.
+
+ public FairSchedulerConfiguration(Configuration conf) { // Builds on conf and adds FS_CONFIGURATION_FILE as a resource.
+ super(conf);
+ addResource(FS_CONFIGURATION_FILE);
+ }
+
+ public Resource getMinimumMemoryAllocation() { // RM_SCHEDULER_MINIMUM_ALLOCATION_MB (or its default) wrapped in a Resource.
+ int mem = getInt(
+ YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB);
+ return Resources.createResource(mem);
+ }
+
+ public Resource getMaximumMemoryAllocation() { // RM_SCHEDULER_MAXIMUM_ALLOCATION_MB (or its default) wrapped in a Resource.
+ int mem = getInt(
+ YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+ YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
+ return Resources.createResource(mem);
+ }
+
+ public boolean getUserAsDefaultQueue() { // Value of USER_AS_DEFAULT_QUEUE; false when unset.
+ return getBoolean(USER_AS_DEFAULT_QUEUE, DEFAULT_USER_AS_DEFAULT_QUEUE);
+ }
+
+ public float getLocalityThreshold() { // Value of LOCALITY_THRESHOLD; -1.0f when unset.
+ return getFloat(LOCALITY_THRESHOLD, DEFAULT_LOCALITY_THRESHOLD);
+ }
+
+ public float getLocalityThresholdNode() { // Value of LOCALITY_THRESHOLD_NODE; -1.0f when unset.
+ return getFloat(LOCALITY_THRESHOLD_NODE, DEFAULT_LOCALITY_THRESHOLD_NODE);
+ }
+
+ public float getLocalityThresholdRack() { // Value of LOCALITY_THRESHOLD_RACK; -1.0f when unset.
+ return getFloat(LOCALITY_THRESHOLD_RACK, DEFAULT_LOCALITY_THRESHOLD_RACK);
+ }
+
+ public boolean getPreemptionEnabled() { // Value of PREEMPTION; false when unset.
+ return getBoolean(PREEMPTION, DEFAULT_PREEMPTION);
+ }
+
+ public boolean getAssignMultiple() { // Value of ASSIGN_MULTIPLE; true when unset.
+ return getBoolean(ASSIGN_MULTIPLE, DEFAULT_ASSIGN_MULTIPLE);
+ }
+
+ public int getMaxAssign() { // Value of MAX_ASSIGN; -1 when unset.
+ return getInt(MAX_ASSIGN, DEFAULT_MAX_ASSIGN);
+ }
+
+ public boolean getSizeBasedWeight() { // Value of SIZE_BASED_WEIGHT; false when unset.
+ return getBoolean(SIZE_BASED_WEIGHT, DEFAULT_SIZE_BASED_WEIGHT);
+ }
+
+ public String getAllocationFile() { // Value of ALLOCATION_FILE as looked up by get(); no default is supplied here.
+ return get(ALLOCATION_FILE);
+ }
+
+ public String getEventlogDir() { // Value of EVENT_LOG_DIR; default is <hadoop.log.dir, else /tmp/>/fairscheduler as an absolute path.
+ return get(EVENT_LOG_DIR, new File(System.getProperty("hadoop.log.dir",
+ "/tmp/")).getAbsolutePath() + File.separator + "fairscheduler");
+ }
+}
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java?rev=1361020&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerEventLog.java Fri Jul 13 00:43:01 2012
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.DailyRollingFileAppender;
+import org.apache.log4j.Level;
+import org.apache.log4j.Logger;
+import org.apache.log4j.PatternLayout;
+import org.apache.log4j.spi.LoggingEvent;
+
+/**
+ * Event log used by the fair scheduler for machine-readable debug info.
+ * This class uses a log4j rolling file appender to write the log, but uses
+ * a custom tab-separated event format of the form:
+ * <pre>
+ * DATE EVENT_TYPE PARAM_1 PARAM_2 ...
+ * </pre>
+ * Various event types are used by the fair scheduler. The purpose of logging
+ * in this format is to enable tools to parse the history log easily and read
+ * internal scheduler variables, rather than trying to make the log human
+ * readable. This class reports its own status and errors through its regular
+ * commons-logging logger ({@code LOG}).
+ *
+ * Constructing this class creates a disabled log. It must be initialized
+ * using {@link #init(FairSchedulerConfiguration)} to begin
+ * writing to the file.
+ */
+@Private
+@Unstable
+class FairSchedulerEventLog {
+ private static final Log LOG = LogFactory.getLog(FairSchedulerEventLog.class.getName());
+
+ /** True while logging is disabled: before init, after an error, or after shutdown. */
+ private boolean logDisabled = true;
+
+ /**
+ * Log directory, taken from FairSchedulerConfiguration#getEventlogDir()
+ * when init is called.
+ */
+ private String logDir;
+
+ /**
+ * Active log file, which is {logDir}/hadoop-{user}-fairscheduler.log.
+ * The appender rolls it with the date pattern '.'yyyy-MM-dd.
+ */
+ private String logFile;
+
+ /** Log4j appender used to write to the log file; null until init creates it. */
+ private DailyRollingFileAppender appender;
+
+ boolean init(FairSchedulerConfiguration conf) { // Creates the log dir if missing, opens the appender; returns true if logging is enabled.
+ try {
+ logDir = conf.getEventlogDir();
+ Path logDirPath = new Path(logDir);
+ FileSystem fs = logDirPath.getFileSystem(conf);
+ if (!fs.exists(logDirPath)) {
+ if (!fs.mkdirs(logDirPath)) {
+ throw new IOException(
+ "Mkdirs failed to create " + logDirPath.toString());
+ }
+ }
+ String username = System.getProperty("user.name");
+ logFile = String.format("%s%shadoop-%s-fairscheduler.log",
+ logDir, File.separator, username);
+ logDisabled = false; // NOTE(review): set before the appender below exists, and init is not synchronized like log() -- confirm no concurrent callers.
+ PatternLayout layout = new PatternLayout("%d{ISO8601}\t%m%n");
+ appender = new DailyRollingFileAppender(layout, logFile, "'.'yyyy-MM-dd");
+ appender.activateOptions();
+ LOG.info("Initialized fair scheduler event log, logging to " + logFile);
+ } catch (IOException e) {
+ LOG.error(
+ "Failed to initialize fair scheduler event log. Disabling it.", e);
+ logDisabled = true;
+ }
+ return !(logDisabled);
+ }
+
+ /**
+ * Log an event, writing a line in the log file of the form
+ * <pre>
+ * DATE EVENT_TYPE PARAM_1 PARAM_2 ...
+ * </pre> Does nothing while the log is disabled; any failure disables it.
+ */
+ synchronized void log(String eventType, Object... params) {
+ try {
+ if (logDisabled)
+ return;
+ StringBuffer buffer = new StringBuffer();
+ buffer.append(eventType);
+ for (Object param: params) {
+ buffer.append("\t");
+ buffer.append(param);
+ }
+ String message = buffer.toString();
+ Logger logger = Logger.getLogger(getClass()); // Only used to build the event; the event is handed straight to the appender.
+ appender.append(new LoggingEvent("", logger, Level.INFO, message, null));
+ } catch (Exception e) {
+ LOG.error("Failed to append to fair scheduler event log", e);
+ logDisabled = true; // Stop logging after the first failure.
+ }
+ }
+
+ /**
+ * Close the appender, if one was created, and mark the log disabled.
+ */
+ void shutdown() {
+ try {
+ if (appender != null)
+ appender.close();
+ } catch (Exception e) {} // NOTE(review): close failures are dropped silently -- presumably best-effort.
+ logDisabled = true;
+ }
+
+ boolean isEnabled() { // True when the log is currently accepting events.
+ return !logDisabled;
+ }
+}
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FifoAppComparator.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FifoAppComparator.java?rev=1361020&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FifoAppComparator.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FifoAppComparator.java Fri Jul 13 00:43:01 2012
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.io.Serializable;
+import java.util.Comparator;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Order {@link AppSchedulable} objects by priority, then by start time and
+ * finally by application ID, as in the default scheduler in Hadoop.
+ */
+@Private
+@Unstable
+public class FifoAppComparator implements Comparator<AppSchedulable>, Serializable {
+ private static final long serialVersionUID = 3428835083489547918L;
+
+ public int compare(AppSchedulable a1, AppSchedulable a2) { // Negative, zero or positive per the Comparator contract.
+ int res = a1.getPriority().compareTo(a2.getPriority()); // Primary key: priority, in its own compareTo order.
+ if (res == 0) {
+ if (a1.getStartTime() < a2.getStartTime()) { // Secondary key: smaller start time sorts first.
+ res = -1;
+ } else {
+ res = (a1.getStartTime() == a2.getStartTime() ? 0 : 1);
+ }
+ }
+ if (res == 0) {
+ // If there is a tie, break it by app ID to get a deterministic order
+ res = a1.getApp().getApplicationId().compareTo(a2.getApp().getApplicationId());
+ }
+ return res;
+ }
+}
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewJobWeightBooster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewJobWeightBooster.java?rev=1361020&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewJobWeightBooster.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/NewJobWeightBooster.java Fri Jul 13 00:43:01 2012
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.conf.Configured;
+
+/**
+ * A {@link WeightAdjuster} implementation that gives a weight boost to new jobs
+ * for a certain amount of time -- by default, a 3x weight boost for 5 minutes.
+ * This can be used to make shorter jobs finish faster, emulating Shortest Job
+ * First scheduling while not starving long jobs.
+ */
+@Private
+@Unstable
+public class NewJobWeightBooster extends Configured implements WeightAdjuster {
+ private static final float DEFAULT_FACTOR = 3; // Default weight multiplier.
+ private static final long DEFAULT_DURATION = 5 * 60 * 1000; // Five minutes, in milliseconds.
+
+ private float factor; // Multiplier applied while an app is still "new".
+ private long duration; // Boost window in milliseconds, measured from the app's start time.
+
+ public void setConf(Configuration conf) { // Reads factor and duration from conf; a null conf leaves them untouched.
+ if (conf != null) {
+ factor = conf.getFloat("mapred.newjobweightbooster.factor",
+ DEFAULT_FACTOR);
+ duration = conf.getLong("mapred.newjobweightbooster.duration",
+ DEFAULT_DURATION);
+ }
+ super.setConf(conf);
+ }
+
+ public double adjustWeight(AppSchedulable app, double curWeight) { // Returns curWeight * factor inside the boost window, else curWeight.
+ long start = app.getStartTime();
+ long now = System.currentTimeMillis();
+ if (now - start < duration) { // App started less than `duration` ms ago.
+ return curWeight * factor;
+ } else {
+ return curWeight;
+ }
+ }
+}
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1361020&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Fri Jul 13 00:43:01 2012
@@ -0,0 +1,513 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.net.URLConnection;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.w3c.dom.Text;
+import org.xml.sax.SAXException;
+
+/**
+ * Maintains a list of queues as well as scheduling parameters for each queue,
+ * such as guaranteed share allocations, from the fair scheduler config file.
+ */
+@Private
+@Unstable
+public class QueueManager {
+ public static final Log LOG = LogFactory.getLog(
+ QueueManager.class.getName());
+
+ /** Time to wait between checks of the allocation file, in milliseconds */
+ public static final long ALLOC_RELOAD_INTERVAL = 10 * 1000;
+
+ /**
+ * Time to wait after the allocation has been modified before reloading it
+ * (this is done to prevent loading a file that hasn't been fully written).
+ */
+ public static final long ALLOC_RELOAD_WAIT = 5 * 1000;
+
+ private final FairScheduler scheduler;
+
+ // Minimum resource allocation for each queue
+ private Map<String, Resource> minQueueResources = new HashMap<String, Resource>();
+ // Maximum amount of resources per queue
+ private Map<String, Resource> maxQueueResources = new HashMap<String, Resource>();
+ // Sharing weights for each queue
+ private Map<String, Double> queueWeights = new HashMap<String, Double>();
+
+ // Max concurrent running applications for each queue and for each user; in addition,
+ // for users that have no max specified, we use the userMaxAppsDefault.
+ private Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
+ private Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
+ private int userMaxAppsDefault = Integer.MAX_VALUE;
+ private int queueMaxAppsDefault = Integer.MAX_VALUE;
+
+ // ACL's for each queue. Only specifies non-default ACL's from configuration.
+ private Map<String, Map<QueueACL, AccessControlList>> queueAcls =
+ new HashMap<String, Map<QueueACL, AccessControlList>>();
+
+ // Min share preemption timeout for each queue. If a job in the queue
+ // waits this long without receiving its guaranteed share, it is allowed to
+ // preempt other jobs' tasks. NOTE(review): reloadAllocs parses seconds * 1000, so values look like ms -- confirm.
+ private Map<String, Long> minSharePreemptionTimeouts =
+ new HashMap<String, Long>();
+
+ // Default min share preemption timeout for queues where it is not set
+ // explicitly. reloadAllocs stores the configured seconds multiplied by 1000.
+ private long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
+
+ // Preemption timeout for jobs below fair share; reloadAllocs stores the configured seconds * 1000. If a job remains
+ // below half its fair share for this long, it is allowed to preempt tasks.
+ private long fairSharePreemptionTimeout = Long.MAX_VALUE;
+
+ SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR; // Mode given to queues created by getQueue().
+
+ private Object allocFile; // Path to XML file containing allocations. This
+ // is either a URL to specify a classpath resource
+ // (if the fair-scheduler.xml on the classpath is
+ // used) or a String to specify an absolute path (if
+ // yarn.scheduler.fair.allocation.file is used).
+
+ private Map<String, FSQueue> queues = new HashMap<String, FSQueue>(); // Queues by name; filled lazily by getQueue().
+
+ private long lastReloadAttempt; // Last time we tried to reload the queues file
+ private long lastSuccessfulReload; // Last time we successfully reloaded queues
+ private boolean lastReloadAttemptFailed = false; // Used to log a reload failure only once.
+
+ public QueueManager(FairScheduler scheduler) { // Only stores the scheduler; allocations are loaded later by initialize().
+ this.scheduler = scheduler;
+ }
+
+ public void initialize() throws IOException, SAXException,
+ AllocationConfigurationException, ParserConfigurationException { // Locates the allocation file, loads it once, and creates the default queue.
+ FairSchedulerConfiguration conf = scheduler.getConf();
+ this.allocFile = conf.getAllocationFile();
+ if (allocFile == null) {
+ // No allocation file specified in the configuration. Use the default
+ // allocation file, fair-scheduler.xml, looking for it on the classpath.
+ allocFile = new Configuration().getResource("fair-scheduler.xml");
+ if (allocFile == null) {
+ LOG.error("The fair scheduler allocation file fair-scheduler.xml was "
+ + "not found on the classpath, and no other config file is given "
+ + "through yarn.scheduler.fair.allocation.file."); // Names the key that getAllocationFile() actually reads.
+ }
+ }
+ reloadAllocs(); // Returns immediately when allocFile is still null.
+ lastSuccessfulReload = scheduler.getClock().getTime();
+ lastReloadAttempt = scheduler.getClock().getTime();
+ // Create the default queue
+ getQueue(YarnConfiguration.DEFAULT_QUEUE_NAME);
+ }
+
+ /**
+ * Get a queue by name, creating it with the default scheduling mode if absent.
+ */
+ public synchronized FSQueue getQueue(String name) {
+ FSQueue queue = queues.get(name);
+ if (queue == null) { // First request for this name: create and register the queue.
+ queue = new FSQueue(scheduler, name);
+ queue.setSchedulingMode(defaultSchedulingMode);
+ queues.put(name, queue);
+ }
+ return queue;
+ }
+
+ /**
+ * Return whether a queue with the given name has already been created.
+ */
+ public synchronized boolean exists(String name) {
+ return queues.containsKey(name);
+ }
+
+ /**
+ * Get the queue named by the given AppSchedulable's app, creating it if necessary.
+ */
+ public FSQueue getQueueForApp(AppSchedulable app) {
+ return this.getQueue(app.getApp().getQueueName());
+ }
+
+ /**
+ * Reload the allocations file if it was modified since the last successful reload.
+ */
+ public void reloadAllocsIfNecessary() {
+ long time = scheduler.getClock().getTime();
+ if (time > lastReloadAttempt + ALLOC_RELOAD_INTERVAL) { // Check at most once per ALLOC_RELOAD_INTERVAL.
+ lastReloadAttempt = time;
+ if (null == allocFile) { // No allocation file is known, so there is nothing to check.
+ return;
+ }
+ try {
+ // Get last modified time of alloc file depending whether it's a String
+ // (for a path name) or an URL (for a classloader resource)
+ long lastModified;
+ if (allocFile instanceof String) {
+ File file = new File((String) allocFile);
+ lastModified = file.lastModified();
+ } else { // allocFile is an URL
+ URLConnection conn = ((URL) allocFile).openConnection();
+ lastModified = conn.getLastModified(); // NOTE(review): URLConnection reports 0 when the date is unknown; presumably no reload then -- confirm.
+ }
+ if (lastModified > lastSuccessfulReload &&
+ time > lastModified + ALLOC_RELOAD_WAIT) { // Changed since the last load, and unmodified for ALLOC_RELOAD_WAIT.
+ reloadAllocs();
+ lastSuccessfulReload = time;
+ lastReloadAttemptFailed = false;
+ }
+ } catch (Exception e) {
+ // Throwing the error further out here won't help - the RPC thread
+ // will catch it and report it in a loop. Instead, just log it and
+ // hope somebody will notice from the log.
+ // We log the error only on the first failure (until a reload succeeds
+ // again) so we don't fill up the log with these messages.
+ if (!lastReloadAttemptFailed) {
+ LOG.error("Failed to reload fair scheduler config file - " +
+ "will use existing allocations.", e);
+ }
+ lastReloadAttemptFailed = true;
+ }
+ }
+ }
+
+ /**
+ * Updates the allocation list from the allocation config file. This file is
+ * expected to be in the XML format specified in the design doc.
+ *
+ * @throws IOException if the config file cannot be read.
+ * @throws AllocationConfigurationException if allocations are invalid.
+ * @throws ParserConfigurationException if XML parser is misconfigured.
+ * @throws SAXException if config file is malformed.
+ */
+ public void reloadAllocs() throws IOException, ParserConfigurationException,
+ SAXException, AllocationConfigurationException {
+ if (allocFile == null) return;
+ // Create some temporary hashmaps to hold the new allocs, and we only save
+ // them in our fields if we have parsed the entire allocs file successfully.
+ Map<String, Resource> minQueueResources = new HashMap<String, Resource>();
+ Map<String, Resource> maxQueueResources = new HashMap<String, Resource>();
+ Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
+ Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
+ Map<String, Double> queueWeights = new HashMap<String, Double>();
+ Map<String, SchedulingMode> queueModes = new HashMap<String, SchedulingMode>();
+ Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
+ Map<String, Map<QueueACL, AccessControlList>> queueAcls =
+ new HashMap<String, Map<QueueACL, AccessControlList>>();
+ int userMaxAppsDefault = Integer.MAX_VALUE;
+ int queueMaxAppsDefault = Integer.MAX_VALUE;
+ SchedulingMode defaultSchedulingMode = SchedulingMode.FAIR;
+
+ // Remember all queue names so we can display them on web UI, etc.
+ List<String> queueNamesInAllocFile = new ArrayList<String>();
+
+ // Read and parse the allocations file.
+ DocumentBuilderFactory docBuilderFactory =
+ DocumentBuilderFactory.newInstance();
+ docBuilderFactory.setIgnoringComments(true);
+ DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
+ Document doc;
+ if (allocFile instanceof String) {
+ doc = builder.parse(new File((String) allocFile));
+ } else {
+ doc = builder.parse(allocFile.toString());
+ }
+ Element root = doc.getDocumentElement();
+ if (!"allocations".equals(root.getTagName()))
+ throw new AllocationConfigurationException("Bad fair scheduler config " +
+ "file: top-level element not <allocations>");
+ NodeList elements = root.getChildNodes();
+ for (int i = 0; i < elements.getLength(); i++) {
+ Node node = elements.item(i);
+ if (!(node instanceof Element))
+ continue;
+ Element element = (Element)node;
+ if ("queue".equals(element.getTagName()) ||
+ "pool".equals(element.getTagName())) {
+ String queueName = element.getAttribute("name");
+ Map<QueueACL, AccessControlList> acls =
+ new HashMap<QueueACL, AccessControlList>();
+ queueNamesInAllocFile.add(queueName);
+ NodeList fields = element.getChildNodes();
+ for (int j = 0; j < fields.getLength(); j++) {
+ Node fieldNode = fields.item(j);
+ if (!(fieldNode instanceof Element))
+ continue;
+ Element field = (Element) fieldNode;
+ if ("minResources".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ int val = Integer.parseInt(text);
+ minQueueResources.put(queueName, Resources.createResource(val));
+ } else if ("maxResources".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ int val = Integer.parseInt(text);
+ maxQueueResources.put(queueName, Resources.createResource(val));
+ } else if ("maxRunningApps".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ int val = Integer.parseInt(text);
+ queueMaxApps.put(queueName, val);
+ } else if ("weight".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ double val = Double.parseDouble(text);
+ queueWeights.put(queueName, val);
+ } else if ("minSharePreemptionTimeout".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ long val = Long.parseLong(text) * 1000L;
+ minSharePreemptionTimeouts.put(queueName, val);
+ } else if ("schedulingMode".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ queueModes.put(queueName, parseSchedulingMode(text));
+ } else if ("aclSubmitApps".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
+ } else if ("aclAdministerApps".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
+ }
+ }
+ queueAcls.put(queueName, acls);
+ if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName)
+ && Resources.lessThan(maxQueueResources.get(queueName),
+ minQueueResources.get(queueName))) {
+ LOG.warn(String.format("Queue %s has max resources %d less than min resources %d",
+ queueName, maxQueueResources.get(queueName), minQueueResources.get(queueName)));
+ }
+ } else if ("user".equals(element.getTagName())) {
+ String userName = element.getAttribute("name");
+ NodeList fields = element.getChildNodes();
+ for (int j = 0; j < fields.getLength(); j++) {
+ Node fieldNode = fields.item(j);
+ if (!(fieldNode instanceof Element))
+ continue;
+ Element field = (Element) fieldNode;
+ if ("maxRunningApps".equals(field.getTagName())) {
+ String text = ((Text)field.getFirstChild()).getData().trim();
+ int val = Integer.parseInt(text);
+ userMaxApps.put(userName, val);
+ }
+ }
+ } else if ("userMaxAppsDefault".equals(element.getTagName())) {
+ String text = ((Text)element.getFirstChild()).getData().trim();
+ int val = Integer.parseInt(text);
+ userMaxAppsDefault = val;
+ } else if ("fairSharePreemptionTimeout".equals(element.getTagName())) {
+ String text = ((Text)element.getFirstChild()).getData().trim();
+ long val = Long.parseLong(text) * 1000L;
+ fairSharePreemptionTimeout = val;
+ } else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) {
+ String text = ((Text)element.getFirstChild()).getData().trim();
+ long val = Long.parseLong(text) * 1000L;
+ defaultMinSharePreemptionTimeout = val;
+ } else if ("queueMaxAppsDefault".equals(element.getTagName())) {
+ String text = ((Text)element.getFirstChild()).getData().trim();
+ int val = Integer.parseInt(text);
+ queueMaxAppsDefault = val;}
+ else if ("defaultQueueSchedulingMode".equals(element.getTagName())) {
+ String text = ((Text)element.getFirstChild()).getData().trim();
+ defaultSchedulingMode = parseSchedulingMode(text);
+ } else {
+ LOG.warn("Bad element in allocations file: " + element.getTagName());
+ }
+ }
+
+ // Commit the reload; also create any queue defined in the alloc file
+ // if it does not already exist, so it can be displayed on the web UI.
+ synchronized(this) {
+ this.minQueueResources = minQueueResources;
+ this.maxQueueResources = maxQueueResources;
+ this.queueMaxApps = queueMaxApps;
+ this.userMaxApps = userMaxApps;
+ this.queueWeights = queueWeights;
+ this.userMaxAppsDefault = userMaxAppsDefault;
+ this.queueMaxAppsDefault = queueMaxAppsDefault;
+ this.defaultSchedulingMode = defaultSchedulingMode;
+ this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
+ this.queueAcls = queueAcls;
+ for (String name: queueNamesInAllocFile) {
+ FSQueue queue = getQueue(name);
+ if (queueModes.containsKey(name)) {
+ queue.setSchedulingMode(queueModes.get(name));
+ } else {
+ queue.setSchedulingMode(defaultSchedulingMode);
+ }
+ }
+ }
+ }
+
+ /**
+  * Convert a scheduling mode name read from the allocations file into a
+  * {@link SchedulingMode}. Matching ignores case.
+  *
+  * @param text the mode name, expected to be "fair" or "fifo" in any case
+  * @return {@link SchedulingMode#FAIR} or {@link SchedulingMode#FIFO}
+  * @throws AllocationConfigurationException if the name is neither mode
+  */
+ private SchedulingMode parseSchedulingMode(String text)
+     throws AllocationConfigurationException {
+   // Lower-case with a fixed locale. The no-argument overload follows the JVM
+   // default locale, and under e.g. a Turkish locale "FIFO" becomes a string
+   // with dotless i that would not match "fifo" below.
+   text = text.toLowerCase(java.util.Locale.ENGLISH);
+   if (text.equals("fair")) {
+     return SchedulingMode.FAIR;
+   } else if (text.equals("fifo")) {
+     return SchedulingMode.FIFO;
+   } else {
+     throw new AllocationConfigurationException(
+         "Unknown scheduling mode : " + text + "; expected 'fifo' or 'fair'");
+   }
+ }
+
+ /**
+ * Get the minimum resource allocation for the given queue.
+ * @return the cap set on this queue, or 0 if not set.
+ */
+ public Resource getMinResources(String queue) {
+ if (minQueueResources.containsKey(queue)) {
+ return minQueueResources.get(queue);
+ } else{
+ return Resources.createResource(0);
+ }
+ }
+
+ /**
+  * Get the maximum resource allocation for the given queue.
+  * @param queueName name of the queue to look up
+  * @return the cap set on this queue, or a resource created with
+  *         Integer.MAX_VALUE if not set.
+  */
+ Resource getMaxResources(String queueName) {
+   // Look the queue up once. An allocation reload assigns a new map to this
+   // field, so a containsKey()/get() pair could consult two different maps
+   // and hand back null.
+   Resource configured = maxQueueResources.get(queueName);
+   if (configured != null) {
+     return configured;
+   }
+   return Resources.createResource(Integer.MAX_VALUE);
+ }
+
+ /**
+ * Add an app in the appropriate queue
+ */
+ public synchronized void addApp(FSSchedulerApp app) {
+ getQueue(app.getQueueName()).addApp(app);
+ }
+
+ /**
+ * Remove an app
+ */
+ public synchronized void removeJob(SchedulerApp app) {
+ getQueue(app.getQueueName()).removeJob(app);
+ }
+
+ /**
+  * Get a collection of all queues.
+  * @return the values of the internal queue map
+  */
+ public synchronized Collection<FSQueue> getQueues() {
+   // NOTE(review): this is presumably a live view of the internal map rather
+   // than a copy - confirm callers that iterate it outside this monitor are
+   // safe against concurrent queue creation.
+   Collection<FSQueue> allQueues = queues.values();
+   return allQueues;
+ }
+
+
+ /**
+ * Get all queue names that have been seen either in the allocation file or in
+ * a submitted app.
+ */
+ public synchronized Collection<String> getQueueNames() {
+ List<String> list = new ArrayList<String>();
+ for (FSQueue queue: getQueues()) {
+ list.add(queue.getName());
+ }
+ Collections.sort(list);
+ return list;
+ }
+
+ public int getUserMaxApps(String user) {
+ if (userMaxApps.containsKey(user)) {
+ return userMaxApps.get(user);
+ } else {
+ return userMaxAppsDefault;
+ }
+ }
+
+ public int getQueueMaxApps(String queue) {
+ if (queueMaxApps.containsKey(queue)) {
+ return queueMaxApps.get(queue);
+ } else {
+ return queueMaxAppsDefault;
+ }
+ }
+
+ public double getQueueWeight(String queue) {
+ if (queueWeights.containsKey(queue)) {
+ return queueWeights.get(queue);
+ } else {
+ return 1.0;
+ }
+ }
+
+ /**
+ * Get a queue's min share preemption timeout, in milliseconds. This is the
+ * time after which jobs in the queue may kill other queues' tasks if they
+ * are below their min share.
+ */
+ public long getMinSharePreemptionTimeout(String queueName) {
+ if (minSharePreemptionTimeouts.containsKey(queueName)) {
+ return minSharePreemptionTimeouts.get(queueName);
+ }
+ return defaultMinSharePreemptionTimeout;
+ }
+
+ /**
+ * Get the fair share preemption, in milliseconds. This is the time
+ * after which any job may kill other jobs' tasks if it is below half
+ * its fair share.
+ */
+ public long getFairSharePreemptionTimeout() {
+ return fairSharePreemptionTimeout;
+ }
+
+ /**
+  * Get the ACLs associated with this queue. If a given ACL is not explicitly
+  * configured, include the default value for that ACL.
+  * @param queue name of the queue to look up
+  * @return a new map holding the queue's configured ACLs, with "*" filled in
+  *         for ADMINISTER_QUEUE and SUBMIT_APPLICATIONS when they are absent.
+  */
+ public Map<QueueACL, AccessControlList> getQueueAcls(String queue) {
+   Map<QueueACL, AccessControlList> out = new HashMap<QueueACL, AccessControlList>();
+
+   // Look the queue up once. An allocation reload assigns a new map to this
+   // field, so a containsKey()/get() pair could consult two different maps
+   // and pass null to putAll().
+   Map<QueueACL, AccessControlList> configured = queueAcls.get(queue);
+   if (configured != null) {
+     out.putAll(configured);
+   }
+   if (!out.containsKey(QueueACL.ADMINISTER_QUEUE)) {
+     out.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList("*"));
+   }
+   if (!out.containsKey(QueueACL.SUBMIT_APPLICATIONS)) {
+     out.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList("*"));
+   }
+   return out;
+ }
+}
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java?rev=1361020&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/Schedulable.java Fri Jul 13 00:43:01 2012
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
+
+/**
+ * A Schedulable represents an entity that can launch tasks, such as a job
+ * or a queue. It provides a common interface so that algorithms such as fair
+ * sharing can be applied both within a queue and across queues. There are
+ * currently two types of Schedulables: JobSchedulables, which represent a
+ * single job, and QueueSchedulables, which allocate among jobs in their queue.
+ *
+ * Separate sets of Schedulables are used for maps and reduces. Each queue has
+ * both a mapSchedulable and a reduceSchedulable, and so does each job.
+ *
+ * A Schedulable is responsible for three roles:
+ * 1) It can launch tasks through assignTask().
+ * 2) It provides information about the job/queue to the scheduler, including:
+ * - Demand (maximum number of tasks required)
+ * - Number of currently running tasks
+ * - Minimum share (for queues)
+ * - Job/queue weight (for fair sharing)
+ * - Start time and priority (for FIFO)
+ * 3) It can be assigned a fair share, for use with fair scheduling.
+ *
+ * Schedulable also contains two methods for performing scheduling computations:
+ * - updateDemand() is called periodically to compute the demand of the various
+ * jobs and queues, which may be expensive (e.g. jobs must iterate through all
+ * their tasks to count failed tasks, tasks that can be speculated, etc).
+ * - redistributeShare() is called after demands are updated and a Schedulable's
+ * fair share has been set by its parent to let it distribute its share among
+ * the other Schedulables within it (e.g. for queues that want to perform fair
+ * sharing among their jobs).
+ */
+@Private
+@Unstable
+abstract class Schedulable {
+ /** Fair share assigned to this Schedulable */
+ private Resource fairShare = Resources.createResource(0);
+
+ /**
+ * Name of job/queue, used for debugging as well as for breaking ties in
+ * scheduling order deterministically.
+ */
+ public abstract String getName();
+
+ /**
+ * Maximum number of resources required by this Schedulable. This is defined as
+ * number of currently utilized resources + number of unlaunched resources (that
+ * are either not yet launched or need to be speculated).
+ */
+ public abstract Resource getDemand();
+
+ /** Get the aggregate amount of resources consumed by the schedulable. */
+ public abstract Resource getResourceUsage();
+
+ /** Minimum Resource share assigned to the schedulable. */
+ public abstract Resource getMinShare();
+
+
+ /** Job/queue weight in fair sharing. */
+ public abstract double getWeight();
+
+ /** Start time for jobs in FIFO queues; meaningless for QueueSchedulables.*/
+ public abstract long getStartTime();
+
+ /** Job priority for jobs in FIFO queues; meaningless for QueueSchedulables. */
+ public abstract Priority getPriority();
+
+ /** Refresh the Schedulable's demand and those of its children if any. */
+ public abstract void updateDemand();
+
+ /**
+ * Distribute the fair share assigned to this Schedulable among its
+ * children (used in queues where the internal scheduler is fair sharing).
+ */
+ public abstract void redistributeShare();
+
+ /**
+ * Assign a container on this node if possible, and return the amount of
+ * resources assigned. If {@code reserved} is true, it means a reservation
+ * already exists on this node, and the schedulable should fulfill that
+ * reservation if possible.
+ */
+ public abstract Resource assignContainer(SchedulerNode node, boolean reserved);
+
+ /** Assign a fair share to this Schedulable. */
+ public void setFairShare(Resource fairShare) {
+ this.fairShare = fairShare;
+ }
+
+ /** Get the fair share assigned to this Schedulable. */
+ public Resource getFairShare() {
+ return fairShare;
+ }
+
+ /** Convenient toString implementation for debugging. */
+ @Override
+ public String toString() {
+ return String.format("[%s, demand=%s, running=%s, share=%s,], w=%.1f]",
+ getName(), getDemand(), getResourceUsage(), fairShare, getWeight());
+ }
+}
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingAlgorithms.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingAlgorithms.java?rev=1361020&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingAlgorithms.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingAlgorithms.java Fri Jul 13 00:43:01 2012
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Comparator;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
+
+/**
+ * Utility class containing scheduling algorithms used in the fair scheduler.
+ */
+@Private
+@Unstable
+class SchedulingAlgorithms {
+ public static final Log LOG = LogFactory.getLog(
+ SchedulingAlgorithms.class.getName());
+
+ /**
+ * Compare Schedulables in order of priority and then submission time, as in
+ * the default FIFO scheduler in Hadoop.
+ */
+ public static class FifoComparator implements Comparator<Schedulable>, Serializable {
+ private static final long serialVersionUID = -5905036205491177060L;
+
+ @Override
+ public int compare(Schedulable s1, Schedulable s2) {
+ int res = s1.getPriority().compareTo(s2.getPriority());
+ if (res == 0) {
+ res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
+ }
+ if (res == 0) {
+ // In the rare case where jobs were submitted at the exact same time,
+ // compare them by name (which will be the JobID) to get a deterministic
+ // ordering, so we don't alternately launch tasks from different jobs.
+ res = s1.getName().compareTo(s2.getName());
+ }
+ return res;
+ }
+ }
+
+ /**
+ * Compare Schedulables via weighted fair sharing. In addition, Schedulables
+ * below their min share get priority over those whose min share is met.
+ *
+ * Schedulables below their min share are compared by how far below it they
+ * are as a ratio. For example, if job A has 8 out of a min share of 10 tasks
+ * and job B has 50 out of a min share of 100, then job B is scheduled next,
+ * because B is at 50% of its min share and A is at 80% of its min share.
+ *
+ * Schedulables above their min share are compared by (runningTasks / weight).
+ * If all weights are equal, slots are given to the job with the fewest tasks;
+ * otherwise, jobs with more weight get proportionally more slots.
+ */
+ public static class FairShareComparator implements Comparator<Schedulable>, Serializable {
+ private static final long serialVersionUID = 5564969375856699313L;
+
+ @Override
+ public int compare(Schedulable s1, Schedulable s2) {
+ double minShareRatio1, minShareRatio2;
+ double useToWeightRatio1, useToWeightRatio2;
+ Resource minShare1 = Resources.min(s1.getMinShare(), s1.getDemand());
+ Resource minShare2 = Resources.min(s2.getMinShare(), s2.getDemand());
+ boolean s1Needy = Resources.lessThan(s1.getResourceUsage(), minShare1);
+ boolean s2Needy = Resources.lessThan(s2.getResourceUsage(), minShare2);
+ Resource one = Resources.createResource(1);
+ minShareRatio1 = (double) s1.getResourceUsage().getMemory() /
+ Resources.max(minShare1, one).getMemory();
+ minShareRatio2 = (double) s2.getResourceUsage().getMemory() /
+ Resources.max(minShare2, one).getMemory();
+ useToWeightRatio1 = s1.getResourceUsage().getMemory() / s1.getWeight();
+ useToWeightRatio2 = s2.getResourceUsage().getMemory() / s2.getWeight();
+ int res = 0;
+ if (s1Needy && !s2Needy)
+ res = -1;
+ else if (s2Needy && !s1Needy)
+ res = 1;
+ else if (s1Needy && s2Needy)
+ res = (int) Math.signum(minShareRatio1 - minShareRatio2);
+ else // Neither schedulable is needy
+ res = (int) Math.signum(useToWeightRatio1 - useToWeightRatio2);
+ if (res == 0) {
+ // Apps are tied in fairness ratio. Break the tie by submit time and job
+ // name to get a deterministic ordering, which is useful for unit tests.
+ res = (int) Math.signum(s1.getStartTime() - s2.getStartTime());
+ if (res == 0)
+ res = s1.getName().compareTo(s2.getName());
+ }
+ return res;
+ }
+ }
+
+ /**
+ * Number of iterations for the binary search in computeFairShares. This is
+ * equivalent to the number of bits of precision in the output. 25 iterations
+ * gives precision better than 0.1 slots in clusters with one million slots.
+ */
+ private static final int COMPUTE_FAIR_SHARES_ITERATIONS = 25;
+
+ /**
+ * Given a set of Schedulables and a number of slots, compute their weighted
+ * fair shares. The min shares and demands of the Schedulables are assumed to
+ * be set beforehand. We compute the fairest possible allocation of shares
+ * to the Schedulables that respects their min shares and demands.
+ *
+ * To understand what this method does, we must first define what weighted
+ * fair sharing means in the presence of minimum shares and demands. If there
+ * were no minimum shares and every Schedulable had an infinite demand (i.e.
+ * could launch infinitely many tasks), then weighted fair sharing would be
+ * achieved if the ratio of slotsAssigned / weight was equal for each
+ * Schedulable and all slots were assigned. Minimum shares and demands add
+ * two further twists:
+ * - Some Schedulables may not have enough tasks to fill all their share.
+ * - Some Schedulables may have a min share higher than their assigned share.
+ *
+ * To deal with these possibilities, we define an assignment of slots as
+ * being fair if there exists a ratio R such that:
+ * - Schedulables S where S.demand < R * S.weight are assigned share S.demand
+ * - Schedulables S where S.minShare > R * S.weight are given share S.minShare
+ * - All other Schedulables S are assigned share R * S.weight
+ * - The sum of all the shares is totalSlots.
+ *
+ * We call R the weight-to-slots ratio because it converts a Schedulable's
+ * weight to the number of slots it is assigned.
+ *
+ * We compute a fair allocation by finding a suitable weight-to-slot ratio R.
+ * To do this, we use binary search. Given a ratio R, we compute the number
+ * of slots that would be used in total with this ratio (the sum of the shares
+ * computed using the conditions above). If this number of slots is less than
+ * totalSlots, then R is too small and more slots could be assigned. If the
+ * number of slots is more than totalSlots, then R is too large.
+ *
+ * We begin the binary search with a lower bound on R of 0 (which means that
+ * all Schedulables are only given their minShare) and an upper bound computed
+ * to be large enough that too many slots are given (by doubling R until we
+ * either use more than totalSlots slots or we fulfill all jobs' demands).
+ * The helper method slotsUsedWithWeightToSlotRatio computes the total number
+ * of slots used with a given value of R.
+ *
+ * The running time of this algorithm is linear in the number of Schedulables,
+ * because slotsUsedWithWeightToSlotRatio is linear-time and the number of
+ * iterations of binary search is a constant (dependent on desired precision).
+ */
+ public static void computeFairShares(
+ Collection<? extends Schedulable> schedulables, Resource totalResources) {
+ // Find an upper bound on R that we can use in our binary search. We start
+ // at R = 1 and double it until we have either used totalSlots slots or we
+ // have met all Schedulables' demands (if total demand < totalSlots).
+ Resource totalDemand = Resources.createResource(0);
+ for (Schedulable sched: schedulables) {
+ Resources.addTo(totalDemand, sched.getDemand());
+ }
+ Resource cap = Resources.min(totalDemand, totalResources);
+ double rMax = 1.0;
+ while (Resources.lessThan(resUsedWithWeightToResRatio(rMax, schedulables), cap)) {
+ rMax *= 2.0;
+ }
+ // Perform the binary search for up to COMPUTE_FAIR_SHARES_ITERATIONS steps
+ double left = 0;
+ double right = rMax;
+ for (int i = 0; i < COMPUTE_FAIR_SHARES_ITERATIONS; i++) {
+ double mid = (left + right) / 2.0;
+ if (Resources.lessThan(resUsedWithWeightToResRatio(mid, schedulables), cap)) {
+ left = mid;
+ } else {
+ right = mid;
+ }
+ }
+ // Set the fair shares based on the value of R we've converged to
+ for (Schedulable sched: schedulables) {
+ sched.setFairShare(computeShare(sched, right));
+ }
+ }
+
+ /**
+ * Compute the number of slots that would be used given a weight-to-slot
+ * ratio w2sRatio, for use in the computeFairShares algorithm as described
+ * in #{@link SchedulingAlgorithms#computeFairShares(Collection, double)}.
+ */
+ private static Resource resUsedWithWeightToResRatio(double w2sRatio,
+ Collection<? extends Schedulable> schedulables) {
+ Resource slotsTaken = Resources.createResource(0);
+ for (Schedulable sched: schedulables) {
+ Resource share = computeShare(sched, w2sRatio);
+ Resources.addTo(slotsTaken, share);
+ }
+ return slotsTaken;
+ }
+
+ /**
+ * Compute the resources assigned to a Schedulable given a particular
+ * res-to-slot ratio r2sRatio, for use in computeFairShares as described
+ * in #{@link SchedulingAlgorithms#computeFairShares(Collection, double)}.
+ */
+ private static Resource computeShare(Schedulable sched, double r2sRatio) {
+ double share = sched.getWeight() * r2sRatio;
+ share = Math.max(share, sched.getMinShare().getMemory());
+ share = Math.min(share, sched.getDemand().getMemory());
+ return Resources.createResource((int) share);
+ }
+}
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java?rev=1361020&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingMode.java Fri Jul 13 00:43:01 2012
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+
+/**
+ * Internal scheduling modes for queues.
+ */
+@Private
+@Unstable
+public enum SchedulingMode {
+ FAIR, FIFO
+}
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java?rev=1361020&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/WeightAdjuster.java Fri Jul 13 00:43:01 2012
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configurable;
+
+/**
+ * A pluggable object for altering the weights of apps in the fair scheduler,
+ * which is used for example by {@link NewJobWeightBooster} to give higher
+ * weight to new jobs so that short jobs finish faster.
+ *
+ * May implement {@link Configurable} to access configuration parameters.
+ */
+@Private
+@Unstable
+public interface WeightAdjuster {
+ public double adjustWeight(AppSchedulable app, double curWeight);
+}
Modified: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java?rev=1361020&r1=1361019&r2=1361020&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java (original)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RmController.java Fri Jul 13 00:43:01 2012
@@ -18,14 +18,17 @@
package org.apache.hadoop.yarn.server.resourcemanager.webapp;
-import static org.apache.hadoop.yarn.server.resourcemanager.webapp.RMWebApp.QUEUE_NAME;
import static org.apache.hadoop.yarn.util.StringHelper.join;
+import static org.apache.hadoop.yarn.webapp.YarnWebParams.QUEUE_NAME;
+
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.util.StringHelper;
import org.apache.hadoop.yarn.webapp.Controller;
+import org.apache.hadoop.yarn.webapp.WebAppException;
import org.apache.hadoop.yarn.webapp.YarnWebParams;
import com.google.inject.Inject;
@@ -71,6 +74,12 @@ public class RmController extends Contro
render(CapacitySchedulerPage.class);
return;
}
+
+ if (rs instanceof FairScheduler) {
+ context().setStatus(404);
+ throw new WebAppException("Fair Scheduler UI not yet supported");
+ }
+
setTitle("Default Scheduler");
render(DefaultSchedulerPage.class);
}
Added: hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java?rev=1361020&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java (added)
+++ hadoop/common/trunk/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSSchedulerApp.java Fri Jul 13 00:43:01 2012
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import static org.junit.Assert.assertEquals;
+
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.factories.RecordFactory;
+import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSSchedulerApp;
+import org.junit.Test;
+import org.mockito.Mockito;
+/** Exercises the allowed-locality-level (delay scheduling) checks of FSSchedulerApp. */
+public class TestFSSchedulerApp {
+ private static RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null);
+ /** Builds an ApplicationAttemptId record from the given application id and attempt id. */
+ private ApplicationAttemptId createAppAttemptId(int appId, int attemptId) {
+ ApplicationAttemptId attId = recordFactory.newRecordInstance(ApplicationAttemptId.class);
+ ApplicationId appIdImpl = recordFactory.newRecordInstance(ApplicationId.class);
+ appIdImpl.setId(appId);
+ attId.setAttemptId(attemptId);
+ attId.setApplicationId(appIdImpl);
+ return attId;
+ }
+ /** Checks that the allowed level relaxes from node-local to rack-local to off-switch. */
+ @Test
+ public void testDelayScheduling() {
+ Queue queue = Mockito.mock(Queue.class);
+ Priority prio = Mockito.mock(Priority.class);
+ Mockito.when(prio.getPriority()).thenReturn(1);
+ double nodeLocalityThreshold = .5;
+ double rackLocalityThreshold = .6;
+ // NOTE(review): the literal 10 passed below is presumably the node count the thresholds scale with - confirm
+ ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
+ FSSchedulerApp schedulerApp =
+ new FSSchedulerApp(applicationAttemptId, "user1", queue , null, null, null);
+
+ // Default level should be node-local
+ assertEquals(NodeType.NODE_LOCAL, schedulerApp.getAllowedLocalityLevel(
+ prio, 10, nodeLocalityThreshold, rackLocalityThreshold));
+
+ // First five scheduling opportunities should remain node local
+ for (int i = 0; i < 5; i++) {
+ schedulerApp.addSchedulingOpportunity(prio);
+ assertEquals(NodeType.NODE_LOCAL, schedulerApp.getAllowedLocalityLevel(
+ prio, 10, nodeLocalityThreshold, rackLocalityThreshold));
+ }
+
+ // The sixth opportunity should switch the allowed level to rack local
+ schedulerApp.addSchedulingOpportunity(prio);
+ assertEquals(NodeType.RACK_LOCAL, schedulerApp.getAllowedLocalityLevel(
+ prio, 10, nodeLocalityThreshold, rackLocalityThreshold));
+
+ // Manually set back to node local and reset the scheduling opportunities
+ schedulerApp.resetAllowedLocalityLevel(prio, NodeType.NODE_LOCAL);
+ schedulerApp.resetSchedulingOpportunities(prio);
+ assertEquals(NodeType.NODE_LOCAL, schedulerApp.getAllowedLocalityLevel(
+ prio, 10, nodeLocalityThreshold, rackLocalityThreshold));
+
+ // Now escalate again to rack-local, then to off-switch
+ for (int i = 0; i < 5; i++) {
+ schedulerApp.addSchedulingOpportunity(prio);
+ assertEquals(NodeType.NODE_LOCAL, schedulerApp.getAllowedLocalityLevel(
+ prio, 10, nodeLocalityThreshold, rackLocalityThreshold));
+ }
+ // One more opportunity (the sixth since the reset) should allow rack local
+ schedulerApp.addSchedulingOpportunity(prio);
+ assertEquals(NodeType.RACK_LOCAL, schedulerApp.getAllowedLocalityLevel(
+ prio, 10, nodeLocalityThreshold, rackLocalityThreshold));
+ // The next six opportunities should stay rack local
+ for (int i = 0; i < 6; i++) {
+ schedulerApp.addSchedulingOpportunity(prio);
+ assertEquals(NodeType.RACK_LOCAL, schedulerApp.getAllowedLocalityLevel(
+ prio, 10, nodeLocalityThreshold, rackLocalityThreshold));
+ }
+ // A further opportunity should finally allow off-switch
+ schedulerApp.addSchedulingOpportunity(prio);
+ assertEquals(NodeType.OFF_SWITCH, schedulerApp.getAllowedLocalityLevel(
+ prio, 10, nodeLocalityThreshold, rackLocalityThreshold));
+ }
+
+ @Test
+ /**
+ * Ensure that when negative locality thresholds are given (signaling delay scheduling
+ * not in use), the least restrictive locality level (OFF_SWITCH) is returned.
+ */
+ public void testLocalityLevelWithoutDelays() {
+ Queue queue = Mockito.mock(Queue.class);
+ Priority prio = Mockito.mock(Priority.class);
+ Mockito.when(prio.getPriority()).thenReturn(1);
+
+ ApplicationAttemptId applicationAttemptId = createAppAttemptId(1, 1);
+ FSSchedulerApp schedulerApp =
+ new FSSchedulerApp(applicationAttemptId, "user1", queue , null, null, null);
+ assertEquals(NodeType.OFF_SWITCH, schedulerApp.getAllowedLocalityLevel(
+ prio, 10, -1.0, -1.0));
+ }
+}