You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by sa...@apache.org on 2013/11/14 23:12:13 UTC
svn commit: r1542105 - in /hadoop/common/trunk/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/
hadoop-yarn/hadoop-yarn-server/hadoop-y...
Author: sandy
Date: Thu Nov 14 22:12:13 2013
New Revision: 1542105
URL: http://svn.apache.org/r1542105
Log:
YARN-1392. Allow sophisticated app-to-queue placement policies in the Fair Scheduler (Sandy Ryza)
Modified:
hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm
Modified: hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt?rev=1542105&r1=1542104&r2=1542105&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/trunk/hadoop-yarn-project/CHANGES.txt Thu Nov 14 22:12:13 2013
@@ -40,6 +40,9 @@ Release 2.3.0 - UNRELEASED
YARN-311. RM/scheduler support for dynamic resource configuration.
(Junping Du via llu)
+ YARN-1392. Allow sophisticated app-to-queue placement policies in the Fair
+ Scheduler (Sandy Ryza)
+
IMPROVEMENTS
YARN-7. Support CPU resource for DistributedShell. (Junping Du via llu)
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java?rev=1542105&r1=1542104&r2=1542105&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java Thu Nov 14 22:12:13 2013
@@ -136,9 +136,6 @@ public class FairScheduler implements Re
// How often fair shares are re-calculated (ms)
protected long UPDATE_INTERVAL = 500;
- // Whether to use username in place of "default" queue name
- private volatile boolean userAsDefaultQueue = false;
-
private final static List<Container> EMPTY_CONTAINER_LIST =
new ArrayList<Container>();
@@ -640,6 +637,12 @@ public class FairScheduler implements Re
RMApp rmApp = rmContext.getRMApps().get(
applicationAttemptId.getApplicationId());
FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
+ if (queue == null) {
+ rmContext.getDispatcher().getEventHandler().handle(
+ new RMAppAttemptRejectedEvent(applicationAttemptId,
+ "Application rejected by queue placement policy"));
+ return;
+ }
FSSchedulerApp schedulerApp =
new FSSchedulerApp(applicationAttemptId, user,
@@ -675,17 +678,16 @@ public class FairScheduler implements Re
@VisibleForTesting
FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) {
- // Potentially set queue to username if configured to do so
- if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME) &&
- userAsDefaultQueue) {
- queueName = user;
- }
-
- FSLeafQueue queue = queueMgr.getLeafQueue(queueName,
- conf.getAllowUndeclaredPools());
- if (queue == null) {
- // queue is not an existing or createable leaf queue
- queue = queueMgr.getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, false);
+ FSLeafQueue queue = null;
+ try {
+ QueuePlacementPolicy policy = queueMgr.getPlacementPolicy();
+ queueName = policy.assignAppToQueue(queueName, user);
+ if (queueName == null) {
+ return null;
+ }
+ queue = queueMgr.getLeafQueue(queueName, true);
+ } catch (IOException ex) {
+ LOG.error("Error assigning app to queue, rejecting", ex);
}
if (rmApp != null) {
@@ -1155,7 +1157,6 @@ public class FairScheduler implements Re
minimumAllocation = this.conf.getMinimumAllocation();
maximumAllocation = this.conf.getMaximumAllocation();
incrAllocation = this.conf.getIncrementAllocation();
- userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
continuousSchedulingEnabled = this.conf.isContinuousSchedulingEnabled();
continuousSchedulingSleepMs =
this.conf.getContinuousSchedulingSleepMs();
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1542105&r1=1542104&r2=1542105&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Thu Nov 14 22:12:13 2013
@@ -25,6 +25,7 @@ import java.net.URLConnection;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -51,6 +52,8 @@ import org.w3c.dom.NodeList;
import org.w3c.dom.Text;
import org.xml.sax.SAXException;
+import com.google.common.annotations.VisibleForTesting;
+
/**
* Maintains a list of queues as well as scheduling parameters for each queue,
* such as guaranteed share allocations, from the fair scheduler config file.
@@ -87,6 +90,8 @@ public class QueueManager {
private FSParentQueue rootQueue;
private volatile QueueManagerInfo info = new QueueManagerInfo();
+ @VisibleForTesting
+ volatile QueuePlacementPolicy placementPolicy;
private long lastReloadAttempt; // Last time we tried to reload the queues file
private long lastSuccessfulReload; // Last time we successfully reloaded queues
@@ -107,6 +112,8 @@ public class QueueManager {
queues.put(rootQueue.getName(), rootQueue);
this.allocFile = conf.getAllocationFile();
+ placementPolicy = new QueuePlacementPolicy(getSimplePlacementRules(),
+ new HashSet<String>(), conf);
reloadAllocs();
lastSuccessfulReload = scheduler.getClock().getTime();
@@ -115,6 +122,28 @@ public class QueueManager {
getLeafQueue(YarnConfiguration.DEFAULT_QUEUE_NAME, true);
}
+ public void updatePlacementPolicy(FairSchedulerConfiguration conf) {
+
+ }
+
+ /**
+ * Construct simple queue placement policy from allow-undeclared-pools and
+ * user-as-default-queue.
+ */
+ private List<QueuePlacementRule> getSimplePlacementRules() {
+ boolean create = scheduler.getConf().getAllowUndeclaredPools();
+ boolean userAsDefaultQueue = scheduler.getConf().getUserAsDefaultQueue();
+ List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
+ rules.add(new QueuePlacementRule.Specified().initialize(create, null));
+ if (userAsDefaultQueue) {
+ rules.add(new QueuePlacementRule.User().initialize(create, null));
+ }
+ if (!userAsDefaultQueue || !create) {
+ rules.add(new QueuePlacementRule.Default().initialize(true, null));
+ }
+ return rules;
+ }
+
/**
* Get a queue by name, creating it if the create param is true and is necessary.
* If the queue is not or can not be a leaf queue, i.e. it already exists as a
@@ -226,6 +255,10 @@ public class QueueManager {
return queues.containsKey(name);
}
}
+
+ public QueuePlacementPolicy getPlacementPolicy() {
+ return placementPolicy;
+ }
/**
* Reload allocations file if it hasn't been loaded in a while
@@ -290,6 +323,8 @@ public class QueueManager {
long fairSharePreemptionTimeout = Long.MAX_VALUE;
long defaultMinSharePreemptionTimeout = Long.MAX_VALUE;
SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.getDefault();
+
+ QueuePlacementPolicy newPlacementPolicy = null;
// Remember all queue names so we can display them on web UI, etc.
List<String> queueNamesInAllocFile = new ArrayList<String>();
@@ -306,6 +341,7 @@ public class QueueManager {
"file: top-level element not <allocations>");
NodeList elements = root.getChildNodes();
List<Element> queueElements = new ArrayList<Element>();
+ Element placementPolicyElement = null;
for (int i = 0; i < elements.getLength(); i++) {
Node node = elements.item(i);
if (node instanceof Element) {
@@ -348,6 +384,8 @@ public class QueueManager {
String text = ((Text)element.getFirstChild()).getData().trim();
SchedulingPolicy.setDefault(text);
defaultSchedPolicy = SchedulingPolicy.getDefault();
+ } else if ("queuePlacementPolicy".equals(element.getTagName())) {
+ placementPolicyElement = element;
} else {
LOG.warn("Bad element in allocations file: " + element.getTagName());
}
@@ -369,6 +407,15 @@ public class QueueManager {
userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts,
queueAcls, queueNamesInAllocFile);
}
+
+ // Load placement policy and pass it configured queues
+ if (placementPolicyElement != null) {
+ newPlacementPolicy = QueuePlacementPolicy.fromXml(placementPolicyElement,
+ new HashSet<String>(queueNamesInAllocFile), scheduler.getConf());
+ } else {
+ newPlacementPolicy = new QueuePlacementPolicy(getSimplePlacementRules(),
+ new HashSet<String>(queueNamesInAllocFile), scheduler.getConf());
+ }
// Commit the reload; also create any queue defined in the alloc file
// if it does not already exist, so it can be displayed on the web UI.
@@ -377,6 +424,7 @@ public class QueueManager {
queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault,
queueMaxAppsDefault, defaultSchedPolicy, minSharePreemptionTimeouts,
queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout);
+ placementPolicy = newPlacementPolicy;
// Make sure all queues exist
for (String name: queueNamesInAllocFile) {
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1542105&r1=1542104&r2=1542105&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Thu Nov 14 22:12:13 2013
@@ -44,7 +44,9 @@ import javax.xml.parsers.ParserConfigura
import junit.framework.Assert;
import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
+import org.apache.hadoop.security.GroupMappingServiceProvider;
import org.apache.hadoop.yarn.MockApps;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
@@ -94,6 +96,8 @@ import org.junit.Before;
import org.junit.Test;
import org.xml.sax.SAXException;
+import com.google.common.collect.Sets;
+
public class TestFairScheduler {
private class MockClock implements Clock {
@@ -616,6 +620,7 @@ public class TestFairScheduler {
conf.set(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, "false");
scheduler.reinitialize(conf, resourceManager.getRMContext());
+ scheduler.getQueueManager().initialize();
AppAddedSchedulerEvent appAddedEvent2 = new AppAddedSchedulerEvent(
createAppAttemptId(2, 1), "default", "user2");
scheduler.handle(appAddedEvent2);
@@ -664,6 +669,46 @@ public class TestFairScheduler {
assertEquals(rmApp2.getQueue(), queue2.getName());
assertEquals("root.notdefault", rmApp2.getQueue());
}
+
+ @Test
+ public void testQueuePlacementWithPolicy() throws Exception {
+ Configuration conf = createConfiguration();
+ conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+ SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
+ ApplicationAttemptId appId;
+ Map<ApplicationAttemptId, FSSchedulerApp> apps = scheduler.applications;
+
+ List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
+ rules.add(new QueuePlacementRule.Specified().initialize(true, null));
+ rules.add(new QueuePlacementRule.User().initialize(false, null));
+ rules.add(new QueuePlacementRule.PrimaryGroup().initialize(false, null));
+ rules.add(new QueuePlacementRule.Default().initialize(true, null));
+ Set<String> queues = Sets.newHashSet("root.user1", "root.user3group");
+ scheduler.getQueueManager().placementPolicy = new QueuePlacementPolicy(
+ rules, queues, conf);
+ appId = createSchedulingRequest(1024, "somequeue", "user1");
+ assertEquals("root.somequeue", apps.get(appId).getQueueName());
+ appId = createSchedulingRequest(1024, "default", "user1");
+ assertEquals("root.user1", apps.get(appId).getQueueName());
+ appId = createSchedulingRequest(1024, "default", "user3");
+ assertEquals("root.user3group", apps.get(appId).getQueueName());
+ appId = createSchedulingRequest(1024, "default", "otheruser");
+ assertEquals("root.default", apps.get(appId).getQueueName());
+
+ // test without specified as first rule
+ rules = new ArrayList<QueuePlacementRule>();
+ rules.add(new QueuePlacementRule.User().initialize(false, null));
+ rules.add(new QueuePlacementRule.Specified().initialize(true, null));
+ rules.add(new QueuePlacementRule.Default().initialize(true, null));
+ scheduler.getQueueManager().placementPolicy = new QueuePlacementPolicy(
+ rules, queues, conf);
+ appId = createSchedulingRequest(1024, "somequeue", "user1");
+ assertEquals("root.user1", apps.get(appId).getQueueName());
+ appId = createSchedulingRequest(1024, "somequeue", "otheruser");
+ assertEquals("root.somequeue", apps.get(appId).getQueueName());
+ appId = createSchedulingRequest(1024, "default", "otheruser");
+ assertEquals("root.default", apps.get(appId).getQueueName());
+ }
@Test
public void testFairShareWithMinAlloc() throws Exception {
Modified: hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm?rev=1542105&r1=1542104&r2=1542105&view=diff
==============================================================================
--- hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm (original)
+++ hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm Thu Nov 14 22:12:13 2013
@@ -101,6 +101,16 @@ Hadoop MapReduce Next Generation - Fair
Fair Scheduler. Among them, is the use of a custom policies governing
priority “boosting” over certain apps.
+* {Automatically placing applications in queues}
+
+ The Fair Scheduler allows administrators to configure policies that
+ automatically place submitted applications into appropriate queues. Placement
+ can depend on the user and groups of the submitter and the requested queue
+ passed by the application. A policy consists of a set of rules that are applied
+ sequentially to classify an incoming application. Each rule either places the
+ app into a queue, rejects it, or continues on to the next rule. Refer to the
+ allocation file format below for how to configure these policies.
+
* {Installation}
To use the Fair Scheduler first assign the appropriate scheduler class in
@@ -138,7 +148,8 @@ Properties that can be placed in yarn-si
* Whether to use the username associated with the allocation as the default
queue name, in the event that a queue name is not specified. If this is set
to "false" or unset, all jobs have a shared default queue, named "default".
- Defaults to true.
+ Defaults to true. If a queue placement policy is given in the allocations
+ file, this property is ignored.
* <<<yarn.scheduler.fair.preemption>>>
@@ -180,6 +191,16 @@ Properties that can be placed in yarn-si
opportunities to pass up. The default value of -1.0 means don't pass up any
scheduling opportunities.
+ * <<<yarn.scheduler.fair.allow-undeclared-pools>>>
+
+ * If this is true, new queues can be created at application submission time,
+ whether because they are specified as the application's queue by the
+ submitter or because they are placed there by the user-as-default-queue
+ property. If this is false, any time an app would be placed in a queue that
+ is not specified in the allocations file, it is placed in the "default" queue
+ instead. Defaults to true. If a queue placement policy is given in the
+ allocations file, this property is ignored.
+
Allocation file format
The allocation file must be in XML format. The format contains five types of
@@ -248,25 +269,29 @@ Allocation file format
policy for queues; overriden by the schedulingPolicy element in each queue
if specified. Defaults to "fair".
- An example allocation file is given here:
+ * <<A queuePlacementPolicy element>>, which contains a list of rule elements
+ that tell the scheduler how to place incoming apps into queues. Rules
+ are applied in the order that they are listed. Rules may take arguments. All
+ rules accept the "create" argument, which indicates whether the rule can create
+ a new queue. "Create" defaults to true; if set to false and the rule would
+ place the app in a queue that is not configured in the allocations file, we
+ continue on to the next rule. The last rule must be one that can never issue a
+ continue. Valid rules are:
+
+ * specified: the app is placed into the queue it requested. If the app
+ requested no queue, i.e. it specified "default", we continue.
+
+ * user: the app is placed into a queue with the name of the user who
+ submitted it.
-Queue Access Control Lists (ACLs)
+ * primaryGroup: the app is placed into a queue with the name of the
+ primary group of the user who submitted it.
- Queue Access Control Lists (ACLs) allow administrators to control who may
- take actions on particular queues. They are configured with the aclSubmitApps
- and aclAdministerApps properties, which can be set per queue. Currently the
- only supported administrative action is killing an application. Anybody who
- may administer a queue may also submit applications to it. These properties
- take values in a format like "user1,user2 group1,group2" or " group1,group2".
- An action on a queue will be permitted if its user or group is in the ACL of
- that queue or in the ACL of any of that queue's ancestors. So if queue2
- is inside queue1, and user1 is in queue1's ACL, and user2 is in queue2's
- ACL, then both users may submit to queue2.
-
- The root queue's ACLs are "*" by default which, because ACLs are passed down,
- means that everybody may submit to and kill applications from every queue.
- To start restricting access, change the root queue's ACLs to something other
- than "*".
+ * default: the app is placed into the queue named "default".
+
+ * reject: the app is rejected.
+
+ An example allocation file is given here:
---
<?xml version="1.0"?>
@@ -282,14 +307,41 @@ Queue Access Control Lists (ACLs)
<minResources>5000 mb,0vcores</minResources>
</queue>
</queue>
+
<user name="sample_user">
<maxRunningApps>30</maxRunningApps>
</user>
<userMaxAppsDefault>5</userMaxAppsDefault>
+
+ <queuePlacementPolicy>
+ <specified />
+ <primaryGroup create="false" />
+ <default />
+ </queuePlacementPolicy>
</allocations>
---
Note that for backwards compatibility with the original FairScheduler, "queue" elements can instead be named as "pool" elements.
+
+
+Queue Access Control Lists (ACLs)
+
+ Queue Access Control Lists (ACLs) allow administrators to control who may
+ take actions on particular queues. They are configured with the aclSubmitApps
+ and aclAdministerApps properties, which can be set per queue. Currently the
+ only supported administrative action is killing an application. Anybody who
+ may administer a queue may also submit applications to it. These properties
+ take values in a format like "user1,user2 group1,group2" or " group1,group2".
+ An action on a queue will be permitted if its user or group is in the ACL of
+ that queue or in the ACL of any of that queue's ancestors. So if queue2
+ is inside queue1, and user1 is in queue1's ACL, and user2 is in queue2's
+ ACL, then both users may submit to queue2.
+
+ The root queue's ACLs are "*" by default which, because ACLs are passed down,
+ means that everybody may submit to and kill applications from every queue.
+ To start restricting access, change the root queue's ACLs to something other
+ than "*".
+
* {Administration}