You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by sa...@apache.org on 2013/12/05 04:26:12 UTC
svn commit: r1548006 [1/2] - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/
hadoop-yarn/hadoop-yarn-server/ha...
Author: sandy
Date: Thu Dec 5 03:26:11 2013
New Revision: 1548006
URL: http://svn.apache.org/r1548006
Log:
YARN-1403. Separate out configuration loading from QueueManager in the Fair Scheduler (Sandy Ryza)
Added:
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SchedulingPolicy.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/FairSchedulerQueueInfo.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFSLeafQueue.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairSchedulerConfiguration.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestMaxRunningAppsEnforcer.java
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1548006&r1=1548005&r2=1548006&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Thu Dec 5 03:26:11 2013
@@ -135,6 +135,9 @@ Release 2.4.0 - UNRELEASED
YARN-1332. In TestAMRMClient, replace assertTrue with assertEquals where
possible (Sebastian Wong via Sandy Ryza)
+ YARN-1403. Separate out configuration loading from QueueManager in the Fair
+ Scheduler (Sandy Ryza)
+
OPTIMIZATIONS
BUG FIXES
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java?rev=1548006&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java Thu Dec 5 03:26:11 2013
@@ -0,0 +1,229 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Allocation settings for the Fair Scheduler: per-queue minimum/maximum
+ * resources, weights, running-app limits, ACLs, preemption timeouts and
+ * scheduling policies, plus the queue placement policy. Every per-queue
+ * getter falls back to a default value when the queue has no explicit entry.
+ *
+ * NOTE(review): the maps and set passed to the full constructor are stored by
+ * reference (not copied) and getQueueNames() returns that same set; callers
+ * presumably treat them as read-only after construction.
+ */
+public class AllocationConfiguration {
+  // Default ACLs used when a queue has no explicit ACL for an operation:
+  // everybody ("*") for the root queue, nobody (" ") for every other queue.
+  private static final AccessControlList EVERYBODY_ACL = new AccessControlList("*");
+  private static final AccessControlList NOBODY_ACL = new AccessControlList(" ");
+
+  // Minimum resource allocation for each queue
+  private final Map<String, Resource> minQueueResources;
+  // Maximum amount of resources per queue
+  private final Map<String, Resource> maxQueueResources;
+  // Sharing weights for each queue
+  private final Map<String, ResourceWeights> queueWeights;
+
+  // Max concurrent running applications for each queue and for each user.
+  // Queues/users with no explicit max fall back to queueMaxAppsDefault /
+  // userMaxAppsDefault respectively.
+  @VisibleForTesting
+  final Map<String, Integer> queueMaxApps;
+  @VisibleForTesting
+  final Map<String, Integer> userMaxApps;
+  private final int userMaxAppsDefault;
+  private final int queueMaxAppsDefault;
+
+  // ACLs for each queue. Only holds ACLs that were explicitly configured;
+  // missing entries resolve to the defaults above.
+  private final Map<String, Map<QueueACL, AccessControlList>> queueAcls;
+
+  // Min share preemption timeout for each queue, in milliseconds
+  // (AllocationFileLoaderService converts the seconds given in the allocation
+  // file). If a job in the queue waits this long without receiving its
+  // guaranteed share, it is allowed to preempt other jobs' tasks.
+  private final Map<String, Long> minSharePreemptionTimeouts;
+
+  // Default min share preemption timeout, in milliseconds, for queues where it
+  // is not set explicitly.
+  private final long defaultMinSharePreemptionTimeout;
+
+  // Preemption timeout for jobs below fair share, in milliseconds. If a job
+  // remains below half its fair share for this long, it is allowed to preempt
+  // tasks.
+  private final long fairSharePreemptionTimeout;
+
+  // Explicitly configured scheduling policy per queue.
+  private final Map<String, SchedulingPolicy> schedulingPolicies;
+
+  // Policy used for queues with no entry in schedulingPolicies.
+  private final SchedulingPolicy defaultSchedulingPolicy;
+
+  // Policy for mapping apps to queues
+  @VisibleForTesting
+  QueuePlacementPolicy placementPolicy;
+
+  // Names of the queues configured in the allocation file.
+  private final Set<String> queueNames;
+
+  /**
+   * Creates a configuration from already-parsed allocation settings. Timeouts
+   * are expected in milliseconds.
+   */
+  public AllocationConfiguration(Map<String, Resource> minQueueResources,
+      Map<String, Resource> maxQueueResources,
+      Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
+      Map<String, ResourceWeights> queueWeights, int userMaxAppsDefault,
+      int queueMaxAppsDefault, Map<String, SchedulingPolicy> schedulingPolicies,
+      SchedulingPolicy defaultSchedulingPolicy,
+      Map<String, Long> minSharePreemptionTimeouts,
+      Map<String, Map<QueueACL, AccessControlList>> queueAcls,
+      long fairSharePreemptionTimeout, long defaultMinSharePreemptionTimeout,
+      QueuePlacementPolicy placementPolicy, Set<String> queueNames) {
+    this.minQueueResources = minQueueResources;
+    this.maxQueueResources = maxQueueResources;
+    this.queueMaxApps = queueMaxApps;
+    this.userMaxApps = userMaxApps;
+    this.queueWeights = queueWeights;
+    this.userMaxAppsDefault = userMaxAppsDefault;
+    this.queueMaxAppsDefault = queueMaxAppsDefault;
+    this.defaultSchedulingPolicy = defaultSchedulingPolicy;
+    this.schedulingPolicies = schedulingPolicies;
+    this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
+    this.queueAcls = queueAcls;
+    this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
+    this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout;
+    this.placementPolicy = placementPolicy;
+    this.queueNames = queueNames;
+  }
+
+  /**
+   * Creates an "empty" configuration: no per-queue settings, unlimited
+   * running-app defaults, preemption timeouts of Long.MAX_VALUE, the default
+   * scheduling policy, and a placement policy built from {@code conf} with no
+   * configured queues.
+   */
+  public AllocationConfiguration(Configuration conf) {
+    minQueueResources = new HashMap<String, Resource>();
+    maxQueueResources = new HashMap<String, Resource>();
+    queueWeights = new HashMap<String, ResourceWeights>();
+    queueMaxApps = new HashMap<String, Integer>();
+    userMaxApps = new HashMap<String, Integer>();
+    userMaxAppsDefault = Integer.MAX_VALUE;
+    queueMaxAppsDefault = Integer.MAX_VALUE;
+    queueAcls = new HashMap<String, Map<QueueACL, AccessControlList>>();
+    minSharePreemptionTimeouts = new HashMap<String, Long>();
+    defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
+    fairSharePreemptionTimeout = Long.MAX_VALUE;
+    schedulingPolicies = new HashMap<String, SchedulingPolicy>();
+    defaultSchedulingPolicy = SchedulingPolicy.DEFAULT_POLICY;
+    placementPolicy = QueuePlacementPolicy.fromConfiguration(conf,
+        new HashSet<String>());
+    queueNames = new HashSet<String>();
+  }
+
+  /**
+   * Get the ACL associated with this queue for the given operation. If the
+   * ACL is not explicitly configured, return the default for that queue:
+   * everybody ("*") for the root queue and nobody (" ") for all other queues.
+   */
+  public AccessControlList getQueueAcl(String queue, QueueACL operation) {
+    Map<QueueACL, AccessControlList> queueAcls = this.queueAcls.get(queue);
+    if (queueAcls != null) {
+      AccessControlList operationAcl = queueAcls.get(operation);
+      if (operationAcl != null) {
+        return operationAcl;
+      }
+    }
+    return (queue.equals("root")) ? EVERYBODY_ACL : NOBODY_ACL;
+  }
+
+  /**
+   * Get a queue's min share preemption timeout, in milliseconds. This is the
+   * time after which jobs in the queue may kill other queues' tasks if they
+   * are below their min share. Falls back to the default timeout when the
+   * queue has no explicit value.
+   */
+  public long getMinSharePreemptionTimeout(String queueName) {
+    Long minSharePreemptionTimeout = minSharePreemptionTimeouts.get(queueName);
+    return (minSharePreemptionTimeout == null) ? defaultMinSharePreemptionTimeout
+        : minSharePreemptionTimeout;
+  }
+
+  /**
+   * Get the fair share preemption timeout, in milliseconds. This is the time
+   * after which any job may kill other jobs' tasks if it is below half its
+   * fair share.
+   */
+  public long getFairSharePreemptionTimeout() {
+    return fairSharePreemptionTimeout;
+  }
+
+  /**
+   * Get the queue's sharing weight, or ResourceWeights.NEUTRAL if not set.
+   */
+  public ResourceWeights getQueueWeight(String queue) {
+    ResourceWeights weight = queueWeights.get(queue);
+    return (weight == null) ? ResourceWeights.NEUTRAL : weight;
+  }
+
+  /**
+   * Get the max number of concurrently running apps for the user, or the
+   * user default if the user has no explicit limit.
+   */
+  public int getUserMaxApps(String user) {
+    Integer maxApps = userMaxApps.get(user);
+    return (maxApps == null) ? userMaxAppsDefault : maxApps;
+  }
+
+  /**
+   * Get the max number of concurrently running apps for the queue, or the
+   * queue default if the queue has no explicit limit.
+   */
+  public int getQueueMaxApps(String queue) {
+    Integer maxApps = queueMaxApps.get(queue);
+    return (maxApps == null) ? queueMaxAppsDefault : maxApps;
+  }
+
+  /**
+   * Get the minimum resource allocation for the given queue.
+   * @return the minimum configured for this queue, or Resources.none() if not
+   *     set.
+   */
+  public Resource getMinResources(String queue) {
+    Resource minQueueResource = minQueueResources.get(queue);
+    return (minQueueResource == null) ? Resources.none() : minQueueResource;
+  }
+
+  /**
+   * Get the maximum resource allocation for the given queue.
+   * @return the cap set on this queue, or Resources.unbounded() if not set.
+   */
+  public Resource getMaxResources(String queueName) {
+    Resource maxQueueResource = maxQueueResources.get(queueName);
+    return (maxQueueResource == null) ? Resources.unbounded() : maxQueueResource;
+  }
+
+  /**
+   * Check whether the user is granted {@code acl} on the queue or on any of
+   * its ancestors. The queue name is walked from the full name up through each
+   * dot-separated prefix (e.g. "root.a.b", "root.a", "root"), returning true
+   * at the first ACL that admits the user.
+   */
+  public boolean hasAccess(String queueName, QueueACL acl,
+      UserGroupInformation user) {
+    int lastPeriodIndex = queueName.length();
+    while (lastPeriodIndex != -1) {
+      String queue = queueName.substring(0, lastPeriodIndex);
+      if (getQueueAcl(queue, acl).isUserAllowed(user)) {
+        return true;
+      }
+      // Step up to the parent queue's name.
+      lastPeriodIndex = queueName.lastIndexOf('.', lastPeriodIndex - 1);
+    }
+
+    return false;
+  }
+
+  /**
+   * Get the queue's scheduling policy, or the default policy if not set.
+   */
+  public SchedulingPolicy getSchedulingPolicy(String queueName) {
+    SchedulingPolicy policy = schedulingPolicies.get(queueName);
+    return (policy == null) ? defaultSchedulingPolicy : policy;
+  }
+
+  public SchedulingPolicy getDefaultSchedulingPolicy() {
+    return defaultSchedulingPolicy;
+  }
+
+  /** Names of the queues configured in the allocation file. */
+  public Set<String> getQueueNames() {
+    return queueNames;
+  }
+
+  public QueuePlacementPolicy getPlacementPolicy() {
+    return placementPolicy;
+  }
+}
\ No newline at end of file
Added: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java?rev=1548006&view=auto
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java (added)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java Thu Dec 5 03:26:11 2013
@@ -0,0 +1,398 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.URL;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import javax.xml.parsers.DocumentBuilder;
+import javax.xml.parsers.DocumentBuilderFactory;
+import javax.xml.parsers.ParserConfigurationException;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.authorize.AccessControlList;
+import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.records.QueueACL;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
+import org.apache.hadoop.yarn.util.Clock;
+import org.apache.hadoop.yarn.util.SystemClock;
+import org.apache.hadoop.yarn.util.resource.Resources;
+import org.w3c.dom.Document;
+import org.w3c.dom.Element;
+import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+import org.w3c.dom.Text;
+import org.xml.sax.SAXException;
+
+import com.google.common.annotations.VisibleForTesting;
+
+/**
+ * Service that loads the Fair Scheduler allocation file and, once started,
+ * polls it from a daemon thread, reloading it after it changes and handing
+ * each successfully parsed AllocationConfiguration to the registered
+ * Listener.
+ */
+@Public
+@Unstable
+public class AllocationFileLoaderService extends AbstractService {
+
+  public static final Log LOG = LogFactory.getLog(
+      AllocationFileLoaderService.class.getName());
+
+  /** Time to wait between checks of the allocation file */
+  public static final long ALLOC_RELOAD_INTERVAL_MS = 10 * 1000;
+
+  /**
+   * Time to wait after the allocation has been modified before reloading it
+   * (this is done to prevent loading a file that hasn't been fully written).
+   */
+  public static final long ALLOC_RELOAD_WAIT_MS = 5 * 1000;
+
+  private final Clock clock;
+
+  private long lastSuccessfulReload; // Last time we successfully reloaded queues
+  private boolean lastReloadAttemptFailed = false;
+
+  // Path to XML file containing allocations; null if no file was found.
+  private File allocFile;
+
+  private Listener reloadListener;
+
+  @VisibleForTesting
+  long reloadIntervalMs = ALLOC_RELOAD_INTERVAL_MS;
+
+  // Only created by start() when an allocation file exists.
+  private Thread reloadThread;
+  private volatile boolean running = true;
+
+  public AllocationFileLoaderService() {
+    this(new SystemClock());
+  }
+
+  public AllocationFileLoaderService(Clock clock) {
+    super(AllocationFileLoaderService.class.getName());
+    this.clock = clock;
+  }
+
+  @Override
+  public void init(Configuration conf) {
+    this.allocFile = getAllocationFile(conf);
+    super.init(conf);
+  }
+
+  /**
+   * Starts the daemon reload thread (only if an allocation file was found)
+   * and moves the service to the started state.
+   */
+  @Override
+  public void start() {
+    if (allocFile != null) {
+      reloadThread = new Thread() {
+        @Override
+        public void run() {
+          while (running) {
+            long time = clock.getTime();
+            long lastModified = allocFile.lastModified();
+            // Reload only if the file changed since the last successful load
+            // and has been stable for ALLOC_RELOAD_WAIT_MS.
+            if (lastModified > lastSuccessfulReload &&
+                time > lastModified + ALLOC_RELOAD_WAIT_MS) {
+              try {
+                reloadAllocations();
+              } catch (Exception ex) {
+                // Log only the first of consecutive failures so a persistently
+                // broken file does not flood the log.
+                if (!lastReloadAttemptFailed) {
+                  LOG.error("Failed to reload fair scheduler config file - " +
+                      "will use existing allocations.", ex);
+                }
+                lastReloadAttemptFailed = true;
+              }
+            } else if (lastModified == 0L) {
+              // NOTE(review): a 0 modification time presumably means the file
+              // was removed or cannot be read; hence the exists() in the log.
+              if (!lastReloadAttemptFailed) {
+                LOG.warn("Failed to reload fair scheduler config file because" +
+                    " last modified returned 0. File exists: " + allocFile.exists());
+              }
+              lastReloadAttemptFailed = true;
+            }
+            try {
+              Thread.sleep(reloadIntervalMs);
+            } catch (InterruptedException ex) {
+              // stop() interrupts the sleep; the loop then exits because
+              // running has been cleared.
+              LOG.info("Interrupted while waiting to reload alloc configuration");
+            }
+          }
+        }
+      };
+      reloadThread.setName("AllocationFileReloader");
+      reloadThread.setDaemon(true);
+      reloadThread.start();
+    }
+    super.start();
+  }
+
+  /**
+   * Signals the reload thread (if one was started) to exit and stops the
+   * service.
+   */
+  @Override
+  public void stop() {
+    running = false;
+    if (reloadThread != null) {
+      reloadThread.interrupt();
+    }
+    super.stop();
+  }
+
+  /**
+   * Path to XML file containing allocations. If the
+   * path is relative, it is searched for in the
+   * classpath, but loaded like a regular File.
+   *
+   * @return the allocation file, or null if a relative path is not found on
+   *     the classpath.
+   * @throws RuntimeException if the file found on the classpath is not on the
+   *     local filesystem.
+   */
+  public File getAllocationFile(Configuration conf) {
+    String allocFilePath = conf.get(FairSchedulerConfiguration.ALLOCATION_FILE,
+        FairSchedulerConfiguration.DEFAULT_ALLOCATION_FILE);
+    File allocFile = new File(allocFilePath);
+    if (!allocFile.isAbsolute()) {
+      URL url = Thread.currentThread().getContextClassLoader()
+          .getResource(allocFilePath);
+      if (url == null) {
+        LOG.warn(allocFilePath + " not found on the classpath.");
+        allocFile = null;
+      } else if (!url.getProtocol().equalsIgnoreCase("file")) {
+        throw new RuntimeException("Allocation file " + url
+            + " found on the classpath is not on the local filesystem.");
+      } else {
+        // NOTE(review): URL.getPath() is not percent-decoded, so a classpath
+        // location containing spaces may not resolve; confirm if relevant.
+        allocFile = new File(url.getPath());
+      }
+    }
+    return allocFile;
+  }
+
+  public synchronized void setReloadListener(Listener reloadListener) {
+    this.reloadListener = reloadListener;
+  }
+
+  /**
+   * Updates the allocation list from the allocation config file. This file is
+   * expected to be in the XML format specified in the design doc. The new
+   * settings are only published (to the reload listener) if the entire file
+   * parses successfully.
+   *
+   * @throws IOException if the config file cannot be read.
+   * @throws AllocationConfigurationException if allocations are invalid.
+   * @throws ParserConfigurationException if XML parser is misconfigured.
+   * @throws SAXException if config file is malformed.
+   */
+  public synchronized void reloadAllocations() throws IOException,
+      ParserConfigurationException, SAXException, AllocationConfigurationException {
+    if (allocFile == null) {
+      return;
+    }
+    LOG.info("Loading allocation file " + allocFile);
+    // Create some temporary hashmaps to hold the new allocs, and we only save
+    // them in our fields if we have parsed the entire allocs file successfully.
+    Map<String, Resource> minQueueResources = new HashMap<String, Resource>();
+    Map<String, Resource> maxQueueResources = new HashMap<String, Resource>();
+    Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
+    Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
+    Map<String, ResourceWeights> queueWeights = new HashMap<String, ResourceWeights>();
+    Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, SchedulingPolicy>();
+    Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
+    Map<String, Map<QueueACL, AccessControlList>> queueAcls =
+        new HashMap<String, Map<QueueACL, AccessControlList>>();
+    int userMaxAppsDefault = Integer.MAX_VALUE;
+    int queueMaxAppsDefault = Integer.MAX_VALUE;
+    long fairSharePreemptionTimeout = Long.MAX_VALUE;
+    long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
+    SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY;
+
+    QueuePlacementPolicy newPlacementPolicy = null;
+
+    // Remember all queue names so we can display them on web UI, etc.
+    Set<String> queueNamesInAllocFile = new HashSet<String>();
+
+    // Read and parse the allocations file.
+    DocumentBuilderFactory docBuilderFactory =
+        DocumentBuilderFactory.newInstance();
+    docBuilderFactory.setIgnoringComments(true);
+    DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
+    Document doc = builder.parse(allocFile);
+    Element root = doc.getDocumentElement();
+    if (!"allocations".equals(root.getTagName())) {
+      throw new AllocationConfigurationException("Bad fair scheduler config " +
+          "file: top-level element not <allocations>");
+    }
+    NodeList elements = root.getChildNodes();
+    List<Element> queueElements = new ArrayList<Element>();
+    Element placementPolicyElement = null;
+    for (int i = 0; i < elements.getLength(); i++) {
+      Node node = elements.item(i);
+      if (node instanceof Element) {
+        Element element = (Element)node;
+        if ("queue".equals(element.getTagName()) ||
+            "pool".equals(element.getTagName())) {
+          queueElements.add(element);
+        } else if ("user".equals(element.getTagName())) {
+          String userName = element.getAttribute("name");
+          NodeList fields = element.getChildNodes();
+          for (int j = 0; j < fields.getLength(); j++) {
+            Node fieldNode = fields.item(j);
+            if (!(fieldNode instanceof Element)) {
+              continue;
+            }
+            Element field = (Element) fieldNode;
+            if ("maxRunningApps".equals(field.getTagName())) {
+              String text = getText(field).trim();
+              int val = Integer.parseInt(text);
+              userMaxApps.put(userName, val);
+            }
+          }
+        } else if ("userMaxAppsDefault".equals(element.getTagName())) {
+          String text = getText(element).trim();
+          int val = Integer.parseInt(text);
+          userMaxAppsDefault = val;
+        } else if ("fairSharePreemptionTimeout".equals(element.getTagName())) {
+          // Configured in seconds; stored in milliseconds.
+          String text = getText(element).trim();
+          long val = Long.parseLong(text) * 1000L;
+          fairSharePreemptionTimeout = val;
+        } else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) {
+          // Configured in seconds; stored in milliseconds.
+          String text = getText(element).trim();
+          long val = Long.parseLong(text) * 1000L;
+          defaultMinSharePreemptionTimeout = val;
+        } else if ("queueMaxAppsDefault".equals(element.getTagName())) {
+          String text = getText(element).trim();
+          int val = Integer.parseInt(text);
+          queueMaxAppsDefault = val;
+        } else if ("defaultQueueSchedulingPolicy".equals(element.getTagName())
+            || "defaultQueueSchedulingMode".equals(element.getTagName())) {
+          String text = getText(element).trim();
+          defaultSchedPolicy = SchedulingPolicy.parse(text);
+        } else if ("queuePlacementPolicy".equals(element.getTagName())) {
+          placementPolicyElement = element;
+        } else {
+          LOG.warn("Bad element in allocations file: " + element.getTagName());
+        }
+      }
+    }
+
+    // Load queue elements. A root queue can either be included or omitted. If
+    // it's included, all other queues must be inside it.
+    for (Element element : queueElements) {
+      String parent = "root";
+      if (element.getAttribute("name").equalsIgnoreCase("root")) {
+        if (queueElements.size() > 1) {
+          throw new AllocationConfigurationException("If configuring root queue,"
+              + " no other queues can be placed alongside it.");
+        }
+        parent = null;
+      }
+      loadQueue(parent, element, minQueueResources, maxQueueResources, queueMaxApps,
+          userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts,
+          queueAcls, queueNamesInAllocFile);
+    }
+
+    // Load placement policy and pass it configured queues
+    Configuration conf = getConfig();
+    if (placementPolicyElement != null) {
+      newPlacementPolicy = QueuePlacementPolicy.fromXml(placementPolicyElement,
+          queueNamesInAllocFile, conf);
+    } else {
+      newPlacementPolicy = QueuePlacementPolicy.fromConfiguration(conf,
+          queueNamesInAllocFile);
+    }
+
+    AllocationConfiguration info = new AllocationConfiguration(minQueueResources, maxQueueResources,
+        queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault,
+        queueMaxAppsDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts,
+        queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout,
+        newPlacementPolicy, queueNamesInAllocFile);
+
+    lastSuccessfulReload = clock.getTime();
+    lastReloadAttemptFailed = false;
+
+    if (reloadListener != null) {
+      reloadListener.onReload(info);
+    } else {
+      LOG.warn("No reload listener set; discarding loaded allocations.");
+    }
+  }
+
+  /**
+   * Loads a queue from a queue element in the configuration file, recursing
+   * into nested queue/pool elements. Results are written into the supplied
+   * maps under the queue's fully qualified (dot-separated) name; only leaf
+   * queues are added to {@code queueNamesInAllocFile}.
+   *
+   * @param parentName fully qualified parent name, or null for the root queue.
+   * @throws AllocationConfigurationException if a field element has no text.
+   */
+  private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources,
+      Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps,
+      Map<String, Integer> userMaxApps, Map<String, ResourceWeights> queueWeights,
+      Map<String, SchedulingPolicy> queuePolicies,
+      Map<String, Long> minSharePreemptionTimeouts,
+      Map<String, Map<QueueACL, AccessControlList>> queueAcls, Set<String> queueNamesInAllocFile)
+      throws AllocationConfigurationException {
+    String queueName = element.getAttribute("name");
+    if (parentName != null) {
+      queueName = parentName + "." + queueName;
+    }
+    Map<QueueACL, AccessControlList> acls =
+        new HashMap<QueueACL, AccessControlList>();
+    NodeList fields = element.getChildNodes();
+    boolean isLeaf = true;
+
+    for (int j = 0; j < fields.getLength(); j++) {
+      Node fieldNode = fields.item(j);
+      if (!(fieldNode instanceof Element)) {
+        continue;
+      }
+      Element field = (Element) fieldNode;
+      if ("minResources".equals(field.getTagName())) {
+        String text = getText(field).trim();
+        Resource val = FairSchedulerConfiguration.parseResourceConfigValue(text);
+        minQueueResources.put(queueName, val);
+      } else if ("maxResources".equals(field.getTagName())) {
+        String text = getText(field).trim();
+        Resource val = FairSchedulerConfiguration.parseResourceConfigValue(text);
+        maxQueueResources.put(queueName, val);
+      } else if ("maxRunningApps".equals(field.getTagName())) {
+        String text = getText(field).trim();
+        int val = Integer.parseInt(text);
+        queueMaxApps.put(queueName, val);
+      } else if ("weight".equals(field.getTagName())) {
+        String text = getText(field).trim();
+        double val = Double.parseDouble(text);
+        queueWeights.put(queueName, new ResourceWeights((float)val));
+      } else if ("minSharePreemptionTimeout".equals(field.getTagName())) {
+        // Configured in seconds; stored in milliseconds.
+        String text = getText(field).trim();
+        long val = Long.parseLong(text) * 1000L;
+        minSharePreemptionTimeouts.put(queueName, val);
+      } else if ("schedulingPolicy".equals(field.getTagName())
+          || "schedulingMode".equals(field.getTagName())) {
+        String text = getText(field).trim();
+        SchedulingPolicy policy = SchedulingPolicy.parse(text);
+        queuePolicies.put(queueName, policy);
+      } else if ("aclSubmitApps".equals(field.getTagName())) {
+        // Not trimmed: leading/trailing spaces separate users from groups.
+        String text = getText(field);
+        acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
+      } else if ("aclAdministerApps".equals(field.getTagName())) {
+        String text = getText(field);
+        acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
+      } else if ("queue".equals(field.getTagName()) ||
+          "pool".equals(field.getTagName())) {
+        // Exact match: endsWith() also accepted tags such as "ue" or "e".
+        loadQueue(queueName, field, minQueueResources, maxQueueResources,
+            queueMaxApps, userMaxApps, queueWeights, queuePolicies,
+            minSharePreemptionTimeouts,
+            queueAcls, queueNamesInAllocFile);
+        isLeaf = false;
+      }
+    }
+    if (isLeaf) {
+      queueNamesInAllocFile.add(queueName);
+    }
+    queueAcls.put(queueName, acls);
+    if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName)
+        && !Resources.fitsIn(minQueueResources.get(queueName),
+            maxQueueResources.get(queueName))) {
+      // %s, not %d: the arguments are Resource objects, and %d would throw
+      // IllegalFormatConversionException.
+      LOG.warn(String.format("Queue %s has max resources %s less than min resources %s",
+          queueName, maxQueueResources.get(queueName), minQueueResources.get(queueName)));
+    }
+  }
+
+  /**
+   * Returns the (untrimmed) character data of an element whose first child is
+   * a text node.
+   *
+   * @throws AllocationConfigurationException if the element is empty or its
+   *     first child is not text.
+   */
+  private static String getText(Element element)
+      throws AllocationConfigurationException {
+    Node child = element.getFirstChild();
+    if (!(child instanceof Text)) {
+      throw new AllocationConfigurationException("Bad fair scheduler config "
+          + "file: element <" + element.getTagName() + "> has no text value");
+    }
+    return ((Text) child).getData();
+  }
+
+  /** Receives each successfully loaded allocation configuration. */
+  public interface Listener {
+    public void onReload(AllocationConfiguration info);
+  }
+}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java?rev=1548006&r1=1548005&r2=1548006&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java Thu Dec 5 03:26:11 2013
@@ -46,19 +46,15 @@ public class FSLeafQueue extends FSQueue
private final List<AppSchedulable> nonRunnableAppScheds =
new ArrayList<AppSchedulable>();
- private final FairScheduler scheduler;
- private final QueueManager queueMgr;
private Resource demand = Resources.createResource(0);
// Variables used for preemption
private long lastTimeAtMinShare;
private long lastTimeAtHalfFairShare;
- public FSLeafQueue(String name, QueueManager queueMgr, FairScheduler scheduler,
+ public FSLeafQueue(String name, FairScheduler scheduler,
FSParentQueue parent) {
- super(name, queueMgr, scheduler, parent);
- this.scheduler = scheduler;
- this.queueMgr = queueMgr;
+ super(name, scheduler, parent);
this.lastTimeAtMinShare = scheduler.getClock().getTime();
this.lastTimeAtHalfFairShare = scheduler.getClock().getTime();
}
@@ -145,7 +141,8 @@ public class FSLeafQueue extends FSQueue
public void updateDemand() {
// Compute demand by iterating through apps in the queue
// Limit demand to maxResources
- Resource maxRes = queueMgr.getMaxResources(getName());
+ Resource maxRes = scheduler.getAllocationConfiguration()
+ .getMaxResources(getName());
demand = Resources.createResource(0);
for (AppSchedulable sched : runnableAppScheds) {
if (Resources.equals(demand, maxRes)) {
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java?rev=1548006&r1=1548005&r2=1548006&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSParentQueue.java Thu Dec 5 03:26:11 2013
@@ -41,14 +41,12 @@ public class FSParentQueue extends FSQue
private final List<FSQueue> childQueues =
new ArrayList<FSQueue>();
- private final QueueManager queueMgr;
private Resource demand = Resources.createResource(0);
private int runnableApps;
- public FSParentQueue(String name, QueueManager queueMgr, FairScheduler scheduler,
+ public FSParentQueue(String name, FairScheduler scheduler,
FSParentQueue parent) {
- super(name, queueMgr, scheduler, parent);
- this.queueMgr = queueMgr;
+ super(name, scheduler, parent);
}
public void addChildQueue(FSQueue child) {
@@ -82,7 +80,8 @@ public class FSParentQueue extends FSQue
public void updateDemand() {
// Compute demand by iterating through apps in the queue
// Limit demand to maxResources
- Resource maxRes = queueMgr.getMaxResources(getName());
+ Resource maxRes = scheduler.getAllocationConfiguration()
+ .getMaxResources(getName());
demand = Resources.createResource(0);
for (FSQueue childQueue : childQueues) {
childQueue.updateDemand();
@@ -164,8 +163,8 @@ public class FSParentQueue extends FSQue
public void setPolicy(SchedulingPolicy policy)
throws AllocationConfigurationException {
boolean allowed =
- SchedulingPolicy.isApplicableTo(policy, (this == queueMgr
- .getRootQueue()) ? SchedulingPolicy.DEPTH_ROOT
+ SchedulingPolicy.isApplicableTo(policy, (parent == null)
+ ? SchedulingPolicy.DEPTH_ROOT
: SchedulingPolicy.DEPTH_INTERMEDIATE);
if (!allowed) {
throwPolicyDoesnotApplyException(policy);
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java?rev=1548006&r1=1548005&r2=1548006&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueue.java Thu Dec 5 03:26:11 2013
@@ -39,20 +39,17 @@ import org.apache.hadoop.yarn.util.resou
@Unstable
public abstract class FSQueue extends Schedulable implements Queue {
private final String name;
- private final QueueManager queueMgr;
- private final FairScheduler scheduler;
+ protected final FairScheduler scheduler;
private final FSQueueMetrics metrics;
protected final FSParentQueue parent;
protected final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
- protected SchedulingPolicy policy = SchedulingPolicy.getDefault();
+ protected SchedulingPolicy policy = SchedulingPolicy.DEFAULT_POLICY;
- public FSQueue(String name, QueueManager queueMgr,
- FairScheduler scheduler, FSParentQueue parent) {
+ public FSQueue(String name, FairScheduler scheduler, FSParentQueue parent) {
this.name = name;
- this.queueMgr = queueMgr;
this.scheduler = scheduler;
this.metrics = FSQueueMetrics.forQueue(getName(), parent, true, scheduler.getConf());
metrics.setMinShare(getMinShare());
@@ -88,17 +85,17 @@ public abstract class FSQueue extends Sc
@Override
public ResourceWeights getWeights() {
- return queueMgr.getQueueWeight(getName());
+ return scheduler.getAllocationConfiguration().getQueueWeight(getName());
}
@Override
public Resource getMinShare() {
- return queueMgr.getMinResources(getName());
+ return scheduler.getAllocationConfiguration().getMinResources(getName());
}
@Override
public Resource getMaxShare() {
- return queueMgr.getMaxResources(getName());
+ return scheduler.getAllocationConfiguration().getMaxResources(getName());
}
@Override
@@ -148,13 +145,7 @@ public abstract class FSQueue extends Sc
}
public boolean hasAccess(QueueACL acl, UserGroupInformation user) {
- // Check if the leaf-queue allows access
- if (queueMgr.getQueueAcl(getName(), acl).isUserAllowed(user)) {
- return true;
- }
-
- // Check if parent-queue allows access
- return parent != null && parent.hasAccess(acl, user);
+ return scheduler.getAllocationConfiguration().hasAccess(name, acl, user);
}
/**
@@ -181,7 +172,7 @@ public abstract class FSQueue extends Sc
*/
protected boolean assignContainerPreCheck(FSSchedulerNode node) {
if (!Resources.fitsIn(getResourceUsage(),
- queueMgr.getMaxResources(getName()))
+ scheduler.getAllocationConfiguration().getMaxResources(getName()))
|| node.getReservedContainer() != null) {
return false;
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1548006&r1=1548005&r2=1548006&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Thu Dec 5 03:26:11 2013
@@ -192,11 +192,16 @@ public class FairScheduler implements Re
@VisibleForTesting
final MaxRunningAppsEnforcer maxRunningEnforcer;
+
+ private AllocationFileLoaderService allocsLoader;
+ @VisibleForTesting
+ AllocationConfiguration allocConf;
public FairScheduler() {
clock = new SystemClock();
+ allocsLoader = new AllocationFileLoaderService();
queueMgr = new QueueManager(this);
- maxRunningEnforcer = new MaxRunningAppsEnforcer(queueMgr);
+ maxRunningEnforcer = new MaxRunningAppsEnforcer(this);
}
private void validateConf(Configuration conf) {
@@ -275,7 +280,6 @@ public class FairScheduler implements Re
* required resources per job.
*/
protected synchronized void update() {
- queueMgr.reloadAllocsIfNecessary(); // Relaod alloc file
updatePreemptionVariables(); // Determine if any queues merit preemption
FSQueue rootQueue = queueMgr.getRootQueue();
@@ -480,8 +484,8 @@ public class FairScheduler implements Re
*/
protected Resource resToPreempt(FSLeafQueue sched, long curTime) {
String queue = sched.getName();
- long minShareTimeout = queueMgr.getMinSharePreemptionTimeout(queue);
- long fairShareTimeout = queueMgr.getFairSharePreemptionTimeout();
+ long minShareTimeout = allocConf.getMinSharePreemptionTimeout(queue);
+ long fairShareTimeout = allocConf.getFairSharePreemptionTimeout();
Resource resDueToMinShare = Resources.none();
Resource resDueToFairShare = Resources.none();
if (curTime - sched.getLastTimeAtMinShare() > minShareTimeout) {
@@ -650,8 +654,8 @@ public class FairScheduler implements Re
FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) {
FSLeafQueue queue = null;
try {
- QueuePlacementPolicy policy = queueMgr.getPlacementPolicy();
- queueName = policy.assignAppToQueue(queueName, user);
+ QueuePlacementPolicy placementPolicy = allocConf.getPlacementPolicy();
+ queueName = placementPolicy.assignAppToQueue(queueName, user);
if (queueName == null) {
return null;
}
@@ -1128,27 +1132,27 @@ public class FairScheduler implements Re
@Override
public synchronized void reinitialize(Configuration conf, RMContext rmContext)
throws IOException {
- this.conf = new FairSchedulerConfiguration(conf);
- validateConf(this.conf);
- minimumAllocation = this.conf.getMinimumAllocation();
- maximumAllocation = this.conf.getMaximumAllocation();
- incrAllocation = this.conf.getIncrementAllocation();
- continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
- continuousSchedulingSleepMs =
- this.conf.getContinuousSchedulingSleepMs();
- nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
- rackLocalityThreshold = this.conf.getLocalityThresholdRack();
- nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
- rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
- preemptionEnabled = this.conf.getPreemptionEnabled();
- assignMultiple = this.conf.getAssignMultiple();
- maxAssign = this.conf.getMaxAssign();
- sizeBasedWeight = this.conf.getSizeBasedWeight();
- preemptionInterval = this.conf.getPreemptionInterval();
- waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
- usePortForNodeName = this.conf.getUsePortForNodeName();
-
if (!initialized) {
+ this.conf = new FairSchedulerConfiguration(conf);
+ validateConf(this.conf);
+ minimumAllocation = this.conf.getMinimumAllocation();
+ maximumAllocation = this.conf.getMaximumAllocation();
+ incrAllocation = this.conf.getIncrementAllocation();
+ continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
+ continuousSchedulingSleepMs =
+ this.conf.getContinuousSchedulingSleepMs();
+ nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
+ rackLocalityThreshold = this.conf.getLocalityThresholdRack();
+ nodeLocalityDelayMs = this.conf.getLocalityDelayNodeMs();
+ rackLocalityDelayMs = this.conf.getLocalityDelayRackMs();
+ preemptionEnabled = this.conf.getPreemptionEnabled();
+ assignMultiple = this.conf.getAssignMultiple();
+ maxAssign = this.conf.getMaxAssign();
+ sizeBasedWeight = this.conf.getSizeBasedWeight();
+ preemptionInterval = this.conf.getPreemptionInterval();
+ waitTimeBeforeKill = this.conf.getWaitTimeBeforeKill();
+ usePortForNodeName = this.conf.getUsePortForNodeName();
+
rootMetrics = FSQueueMetrics.forQueue("root", null, true, conf);
this.rmContext = rmContext;
this.eventLog = new FairSchedulerEventLog();
@@ -1156,8 +1160,9 @@ public class FairScheduler implements Re
initialized = true;
+ allocConf = new AllocationConfiguration(conf);
try {
- queueMgr.initialize();
+ queueMgr.initialize(conf);
} catch (Exception e) {
throw new IOException("Failed to start FairScheduler", e);
}
@@ -1181,12 +1186,24 @@ public class FairScheduler implements Re
schedulingThread.setDaemon(true);
schedulingThread.start();
}
- } else {
+
+ allocsLoader.init(conf);
+ allocsLoader.setReloadListener(new AllocationReloadListener());
+ // If we fail to load allocations file on initialize, we want to fail
+ // immediately. After a successful load, exceptions on future reloads
+ // will just result in leaving things as they are.
try {
- queueMgr.reloadAllocs();
+ allocsLoader.reloadAllocations();
} catch (Exception e) {
throw new IOException("Failed to initialize FairScheduler", e);
}
+ allocsLoader.start();
+ } else {
+ try {
+ allocsLoader.reloadAllocations();
+ } catch (Exception e) {
+ LOG.error("Failed to reload allocations file", e);
+ }
}
}
@@ -1230,5 +1247,24 @@ public class FairScheduler implements Re
}
return queue.hasAccess(acl, callerUGI);
}
+
+ public AllocationConfiguration getAllocationConfiguration() { // current per-queue allocation settings; replaced wholesale by AllocationReloadListener
+ return allocConf; // NOTE(review): unsynchronized read and allocConf is not volatile; confirm callers hold the scheduler lock
+ }
+
+ private class AllocationReloadListener implements // inner (non-static) so it can update this scheduler's allocConf and queueMgr
+ AllocationFileLoaderService.Listener {
+
+ @Override
+ public void onReload(AllocationConfiguration queueInfo) { // NOTE(review): may run on the loader's background thread (allocsLoader.start()); hence the explicit lock
+ // Commit the reload; also create any queue defined in the alloc file
+ // if it does not already exist, so it can be displayed on the web UI.
+ synchronized (FairScheduler.this) { // same monitor as the synchronized scheduler methods (e.g. reinitialize)
+ allocConf = queueInfo;
+ allocConf.getDefaultSchedulingPolicy().initialize(clusterCapacity);
+ queueMgr.updateAllocationConfiguration(allocConf);
+ }
+ }
+ }
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java?rev=1548006&r1=1548005&r2=1548006&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairSchedulerConfiguration.java Thu Dec 5 03:26:11 2013
@@ -18,7 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.File;
-import java.net.URL;
+import java.util.ArrayList;
+import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -151,14 +152,6 @@ public class FairSchedulerConfiguration
return Resources.createResource(incrementMemory, incrementCores);
}
- public boolean getAllowUndeclaredPools() {
- return getBoolean(ALLOW_UNDECLARED_POOLS, DEFAULT_ALLOW_UNDECLARED_POOLS);
- }
-
- public boolean getUserAsDefaultQueue() {
- return getBoolean(USER_AS_DEFAULT_QUEUE, DEFAULT_USER_AS_DEFAULT_QUEUE);
- }
-
public float getLocalityThresholdNode() {
return getFloat(LOCALITY_THRESHOLD_NODE, DEFAULT_LOCALITY_THRESHOLD_NODE);
}
@@ -199,30 +192,6 @@ public class FairSchedulerConfiguration
return getBoolean(SIZE_BASED_WEIGHT, DEFAULT_SIZE_BASED_WEIGHT);
}
- /**
- * Path to XML file containing allocations. If the
- * path is relative, it is searched for in the
- * classpath, but loaded like a regular File.
- */
- public File getAllocationFile() {
- String allocFilePath = get(ALLOCATION_FILE, DEFAULT_ALLOCATION_FILE);
- File allocFile = new File(allocFilePath);
- if (!allocFile.isAbsolute()) {
- URL url = Thread.currentThread().getContextClassLoader()
- .getResource(allocFilePath);
- if (url == null) {
- LOG.warn(allocFilePath + " not found on the classpath.");
- allocFile = null;
- } else if (!url.getProtocol().equalsIgnoreCase("file")) {
- throw new RuntimeException("Allocation file " + url
- + " found on the classpath is not on the local filesystem.");
- } else {
- allocFile = new File(url.getPath());
- }
- }
- return allocFile;
- }
-
public String getEventlogDir() {
return get(EVENT_LOG_DIR, new File(System.getProperty("hadoop.log.dir",
"/tmp/")).getAbsolutePath() + File.separator + "fairscheduler");
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java?rev=1548006&r1=1548005&r2=1548006&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java Thu Dec 5 03:26:11 2013
@@ -33,15 +33,15 @@ import com.google.common.collect.ListMul
* constraints
*/
public class MaxRunningAppsEnforcer {
- private final QueueManager queueMgr;
+ private final FairScheduler scheduler;
// Tracks the number of running applications by user.
private final Map<String, Integer> usersNumRunnableApps;
@VisibleForTesting
final ListMultimap<String, AppSchedulable> usersNonRunnableApps;
- public MaxRunningAppsEnforcer(QueueManager queueMgr) {
- this.queueMgr = queueMgr;
+ public MaxRunningAppsEnforcer(FairScheduler scheduler) {
+ this.scheduler = scheduler;
this.usersNumRunnableApps = new HashMap<String, Integer>();
this.usersNonRunnableApps = ArrayListMultimap.create();
}
@@ -51,16 +51,17 @@ public class MaxRunningAppsEnforcer {
* maxRunningApps limits.
*/
public boolean canAppBeRunnable(FSQueue queue, String user) {
+ AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
Integer userNumRunnable = usersNumRunnableApps.get(user);
if (userNumRunnable == null) {
userNumRunnable = 0;
}
- if (userNumRunnable >= queueMgr.getUserMaxApps(user)) {
+ if (userNumRunnable >= allocConf.getUserMaxApps(user)) {
return false;
}
// Check queue and all parent queues
while (queue != null) {
- int queueMaxApps = queueMgr.getQueueMaxApps(queue.getName());
+ int queueMaxApps = allocConf.getQueueMaxApps(queue.getName());
if (queue.getNumRunnableApps() >= queueMaxApps) {
return false;
}
@@ -107,6 +108,8 @@ public class MaxRunningAppsEnforcer {
* highest queue that went from having no slack to having slack.
*/
public void updateRunnabilityOnAppRemoval(FSSchedulerApp app) {
+ AllocationConfiguration allocConf = scheduler.getAllocationConfiguration();
+
// Update usersRunnableApps
String user = app.getUser();
int newUserNumRunning = usersNumRunnableApps.get(user) - 1;
@@ -127,10 +130,10 @@ public class MaxRunningAppsEnforcer {
// that was at its maxRunningApps before the removal.
FSLeafQueue queue = app.getQueue();
FSQueue highestQueueWithAppsNowRunnable = (queue.getNumRunnableApps() ==
- queueMgr.getQueueMaxApps(queue.getName()) - 1) ? queue : null;
+ allocConf.getQueueMaxApps(queue.getName()) - 1) ? queue : null;
FSParentQueue parent = queue.getParent();
while (parent != null) {
- if (parent.getNumRunnableApps() == queueMgr.getQueueMaxApps(parent
+ if (parent.getNumRunnableApps() == allocConf.getQueueMaxApps(parent
.getName())) {
highestQueueWithAppsNowRunnable = parent;
}
@@ -149,7 +152,7 @@ public class MaxRunningAppsEnforcer {
gatherPossiblyRunnableAppLists(highestQueueWithAppsNowRunnable,
appsNowMaybeRunnable);
}
- if (newUserNumRunning == queueMgr.getUserMaxApps(user) - 1) {
+ if (newUserNumRunning == allocConf.getUserMaxApps(user) - 1) {
List<AppSchedulable> userWaitingApps = usersNonRunnableApps.get(user);
if (userWaitingApps != null) {
appsNowMaybeRunnable.add(userWaitingApps);
@@ -200,7 +203,8 @@ public class MaxRunningAppsEnforcer {
*/
private void gatherPossiblyRunnableAppLists(FSQueue queue,
List<List<AppSchedulable>> appLists) {
- if (queue.getNumRunnableApps() < queueMgr.getQueueMaxApps(queue.getName())) {
+ if (queue.getNumRunnableApps() < scheduler.getAllocationConfiguration()
+ .getQueueMaxApps(queue.getName())) {
if (queue instanceof FSLeafQueue) {
appLists.add(((FSLeafQueue)queue).getNonRunnableAppSchedulables());
} else {
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1548006&r1=1548005&r2=1548006&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Thu Dec 5 03:26:11 2013
@@ -18,20 +18,14 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
-import java.io.File;
import java.io.IOException;
-import java.net.URL;
-import java.net.URLConnection;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
-import javax.xml.parsers.DocumentBuilder;
-import javax.xml.parsers.DocumentBuilderFactory;
import javax.xml.parsers.ParserConfigurationException;
import org.apache.commons.logging.Log;
@@ -39,21 +33,9 @@ import org.apache.commons.logging.LogFac
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.security.authorize.AccessControlList;
-import org.apache.hadoop.yarn.api.records.QueueACL;
-import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceWeights;
-import org.apache.hadoop.yarn.util.resource.Resources;
-import org.w3c.dom.Document;
-import org.w3c.dom.Element;
-import org.w3c.dom.Node;
-import org.w3c.dom.NodeList;
-import org.w3c.dom.Text;
import org.xml.sax.SAXException;
-import com.google.common.annotations.VisibleForTesting;
-
/**
* Maintains a list of queues as well as scheduling parameters for each queue,
* such as guaranteed share allocations, from the fair scheduler config file.
@@ -67,37 +49,13 @@ public class QueueManager {
public static final String ROOT_QUEUE = "root";
- /** Time to wait between checks of the allocation file */
- public static final long ALLOC_RELOAD_INTERVAL = 10 * 1000;
-
- /**
- * Time to wait after the allocation has been modified before reloading it
- * (this is done to prevent loading a file that hasn't been fully written).
- */
- public static final long ALLOC_RELOAD_WAIT = 5 * 1000;
-
- private static final AccessControlList EVERYBODY_ACL = new AccessControlList("*");
- private static final AccessControlList NOBODY_ACL = new AccessControlList(" ");
-
private final FairScheduler scheduler;
- // Path to XML file containing allocations.
- private File allocFile;
-
private final Collection<FSLeafQueue> leafQueues =
new CopyOnWriteArrayList<FSLeafQueue>();
private final Map<String, FSQueue> queues = new HashMap<String, FSQueue>();
private FSParentQueue rootQueue;
- @VisibleForTesting
- volatile QueueManagerInfo info = new QueueManagerInfo();
- @VisibleForTesting
- volatile QueuePlacementPolicy placementPolicy;
-
- private long lastReloadAttempt; // Last time we tried to reload the queues file
- private long lastSuccessfulReload; // Last time we successfully reloaded queues
- private boolean lastReloadAttemptFailed = false;
-
public QueueManager(FairScheduler scheduler) {
this.scheduler = scheduler;
}
@@ -106,45 +64,15 @@ public class QueueManager {
return rootQueue;
}
- public void initialize() throws IOException, SAXException,
- AllocationConfigurationException, ParserConfigurationException {
- FairSchedulerConfiguration conf = scheduler.getConf();
- rootQueue = new FSParentQueue("root", this, scheduler, null);
+ public void initialize(Configuration conf) throws IOException,
+ SAXException, AllocationConfigurationException, ParserConfigurationException {
+ rootQueue = new FSParentQueue("root", scheduler, null);
queues.put(rootQueue.getName(), rootQueue);
- this.allocFile = conf.getAllocationFile();
- placementPolicy = new QueuePlacementPolicy(getSimplePlacementRules(),
- new HashSet<String>(), conf);
-
- reloadAllocs();
- lastSuccessfulReload = scheduler.getClock().getTime();
- lastReloadAttempt = scheduler.getClock().getTime();
// Create the default queue
getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true);
}
- public void updatePlacementPolicy(FairSchedulerConfiguration conf) {
-
- }
-
- /**
- * Construct simple queue placement policy from allow-undeclared-pools and
- * user-as-default-queue.
- */
- private List<QueuePlacementRule> getSimplePlacementRules() {
- boolean create = scheduler.getConf().getAllowUndeclaredPools();
- boolean userAsDefaultQueue = scheduler.getConf().getUserAsDefaultQueue();
- List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
- rules.add(new QueuePlacementRule.Specified().initialize(create, null));
- if (userAsDefaultQueue) {
- rules.add(new QueuePlacementRule.User().initialize(create, null));
- }
- if (!userAsDefaultQueue || !create) {
- rules.add(new QueuePlacementRule.Default().initialize(true, null));
- }
- return rules;
- }
-
/**
* Get a queue by name, creating it if the create param is true and is necessary.
* If the queue is not or can not be a leaf queue, i.e. it already exists as a
@@ -213,17 +141,30 @@ public class QueueManager {
// queue to create.
// Now that we know everything worked out, make all the queues
// and add them to the map.
+ AllocationConfiguration queueConf = scheduler.getAllocationConfiguration();
FSLeafQueue leafQueue = null;
for (int i = newQueueNames.size()-1; i >= 0; i--) {
String queueName = newQueueNames.get(i);
if (i == 0) {
// First name added was the leaf queue
- leafQueue = new FSLeafQueue(name, this, scheduler, parent);
+ leafQueue = new FSLeafQueue(name, scheduler, parent);
+ try {
+ leafQueue.setPolicy(queueConf.getDefaultSchedulingPolicy());
+ } catch (AllocationConfigurationException ex) {
+ LOG.warn("Failed to set default scheduling policy "
+ + queueConf.getDefaultSchedulingPolicy() + " on new leaf queue.", ex);
+ }
parent.addChildQueue(leafQueue);
queues.put(leafQueue.getName(), leafQueue);
leafQueues.add(leafQueue);
} else {
- FSParentQueue newParent = new FSParentQueue(queueName, this, scheduler, parent);
+ FSParentQueue newParent = new FSParentQueue(queueName, scheduler, parent);
+ try {
+ newParent.setPolicy(queueConf.getDefaultSchedulingPolicy());
+ } catch (AllocationConfigurationException ex) {
+ LOG.warn("Failed to set default scheduling policy "
+ + queueConf.getDefaultSchedulingPolicy() + " on new parent queue.", ex);
+ }
parent.addChildQueue(newParent);
queues.put(newParent.getName(), newParent);
parent = newParent;
@@ -257,301 +198,6 @@ public class QueueManager {
}
}
- public QueuePlacementPolicy getPlacementPolicy() {
- return placementPolicy;
- }
-
- /**
- * Reload allocations file if it hasn't been loaded in a while
- */
- public void reloadAllocsIfNecessary() {
- long time = scheduler.getClock().getTime();
- if (time > lastReloadAttempt + ALLOC_RELOAD_INTERVAL) {
- lastReloadAttempt = time;
- if (null == allocFile) {
- return;
- }
- try {
- // Get last modified time of alloc file depending whether it's a String
- // (for a path name) or an URL (for a classloader resource)
- long lastModified = allocFile.lastModified();
- if (lastModified > lastSuccessfulReload &&
- time > lastModified + ALLOC_RELOAD_WAIT) {
- reloadAllocs();
- lastSuccessfulReload = time;
- lastReloadAttemptFailed = false;
- }
- } catch (Exception e) {
- // Throwing the error further out here won't help - the RPC thread
- // will catch it and report it in a loop. Instead, just log it and
- // hope somebody will notice from the log.
- // We log the error only on the first failure so we don't fill up the
- // JobTracker's log with these messages.
- if (!lastReloadAttemptFailed) {
- LOG.error("Failed to reload fair scheduler config file - " +
- "will use existing allocations.", e);
- }
- lastReloadAttemptFailed = true;
- }
- }
- }
-
- /**
- * Updates the allocation list from the allocation config file. This file is
- * expected to be in the XML format specified in the design doc.
- *
- * @throws IOException if the config file cannot be read.
- * @throws AllocationConfigurationException if allocations are invalid.
- * @throws ParserConfigurationException if XML parser is misconfigured.
- * @throws SAXException if config file is malformed.
- */
- public void reloadAllocs() throws IOException, ParserConfigurationException,
- SAXException, AllocationConfigurationException {
- if (allocFile == null) return;
- // Create some temporary hashmaps to hold the new allocs, and we only save
- // them in our fields if we have parsed the entire allocs file successfully.
- Map<String, Resource> minQueueResources = new HashMap<String, Resource>();
- Map<String, Resource> maxQueueResources = new HashMap<String, Resource>();
- Map<String, Integer> queueMaxApps = new HashMap<String, Integer>();
- Map<String, Integer> userMaxApps = new HashMap<String, Integer>();
- Map<String, ResourceWeights> queueWeights = new HashMap<String, ResourceWeights>();
- Map<String, SchedulingPolicy> queuePolicies = new HashMap<String, SchedulingPolicy>();
- Map<String, Long> minSharePreemptionTimeouts = new HashMap<String, Long>();
- Map<String, Map<QueueACL, AccessControlList>> queueAcls =
- new HashMap<String, Map<QueueACL, AccessControlList>>();
- int userMaxAppsDefault = Integer.MAX_VALUE;
- int queueMaxAppsDefault = Integer.MAX_VALUE;
- long fairSharePreemptionTimeout = Long.MAX_VALUE;
- long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
- SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.getDefault();
-
- QueuePlacementPolicy newPlacementPolicy = null;
-
- // Remember all queue names so we can display them on web UI, etc.
- List<String> queueNamesInAllocFile = new ArrayList<String>();
-
- // Read and parse the allocations file.
- DocumentBuilderFactory docBuilderFactory =
- DocumentBuilderFactory.newInstance();
- docBuilderFactory.setIgnoringComments(true);
- DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
- Document doc = builder.parse(allocFile);
- Element root = doc.getDocumentElement();
- if (!"allocations".equals(root.getTagName()))
- throw new AllocationConfigurationException("Bad fair scheduler config " +
- "file: top-level element not <allocations>");
- NodeList elements = root.getChildNodes();
- List<Element> queueElements = new ArrayList<Element>();
- Element placementPolicyElement = null;
- for (int i = 0; i < elements.getLength(); i++) {
- Node node = elements.item(i);
- if (node instanceof Element) {
- Element element = (Element)node;
- if ("queue".equals(element.getTagName()) ||
- "pool".equals(element.getTagName())) {
- queueElements.add(element);
- } else if ("user".equals(element.getTagName())) {
- String userName = element.getAttribute("name");
- NodeList fields = element.getChildNodes();
- for (int j = 0; j < fields.getLength(); j++) {
- Node fieldNode = fields.item(j);
- if (!(fieldNode instanceof Element))
- continue;
- Element field = (Element) fieldNode;
- if ("maxRunningApps".equals(field.getTagName())) {
- String text = ((Text)field.getFirstChild()).getData().trim();
- int val = Integer.parseInt(text);
- userMaxApps.put(userName, val);
- }
- }
- } else if ("userMaxAppsDefault".equals(element.getTagName())) {
- String text = ((Text)element.getFirstChild()).getData().trim();
- int val = Integer.parseInt(text);
- userMaxAppsDefault = val;
- } else if ("fairSharePreemptionTimeout".equals(element.getTagName())) {
- String text = ((Text)element.getFirstChild()).getData().trim();
- long val = Long.parseLong(text) * 1000L;
- fairSharePreemptionTimeout = val;
- } else if ("defaultMinSharePreemptionTimeout".equals(element.getTagName())) {
- String text = ((Text)element.getFirstChild()).getData().trim();
- long val = Long.parseLong(text) * 1000L;
- defaultMinSharePreemptionTimeout = val;
- } else if ("queueMaxAppsDefault".equals(element.getTagName())) {
- String text = ((Text)element.getFirstChild()).getData().trim();
- int val = Integer.parseInt(text);
- queueMaxAppsDefault = val;
- } else if ("defaultQueueSchedulingPolicy".equals(element.getTagName())
- || "defaultQueueSchedulingMode".equals(element.getTagName())) {
- String text = ((Text)element.getFirstChild()).getData().trim();
- SchedulingPolicy.setDefault(text);
- defaultSchedPolicy = SchedulingPolicy.getDefault();
- } else if ("queuePlacementPolicy".equals(element.getTagName())) {
- placementPolicyElement = element;
- } else {
- LOG.warn("Bad element in allocations file: " + element.getTagName());
- }
- }
- }
-
- // Load queue elements. A root queue can either be included or omitted. If
- // it's included, all other queues must be inside it.
- for (Element element : queueElements) {
- String parent = "root";
- if (element.getAttribute("name").equalsIgnoreCase("root")) {
- if (queueElements.size() > 1) {
- throw new AllocationConfigurationException("If configuring root queue,"
- + " no other queues can be placed alongside it.");
- }
- parent = null;
- }
- loadQueue(parent, element, minQueueResources, maxQueueResources, queueMaxApps,
- userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts,
- queueAcls, queueNamesInAllocFile);
- }
-
- // Load placement policy and pass it configured queues
- if (placementPolicyElement != null) {
- newPlacementPolicy = QueuePlacementPolicy.fromXml(placementPolicyElement,
- new HashSet<String>(queueNamesInAllocFile), scheduler.getConf());
- } else {
- newPlacementPolicy = new QueuePlacementPolicy(getSimplePlacementRules(),
- new HashSet<String>(queueNamesInAllocFile), scheduler.getConf());
- }
-
- // Commit the reload; also create any queue defined in the alloc file
- // if it does not already exist, so it can be displayed on the web UI.
- synchronized (this) {
- info = new QueueManagerInfo(minQueueResources, maxQueueResources,
- queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault,
- queueMaxAppsDefault, defaultSchedPolicy, minSharePreemptionTimeouts,
- queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout);
- placementPolicy = newPlacementPolicy;
-
- // Make sure all queues exist
- for (String name: queueNamesInAllocFile) {
- getLeafQueue(name, true);
- }
-
- for (FSQueue queue : queues.values()) {
- // Update queue metrics
- FSQueueMetrics queueMetrics = queue.getMetrics();
- queueMetrics.setMinShare(queue.getMinShare());
- queueMetrics.setMaxShare(queue.getMaxShare());
- // Set scheduling policies
- if (queuePolicies.containsKey(queue.getName())) {
- queue.setPolicy(queuePolicies.get(queue.getName()));
- } else {
- queue.setPolicy(SchedulingPolicy.getDefault());
- }
- }
-
- }
- }
-
- /**
- * Loads a queue from a queue element in the configuration file
- */
- private void loadQueue(String parentName, Element element, Map<String, Resource> minQueueResources,
- Map<String, Resource> maxQueueResources, Map<String, Integer> queueMaxApps,
- Map<String, Integer> userMaxApps, Map<String, ResourceWeights> queueWeights,
- Map<String, SchedulingPolicy> queuePolicies,
- Map<String, Long> minSharePreemptionTimeouts,
- Map<String, Map<QueueACL, AccessControlList>> queueAcls, List<String> queueNamesInAllocFile)
- throws AllocationConfigurationException {
- String queueName = element.getAttribute("name");
- if (parentName != null) {
- queueName = parentName + "." + queueName;
- }
- Map<QueueACL, AccessControlList> acls =
- new HashMap<QueueACL, AccessControlList>();
- NodeList fields = element.getChildNodes();
- boolean isLeaf = true;
-
- for (int j = 0; j < fields.getLength(); j++) {
- Node fieldNode = fields.item(j);
- if (!(fieldNode instanceof Element))
- continue;
- Element field = (Element) fieldNode;
- if ("minResources".equals(field.getTagName())) {
- String text = ((Text)field.getFirstChild()).getData().trim();
- Resource val = FairSchedulerConfiguration.parseResourceConfigValue(text);
- minQueueResources.put(queueName, val);
- } else if ("maxResources".equals(field.getTagName())) {
- String text = ((Text)field.getFirstChild()).getData().trim();
- Resource val = FairSchedulerConfiguration.parseResourceConfigValue(text);
- maxQueueResources.put(queueName, val);
- } else if ("maxRunningApps".equals(field.getTagName())) {
- String text = ((Text)field.getFirstChild()).getData().trim();
- int val = Integer.parseInt(text);
- queueMaxApps.put(queueName, val);
- } else if ("weight".equals(field.getTagName())) {
- String text = ((Text)field.getFirstChild()).getData().trim();
- double val = Double.parseDouble(text);
- queueWeights.put(queueName, new ResourceWeights((float)val));
- } else if ("minSharePreemptionTimeout".equals(field.getTagName())) {
- String text = ((Text)field.getFirstChild()).getData().trim();
- long val = Long.parseLong(text) * 1000L;
- minSharePreemptionTimeouts.put(queueName, val);
- } else if ("schedulingPolicy".equals(field.getTagName())
- || "schedulingMode".equals(field.getTagName())) {
- String text = ((Text)field.getFirstChild()).getData().trim();
- SchedulingPolicy policy = SchedulingPolicy.parse(text);
- policy.initialize(scheduler.getClusterCapacity());
- queuePolicies.put(queueName, policy);
- } else if ("aclSubmitApps".equals(field.getTagName())) {
- String text = ((Text)field.getFirstChild()).getData();
- acls.put(QueueACL.SUBMIT_APPLICATIONS, new AccessControlList(text));
- } else if ("aclAdministerApps".equals(field.getTagName())) {
- String text = ((Text)field.getFirstChild()).getData();
- acls.put(QueueACL.ADMINISTER_QUEUE, new AccessControlList(text));
- } else if ("queue".endsWith(field.getTagName()) ||
- "pool".equals(field.getTagName())) {
- loadQueue(queueName, field, minQueueResources, maxQueueResources,
- queueMaxApps, userMaxApps, queueWeights, queuePolicies,
- minSharePreemptionTimeouts,
- queueAcls, queueNamesInAllocFile);
- isLeaf = false;
- }
- }
- if (isLeaf) {
- queueNamesInAllocFile.add(queueName);
- }
- queueAcls.put(queueName, acls);
- if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName)
- && !Resources.fitsIn(minQueueResources.get(queueName),
- maxQueueResources.get(queueName))) {
- LOG.warn(String.format("Queue %s has max resources %d less than min resources %d",
- queueName, maxQueueResources.get(queueName), minQueueResources.get(queueName)));
- }
- }
-
- /**
- * Get the minimum resource allocation for the given queue.
- * @return the cap set on this queue, or 0 if not set.
- */
- public Resource getMinResources(String queue) {
- Resource minQueueResource = info.minQueueResources.get(queue);
- if (minQueueResource != null) {
- return minQueueResource;
- } else {
- return Resources.createResource(0);
- }
- }
-
- /**
- * Get the maximum resource allocation for the given queue.
- * @return the cap set on this queue, or Integer.MAX_VALUE if not set.
- */
-
- public Resource getMaxResources(String queueName) {
- Resource maxQueueResource = info.maxQueueResources.get(queueName);
- if (maxQueueResource != null) {
- return maxQueueResource;
- } else {
- return Resources.createResource(Integer.MAX_VALUE, Integer.MAX_VALUE);
- }
- }
-
/**
* Get a collection of all leaf queues
*/
@@ -567,141 +213,27 @@ public class QueueManager {
public Collection<FSQueue> getQueues() {
return queues.values();
}
-
- public int getUserMaxApps(String user) {
- // save current info in case it gets changed under us
- QueueManagerInfo info = this.info;
- if (info.userMaxApps.containsKey(user)) {
- return info.userMaxApps.get(user);
- } else {
- return info.userMaxAppsDefault;
- }
- }
-
- public int getQueueMaxApps(String queue) {
- // save current info in case it gets changed under us
- QueueManagerInfo info = this.info;
- if (info.queueMaxApps.containsKey(queue)) {
- return info.queueMaxApps.get(queue);
- } else {
- return info.queueMaxAppsDefault;
- }
- }
- public ResourceWeights getQueueWeight(String queue) {
- ResourceWeights weight = info.queueWeights.get(queue);
- if (weight != null) {
- return weight;
- } else {
- return ResourceWeights.NEUTRAL;
- }
- }
-
- /**
- * Get a queue's min share preemption timeout, in milliseconds. This is the
- * time after which jobs in the queue may kill other queues' tasks if they
- * are below their min share.
- */
- public long getMinSharePreemptionTimeout(String queueName) {
- // save current info in case it gets changed under us
- QueueManagerInfo info = this.info;
- if (info.minSharePreemptionTimeouts.containsKey(queueName)) {
- return info.minSharePreemptionTimeouts.get(queueName);
- }
- return info.defaultMinSharePreemptionTimeout;
- }
-
- /**
- * Get the fair share preemption, in milliseconds. This is the time
- * after which any job may kill other jobs' tasks if it is below half
- * its fair share.
- */
- public long getFairSharePreemptionTimeout() {
- return info.fairSharePreemptionTimeout;
- }
-
- /**
- * Get the ACLs associated with this queue. If a given ACL is not explicitly
- * configured, include the default value for that ACL. The default for the
- * root queue is everybody ("*") and the default for all other queues is
- * nobody ("")
- */
- public AccessControlList getQueueAcl(String queue, QueueACL operation) {
- Map<QueueACL, AccessControlList> queueAcls = info.queueAcls.get(queue);
- if (queueAcls == null || !queueAcls.containsKey(operation)) {
- return (queue.equals(ROOT_QUEUE)) ? EVERYBODY_ACL : NOBODY_ACL;
- }
- return queueAcls.get(operation);
- }
-
- static class QueueManagerInfo {
- // Minimum resource allocation for each queue
- public final Map<String, Resource> minQueueResources;
- // Maximum amount of resources per queue
- public final Map<String, Resource> maxQueueResources;
- // Sharing weights for each queue
- public final Map<String, ResourceWeights> queueWeights;
-
- // Max concurrent running applications for each queue and for each user; in addition,
- // for users that have no max specified, we use the userMaxJobsDefault.
- public final Map<String, Integer> queueMaxApps;
- public final Map<String, Integer> userMaxApps;
- public final int userMaxAppsDefault;
- public final int queueMaxAppsDefault;
-
- // ACL's for each queue. Only specifies non-default ACL's from configuration.
- public final Map<String, Map<QueueACL, AccessControlList>> queueAcls;
-
- // Min share preemption timeout for each queue in seconds. If a job in the queue
- // waits this long without receiving its guaranteed share, it is allowed to
- // preempt other jobs' tasks.
- public final Map<String, Long> minSharePreemptionTimeouts;
-
- // Default min share preemption timeout for queues where it is not set
- // explicitly.
- public final long defaultMinSharePreemptionTimeout;
-
- // Preemption timeout for jobs below fair share in seconds. If a job remains
- // below half its fair share for this long, it is allowed to preempt tasks.
- public final long fairSharePreemptionTimeout;
-
- public final SchedulingPolicy defaultSchedulingPolicy;
-
- public QueueManagerInfo(Map<String, Resource> minQueueResources,
- Map<String, Resource> maxQueueResources,
- Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
- Map<String, ResourceWeights> queueWeights, int userMaxAppsDefault,
- int queueMaxAppsDefault, SchedulingPolicy defaultSchedulingPolicy,
- Map<String, Long> minSharePreemptionTimeouts,
- Map<String, Map<QueueACL, AccessControlList>> queueAcls,
- long fairSharePreemptionTimeout, long defaultMinSharePreemptionTimeout) {
- this.minQueueResources = minQueueResources;
- this.maxQueueResources = maxQueueResources;
- this.queueMaxApps = queueMaxApps;
- this.userMaxApps = userMaxApps;
- this.queueWeights = queueWeights;
- this.userMaxAppsDefault = userMaxAppsDefault;
- this.queueMaxAppsDefault = queueMaxAppsDefault;
- this.defaultSchedulingPolicy = defaultSchedulingPolicy;
- this.minSharePreemptionTimeouts = minSharePreemptionTimeouts;
- this.queueAcls = queueAcls;
- this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
- this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout;
+ public void updateAllocationConfiguration(AllocationConfiguration queueConf) {
+ // Make sure all queues exist
+ for (String name : queueConf.getQueueNames()) {
+ getLeafQueue(name, true);
}
- public QueueManagerInfo() {
- minQueueResources = new HashMap<String, Resource>();
- maxQueueResources = new HashMap<String, Resource>();
- queueWeights = new HashMap<String, ResourceWeights>();
- queueMaxApps = new HashMap<String, Integer>();
- userMaxApps = new HashMap<String, Integer>();
- userMaxAppsDefault = Integer.MAX_VALUE;
- queueMaxAppsDefault = Integer.MAX_VALUE;
- queueAcls = new HashMap<String, Map<QueueACL, AccessControlList>>();
- minSharePreemptionTimeouts = new HashMap<String, Long>();
- defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
- fairSharePreemptionTimeout = Long.MAX_VALUE;
- defaultSchedulingPolicy = SchedulingPolicy.getDefault();
+ for (FSQueue queue : queues.values()) {
+ // Update queue metrics
+ FSQueueMetrics queueMetrics = queue.getMetrics();
+ queueMetrics.setMinShare(queue.getMinShare());
+ queueMetrics.setMaxShare(queue.getMaxShare());
+ // Set scheduling policies
+ try {
+ SchedulingPolicy policy = queueConf.getSchedulingPolicy(queue.getName());
+ policy.initialize(scheduler.getClusterCapacity());
+ queue.setPolicy(policy);
+ } catch (AllocationConfigurationException ex) {
+ LOG.warn("Cannot apply configured scheduling policy to queue "
+ + queue.getName(), ex);
+ }
}
}
}
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java?rev=1548006&r1=1548005&r2=1548006&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java Thu Dec 5 03:26:11 2013
@@ -95,6 +95,34 @@ public class QueuePlacementPolicy {
}
/**
+ * Build a simple queue placement policy from the allow-undeclared-pools and
+ * user-as-default-queue configuration options.
+ *
+ * Rules are added in this order: a "specified" rule first; then a "user"
+ * rule if user-as-default-queue is enabled; then a "default" rule unless
+ * both user-as-default-queue and allow-undeclared-pools are enabled. The
+ * "specified" and "user" rules may create queues only when
+ * allow-undeclared-pools is enabled.
+ *
+ * @param conf configuration supplying the two boolean options
+ * @param configuredQueues names of the queues declared in the allocation file
+ * @return a placement policy built from those rules
+ */
+ public static QueuePlacementPolicy fromConfiguration(Configuration conf,
+ Set<String> configuredQueues) {
+ boolean create = conf.getBoolean(
+ FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS,
+ FairSchedulerConfiguration.DEFAULT_ALLOW_UNDECLARED_POOLS);
+ boolean userAsDefaultQueue = conf.getBoolean(
+ FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE,
+ FairSchedulerConfiguration.DEFAULT_USER_AS_DEFAULT_QUEUE);
+ List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
+ rules.add(new QueuePlacementRule.Specified().initialize(create, null));
+ if (userAsDefaultQueue) {
+ rules.add(new QueuePlacementRule.User().initialize(create, null));
+ }
+ if (!userAsDefaultQueue || !create) {
+ rules.add(new QueuePlacementRule.Default().initialize(true, null));
+ }
+ try {
+ return new QueuePlacementPolicy(rules, configuredQueues, conf);
+ } catch (AllocationConfigurationException ex) {
+ // The rule list is built entirely by this method, so a failure here is a
+ // programming error, not a user configuration error.
+ throw new IllegalStateException("Should never hit exception when loading " +
+ "placement policy from conf", ex);
+ }
+ }
+
+ /**
* Applies this rule to an app with the given requested queue and user/group
* information.
*
@@ -120,4 +148,8 @@ public class QueuePlacementPolicy {
throw new IllegalStateException("Should have applied a rule before " +
"reaching here");
}
+
+ /**
+ * Returns the placement rules backing this policy. The returned list is the
+ * policy's own list, not a copy, so callers should treat it as read-only.
+ */
+ public List<QueuePlacementRule> getRules() {
+ return this.rules;
+ }
}