You are viewing a plain text version of this content. The canonical link for it is here.
Posted to yarn-commits@hadoop.apache.org by sa...@apache.org on 2014/05/08 09:23:19 UTC
svn commit: r1593192 - in
/hadoop/common/branches/branch-2/hadoop-yarn-project: ./
hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/
hadoop-yarn/hadoop-yarn-ser...
Author: sandy
Date: Thu May 8 07:23:19 2014
New Revision: 1593192
URL: http://svn.apache.org/r1593192
Log:
YARN-1864. Fair Scheduler Dynamic Hierarchical User Queues (Ashwin Shankar via Sandy Ryza)
Added:
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueType.java
- copied unchanged from r1593191, hadoop/common/trunk/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSQueueType.java
Modified:
hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java
hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt?rev=1593192&r1=1593191&r2=1593192&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/CHANGES.txt Thu May 8 07:23:19 2014
@@ -8,6 +8,9 @@ Release 2.5.0 - UNRELEASED
YARN-1757. NM Recovery. Auxiliary service support. (Jason Lowe via kasha)
+ YARN-1864. Fair Scheduler Dynamic Hierarchical User Queues (Ashwin Shankar
+ via Sandy Ryza)
+
IMPROVEMENTS
YARN-1479. Invalid NaN values in Hadoop REST API JSON response (Chen He via
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java?rev=1593192&r1=1593191&r2=1593192&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java Thu May 8 07:23:19 2014
@@ -77,19 +77,22 @@ public class AllocationConfiguration {
@VisibleForTesting
QueuePlacementPolicy placementPolicy;
+ // Queues configured in the alloc xml, grouped by queue type (leaf or parent)
@VisibleForTesting
- Set<String> queueNames;
+ Map<FSQueueType, Set<String>> configuredQueues;
- public AllocationConfiguration(Map<String, Resource> minQueueResources,
- Map<String, Resource> maxQueueResources,
+ public AllocationConfiguration(Map<String, Resource> minQueueResources,
+ Map<String, Resource> maxQueueResources,
Map<String, Integer> queueMaxApps, Map<String, Integer> userMaxApps,
Map<String, ResourceWeights> queueWeights, int userMaxAppsDefault,
- int queueMaxAppsDefault, Map<String, SchedulingPolicy> schedulingPolicies,
+ int queueMaxAppsDefault,
+ Map<String, SchedulingPolicy> schedulingPolicies,
SchedulingPolicy defaultSchedulingPolicy,
- Map<String, Long> minSharePreemptionTimeouts,
+ Map<String, Long> minSharePreemptionTimeouts,
Map<String, Map<QueueACL, AccessControlList>> queueAcls,
long fairSharePreemptionTimeout, long defaultMinSharePreemptionTimeout,
- QueuePlacementPolicy placementPolicy, Set<String> queueNames) {
+ QueuePlacementPolicy placementPolicy,
+ Map<FSQueueType, Set<String>> configuredQueues) {
this.minQueueResources = minQueueResources;
this.maxQueueResources = maxQueueResources;
this.queueMaxApps = queueMaxApps;
@@ -104,7 +107,7 @@ public class AllocationConfiguration {
this.fairSharePreemptionTimeout = fairSharePreemptionTimeout;
this.defaultMinSharePreemptionTimeout = defaultMinSharePreemptionTimeout;
this.placementPolicy = placementPolicy;
- this.queueNames = queueNames;
+ this.configuredQueues = configuredQueues;
}
public AllocationConfiguration(Configuration conf) {
@@ -121,9 +124,12 @@ public class AllocationConfiguration {
fairSharePreemptionTimeout = Long.MAX_VALUE;
schedulingPolicies = new HashMap<String, SchedulingPolicy>();
defaultSchedulingPolicy = SchedulingPolicy.DEFAULT_POLICY;
+ configuredQueues = new HashMap<FSQueueType, Set<String>>();
+ for (FSQueueType queueType : FSQueueType.values()) {
+ configuredQueues.put(queueType, new HashSet<String>());
+ }
placementPolicy = QueuePlacementPolicy.fromConfiguration(conf,
- new HashSet<String>());
- queueNames = new HashSet<String>();
+ configuredQueues);
}
/**
@@ -221,8 +227,8 @@ public class AllocationConfiguration {
return defaultSchedulingPolicy;
}
- public Set<String> getQueueNames() {
- return queueNames;
+ public Map<FSQueueType, Set<String>> getConfiguredQueues() {
+ return configuredQueues;
}
public QueuePlacementPolicy getPlacementPolicy() {
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java?rev=1593192&r1=1593191&r2=1593192&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java Thu May 8 07:23:19 2014
@@ -214,8 +214,15 @@ public class AllocationFileLoaderService
QueuePlacementPolicy newPlacementPolicy = null;
// Remember all queue names so we can display them on web UI, etc.
- Set<String> queueNamesInAllocFile = new HashSet<String>();
-
+ // configuredQueues is segregated by queue type: each configured queue is
+ // recorded as either a leaf or a parent. This information is used for
+ // creating queues and for queue placement decisions (QueuePlacementRule.java).
+ Map<FSQueueType, Set<String>> configuredQueues =
+ new HashMap<FSQueueType, Set<String>>();
+ for (FSQueueType queueType : FSQueueType.values()) {
+ configuredQueues.put(queueType, new HashSet<String>());
+ }
+
// Read and parse the allocations file.
DocumentBuilderFactory docBuilderFactory =
DocumentBuilderFactory.newInstance();
@@ -289,26 +296,27 @@ public class AllocationFileLoaderService
}
parent = null;
}
- loadQueue(parent, element, minQueueResources, maxQueueResources, queueMaxApps,
- userMaxApps, queueWeights, queuePolicies, minSharePreemptionTimeouts,
- queueAcls, queueNamesInAllocFile);
+ loadQueue(parent, element, minQueueResources, maxQueueResources,
+ queueMaxApps, userMaxApps, queueWeights, queuePolicies,
+ minSharePreemptionTimeouts, queueAcls,
+ configuredQueues);
}
// Load placement policy and pass it configured queues
Configuration conf = getConfig();
if (placementPolicyElement != null) {
newPlacementPolicy = QueuePlacementPolicy.fromXml(placementPolicyElement,
- queueNamesInAllocFile, conf);
+ configuredQueues, conf);
} else {
newPlacementPolicy = QueuePlacementPolicy.fromConfiguration(conf,
- queueNamesInAllocFile);
+ configuredQueues);
}
AllocationConfiguration info = new AllocationConfiguration(minQueueResources, maxQueueResources,
queueMaxApps, userMaxApps, queueWeights, userMaxAppsDefault,
queueMaxAppsDefault, queuePolicies, defaultSchedPolicy, minSharePreemptionTimeouts,
queueAcls, fairSharePreemptionTimeout, defaultMinSharePreemptionTimeout,
- newPlacementPolicy, queueNamesInAllocFile);
+ newPlacementPolicy, configuredQueues);
lastSuccessfulReload = clock.getTime();
lastReloadAttemptFailed = false;
@@ -324,7 +332,8 @@ public class AllocationFileLoaderService
Map<String, Integer> userMaxApps, Map<String, ResourceWeights> queueWeights,
Map<String, SchedulingPolicy> queuePolicies,
Map<String, Long> minSharePreemptionTimeouts,
- Map<String, Map<QueueACL, AccessControlList>> queueAcls, Set<String> queueNamesInAllocFile)
+ Map<String, Map<QueueACL, AccessControlList>> queueAcls,
+ Map<FSQueueType, Set<String>> configuredQueues)
throws AllocationConfigurationException {
String queueName = element.getAttribute("name");
if (parentName != null) {
@@ -375,13 +384,19 @@ public class AllocationFileLoaderService
"pool".equals(field.getTagName())) {
loadQueue(queueName, field, minQueueResources, maxQueueResources,
queueMaxApps, userMaxApps, queueWeights, queuePolicies,
- minSharePreemptionTimeouts,
- queueAcls, queueNamesInAllocFile);
+ minSharePreemptionTimeouts, queueAcls, configuredQueues);
+ configuredQueues.get(FSQueueType.PARENT).add(queueName);
isLeaf = false;
}
}
if (isLeaf) {
- queueNamesInAllocFile.add(queueName);
+ // if a leaf in the alloc file is marked as type='parent'
+ // then store it under 'parent'
+ if ("parent".equals(element.getAttribute("type"))) {
+ configuredQueues.get(FSQueueType.PARENT).add(queueName);
+ } else {
+ configuredQueues.get(FSQueueType.LEAF).add(queueName);
+ }
}
queueAcls.put(queueName, acls);
if (maxQueueResources.containsKey(queueName) && minQueueResources.containsKey(queueName)
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java?rev=1593192&r1=1593191&r2=1593192&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java Thu May 8 07:23:19 2014
@@ -74,7 +74,7 @@ public class QueueManager {
}
/**
- * Get a queue by name, creating it if the create param is true and is necessary.
+ * Get a leaf queue by name, creating it if the create param is true and is necessary.
* If the queue is not or can not be a leaf queue, i.e. it already exists as a
* parent queue, or one of the parents in its name is already a leaf queue,
* null is returned.
@@ -85,31 +85,53 @@ public class QueueManager {
* could be referred to as just "parent1.queue2".
*/
public FSLeafQueue getLeafQueue(String name, boolean create) {
+   // Look the name up; it is created when missing and create is true.
+   FSQueue found = getQueue(name, create, FSQueueType.LEAF);
+   // A name that resolves to a parent queue cannot be returned as a leaf;
+   // a null lookup result passes through the cast unchanged.
+   return (found instanceof FSParentQueue) ? null : (FSLeafQueue) found;
+ }
+
+ /**
+  * Looks up a parent queue by name. If no queue with that name exists and
+  * the create param is true, an attempt is made to create it as a parent.
+  * Null is returned if the queue does not exist and was not created, if it
+  * already exists as a leaf queue, or if one of the parents in its name is
+  * already a leaf queue.
+  *
+  * The root part of the name is optional: "queue1" refers to the queue of
+  * that name underneath the root, and "parent1.queue2" refers to "queue2"
+  * underneath "parent1", which is in turn underneath the root.
+  * @param name queue name, with or without the root part
+  * @param create whether to create the queue when it does not exist
+  * @return the parent queue, or null as described above
+  */
+ public FSParentQueue getParentQueue(String name, boolean create) {
+   FSQueue found = getQueue(name, create, FSQueueType.PARENT);
+   return (found instanceof FSLeafQueue) ? null : (FSParentQueue) found;
+ }
+
+ private FSQueue getQueue(String name, boolean create, FSQueueType queueType) {
name = ensureRootPrefix(name);
synchronized (queues) {
FSQueue queue = queues.get(name);
if (queue == null && create) {
- FSLeafQueue leafQueue = createLeafQueue(name);
- if (leafQueue == null) {
- return null;
- }
- queue = leafQueue;
- } else if (queue instanceof FSParentQueue) {
- return null;
+ // If the queue doesn't exist, create it and return it.
+ queue = createQueue(name, queueType);
}
- return (FSLeafQueue)queue;
+ return queue;
}
}
/**
- * Creates a leaf queue and places it in the tree. Creates any
- * parents that don't already exist.
+ * Creates a leaf or parent queue based on what is specified in 'queueType'
+ * and places it in the tree. Creates any parents that don't already exist.
*
* @return
* the created queue, if successful. null if not allowed (one of the parent
* queues in the queue name is already a leaf queue)
*/
- private FSLeafQueue createLeafQueue(String name) {
+ private FSQueue createQueue(String name, FSQueueType queueType) {
List<String> newQueueNames = new ArrayList<String>();
newQueueNames.add(name);
int sepIndex = name.length();
@@ -143,8 +165,7 @@ public class QueueManager {
FSLeafQueue leafQueue = null;
for (int i = newQueueNames.size()-1; i >= 0; i--) {
String queueName = newQueueNames.get(i);
- if (i == 0) {
- // First name added was the leaf queue
+ if (i == 0 && queueType != FSQueueType.PARENT) {
leafQueue = new FSLeafQueue(name, scheduler, parent);
try {
leafQueue.setPolicy(queueConf.getDefaultSchedulingPolicy());
@@ -155,6 +176,7 @@ public class QueueManager {
parent.addChildQueue(leafQueue);
queues.put(leafQueue.getName(), leafQueue);
leafQueues.add(leafQueue);
+ return leafQueue;
} else {
FSParentQueue newParent = new FSParentQueue(queueName, scheduler, parent);
try {
@@ -169,53 +191,64 @@ public class QueueManager {
}
}
- return leafQueue;
+ return parent;
}
/**
- * Make way for the given leaf queue if possible, by removing incompatible
+ * Make way for the given queue if possible, by removing incompatible
* queues with no apps in them. Incompatibility could be due to
- * (1) leafToCreate being currently being a parent, or (2) an existing leaf queue in
- * the ancestry of leafToCreate.
+ * (1) queueToCreate currently being a parent but needing to become a leaf,
+ * (2) queueToCreate currently being a leaf but needing to become a parent, or
+ * (3) an existing leaf queue in the ancestry of queueToCreate.
*
* We will never remove the root queue or the default queue in this way.
*
- * @return true if we can create leafToCreate or it already exists.
+ * @return true if we can create queueToCreate or it already exists.
*/
- private boolean removeEmptyIncompatibleQueues(String leafToCreate) {
- leafToCreate = ensureRootPrefix(leafToCreate);
+ private boolean removeEmptyIncompatibleQueues(String queueToCreate,
+ FSQueueType queueType) {
+ queueToCreate = ensureRootPrefix(queueToCreate);
- // Ensure leafToCreate is not root and doesn't have the default queue in its
+ // Ensure queueToCreate is not root and doesn't have the default queue in its
// ancestry.
- if (leafToCreate.equals(ROOT_QUEUE) ||
- leafToCreate.startsWith(
+ if (queueToCreate.equals(ROOT_QUEUE) ||
+ queueToCreate.startsWith(
ROOT_QUEUE + "." + YarnConfiguration.DEFAULT_QUEUE_NAME + ".")) {
return false;
}
- FSQueue queue = queues.get(leafToCreate);
+ FSQueue queue = queues.get(queueToCreate);
// Queue exists already.
if (queue != null) {
if (queue instanceof FSLeafQueue) {
- // If it's an already existing leaf, we're ok.
- return true;
+ if (queueType == FSQueueType.LEAF) {
+ // if queue is already a leaf then return true
+ return true;
+ }
+ // The queue is currently a leaf but needs to become a parent:
+ // remove the incompatible leaf if it is empty.
+ return removeQueueIfEmpty(queue);
} else {
- // If it's an existing parent queue, remove it if it's empty.
+ if (queueType == FSQueueType.PARENT) {
+ return true;
+ }
+ // If it's an existing parent queue and needs to change to leaf,
+ // remove it if it's empty.
return removeQueueIfEmpty(queue);
}
}
// Queue doesn't exist already. Check if the new queue would be created
// under an existing leaf queue. If so, try removing that leaf queue.
- int sepIndex = leafToCreate.length();
- sepIndex = leafToCreate.lastIndexOf('.', sepIndex-1);
+ int sepIndex = queueToCreate.length();
+ sepIndex = queueToCreate.lastIndexOf('.', sepIndex-1);
while (sepIndex != -1) {
- String prefixString = leafToCreate.substring(0, sepIndex);
+ String prefixString = queueToCreate.substring(0, sepIndex);
FSQueue prefixQueue = queues.get(prefixString);
if (prefixQueue != null && prefixQueue instanceof FSLeafQueue) {
return removeQueueIfEmpty(prefixQueue);
}
- sepIndex = leafToCreate.lastIndexOf('.', sepIndex-1);
+ sepIndex = queueToCreate.lastIndexOf('.', sepIndex-1);
}
return true;
}
@@ -312,12 +345,21 @@ public class QueueManager {
}
public void updateAllocationConfiguration(AllocationConfiguration queueConf) {
- // Make sure all queues exist
- for (String name : queueConf.getQueueNames()) {
- if (removeEmptyIncompatibleQueues(name)) {
+ // Create leaf queues and the parent queues in a leaf's ancestry if they do not exist
+ for (String name : queueConf.getConfiguredQueues().get(FSQueueType.LEAF)) {
+ if (removeEmptyIncompatibleQueues(name, FSQueueType.LEAF)) {
getLeafQueue(name, true);
}
}
+
+ // At this point all leaves and 'parents with at least one child' would have been created.
+ // Now create parents with no configured leaf.
+ for (String name : queueConf.getConfiguredQueues().get(
+ FSQueueType.PARENT)) {
+ if (removeEmptyIncompatibleQueues(name, FSQueueType.PARENT)) {
+ getParentQueue(name, true);
+ }
+ }
for (FSQueue queue : queues.values()) {
// Update queue metrics
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java?rev=1593192&r1=1593191&r2=1593192&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java Thu May 8 07:23:19 2014
@@ -42,17 +42,19 @@ public class QueuePlacementPolicy {
map.put("secondaryGroupExistingQueue",
QueuePlacementRule.SecondaryGroupExistingQueue.class);
map.put("specified", QueuePlacementRule.Specified.class);
+ map.put("nestedUserQueue",
+ QueuePlacementRule.NestedUserQueue.class);
map.put("default", QueuePlacementRule.Default.class);
map.put("reject", QueuePlacementRule.Reject.class);
ruleClasses = Collections.unmodifiableMap(map);
}
private final List<QueuePlacementRule> rules;
- private final Set<String> configuredQueues;
+ private final Map<FSQueueType, Set<String>> configuredQueues;
private final Groups groups;
public QueuePlacementPolicy(List<QueuePlacementRule> rules,
- Set<String> configuredQueues, Configuration conf)
+ Map<FSQueueType, Set<String>> configuredQueues, Configuration conf)
throws AllocationConfigurationException {
for (int i = 0; i < rules.size()-1; i++) {
if (rules.get(i).isTerminal()) {
@@ -72,28 +74,15 @@ public class QueuePlacementPolicy {
/**
* Builds a QueuePlacementPolicy from an xml element.
*/
- public static QueuePlacementPolicy fromXml(Element el, Set<String> configuredQueues,
- Configuration conf) throws AllocationConfigurationException {
+ public static QueuePlacementPolicy fromXml(Element el,
+ Map<FSQueueType, Set<String>> configuredQueues, Configuration conf)
+ throws AllocationConfigurationException {
List<QueuePlacementRule> rules = new ArrayList<QueuePlacementRule>();
NodeList elements = el.getChildNodes();
for (int i = 0; i < elements.getLength(); i++) {
Node node = elements.item(i);
if (node instanceof Element) {
- Element element = (Element)node;
-
- String ruleName = element.getAttribute("name");
- if ("".equals(ruleName)) {
- throw new AllocationConfigurationException("No name provided for a " +
- "rule element");
- }
-
- Class<? extends QueuePlacementRule> clazz = ruleClasses.get(ruleName);
- if (clazz == null) {
- throw new AllocationConfigurationException("No rule class found for "
- + ruleName);
- }
- QueuePlacementRule rule = ReflectionUtils.newInstance(clazz, null);
- rule.initializeFromXml(element);
+ QueuePlacementRule rule = createAndInitializeRule(node);
rules.add(rule);
}
}
@@ -101,11 +90,37 @@ public class QueuePlacementPolicy {
}
/**
+  * Creates the placement rule described by an xml node: instantiates the
+  * rule class registered under its "name" attribute and initializes it.
+  * @param node xml node describing the rule; it is cast to an Element
+  * @return the newly created and initialized QueuePlacementRule
+  * @throws AllocationConfigurationException if the name is empty, no rule
+  *         class is registered for it, or the rule fails to initialize
+  */
+ public static QueuePlacementRule createAndInitializeRule(Node node)
+     throws AllocationConfigurationException {
+   Element ruleElement = (Element) node;
+   String ruleName = ruleElement.getAttribute("name");
+   if ("".equals(ruleName)) {
+     throw new AllocationConfigurationException(
+         "No name provided for a rule element");
+   }
+   Class<? extends QueuePlacementRule> ruleClass = ruleClasses.get(ruleName);
+   if (ruleClass == null) {
+     throw new AllocationConfigurationException(
+         "No rule class found for " + ruleName);
+   }
+   QueuePlacementRule created = ReflectionUtils.newInstance(ruleClass, null);
+   created.initializeFromXml(ruleElement);
+   return created;
+ }
+
+ /**
* Build a simple queue placement policy from the allow-undeclared-pools and
* user-as-default-queue configuration options.
*/
public static QueuePlacementPolicy fromConfiguration(Configuration conf,
- Set<String> configuredQueues) {
+ Map<FSQueueType, Set<String>> configuredQueues) {
boolean create = conf.getBoolean(
FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS,
FairSchedulerConfiguration.DEFAULT_ALLOW_UNDECLARED_POOLS);
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java?rev=1593192&r1=1593191&r2=1593192&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java Thu May 8 07:23:19 2014
@@ -18,16 +18,19 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import java.io.IOException;
-import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import org.apache.hadoop.security.Groups;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.w3c.dom.Element;
import org.w3c.dom.NamedNodeMap;
import org.w3c.dom.Node;
+import org.w3c.dom.NodeList;
+
+import com.google.common.annotations.VisibleForTesting;
public abstract class QueuePlacementRule {
protected boolean create;
@@ -58,16 +61,20 @@ public abstract class QueuePlacementRule
* continue to the next rule, and null indicates that the app should be rejected.
*/
public String assignAppToQueue(String requestedQueue, String user,
- Groups groups, Collection<String> configuredQueues) throws IOException {
- String queue = getQueueForApp(requestedQueue, user, groups, configuredQueues);
- if (create || configuredQueues.contains(queue)) {
+ Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
+ throws IOException {
+ String queue = getQueueForApp(requestedQueue, user, groups,
+ configuredQueues);
+ if (create || configuredQueues.get(FSQueueType.LEAF).contains(queue)
+ || configuredQueues.get(FSQueueType.PARENT).contains(queue)) {
return queue;
} else {
return "";
}
}
- public void initializeFromXml(Element el) {
+ public void initializeFromXml(Element el)
+ throws AllocationConfigurationException {
boolean create = true;
NamedNodeMap attributes = el.getAttributes();
Map<String, String> args = new HashMap<String, String>();
@@ -104,15 +111,16 @@ public abstract class QueuePlacementRule
* continue to the next rule.
*/
protected abstract String getQueueForApp(String requestedQueue, String user,
- Groups groups, Collection<String> configuredQueues) throws IOException;
+ Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
+ throws IOException;
/**
* Places apps in queues by username of the submitter
*/
public static class User extends QueuePlacementRule {
@Override
- protected String getQueueForApp(String requestedQueue,
- String user, Groups groups, Collection<String> configuredQueues) {
+ protected String getQueueForApp(String requestedQueue, String user,
+ Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
return "root." + user;
}
@@ -127,9 +135,9 @@ public abstract class QueuePlacementRule
*/
public static class PrimaryGroup extends QueuePlacementRule {
@Override
- protected String getQueueForApp(String requestedQueue,
- String user, Groups groups,
- Collection<String> configuredQueues) throws IOException {
+ protected String getQueueForApp(String requestedQueue, String user,
+ Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
+ throws IOException {
return "root." + groups.getGroups(user).get(0);
}
@@ -147,12 +155,15 @@ public abstract class QueuePlacementRule
*/
public static class SecondaryGroupExistingQueue extends QueuePlacementRule {
@Override
- protected String getQueueForApp(String requestedQueue,
- String user, Groups groups,
- Collection<String> configuredQueues) throws IOException {
+ protected String getQueueForApp(String requestedQueue, String user,
+ Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
+ throws IOException {
List<String> groupNames = groups.getGroups(user);
for (int i = 1; i < groupNames.size(); i++) {
- if (configuredQueues.contains("root." + groupNames.get(i))) {
+ String group = groupNames.get(i);
+ if (configuredQueues.get(FSQueueType.LEAF).contains("root." + group)
+ || configuredQueues.get(FSQueueType.PARENT).contains(
+ "root." + group)) {
return "root." + groupNames.get(i);
}
}
@@ -167,12 +178,83 @@ public abstract class QueuePlacementRule
}
/**
+  * Places apps in a queue named after the submitter, nested under the queue
+  * returned by the nested rule.
+  */
+ public static class NestedUserQueue extends QueuePlacementRule {
+   @VisibleForTesting
+   QueuePlacementRule nestedRule;
+
+   /**
+    * Parse xml and instantiate the nested rule from the first "rule" child.
+    */
+   @Override
+   public void initializeFromXml(Element el)
+       throws AllocationConfigurationException {
+     NodeList elements = el.getChildNodes();
+
+     for (int i = 0; i < elements.getLength(); i++) {
+       Node node = elements.item(i);
+       if (node instanceof Element
+           && "rule".equals(((Element) node).getTagName())) {
+         QueuePlacementRule rule = QueuePlacementPolicy
+             .createAndInitializeRule(node);
+         if (rule == null) {
+           throw new AllocationConfigurationException(
+               "Unable to create nested rule in nestedUserQueue rule");
+         }
+         this.nestedRule = rule;
+         break;
+       }
+     }
+
+     if (this.nestedRule == null) {
+       throw new AllocationConfigurationException(
+           "No nested rule specified in <nestedUserQueue> rule");
+     }
+     super.initializeFromXml(el);
+   }
+
+   /**
+    * Applies the nested rule and appends the user name to its result.
+    * @return "root.[nested queue].[user]"; "" (continue to the next rule) if
+    *         the nested rule returns "" or a configured leaf queue; null if
+    *         the nested rule returns null
+    */
+   @Override
+   protected String getQueueForApp(String requestedQueue, String user,
+       Groups groups, Map<FSQueueType, Set<String>> configuredQueues)
+       throws IOException {
+     String queueName = nestedRule.assignAppToQueue(requestedQueue, user,
+         groups, configuredQueues);
+     // Compare by content: == / != on a String only tests reference identity.
+     if (queueName == null || queueName.isEmpty()) {
+       return queueName;
+     }
+     if (!queueName.startsWith("root.")) {
+       queueName = "root." + queueName;
+     }
+     // If the nested rule returned a configured leaf queue,
+     // skip to the next rule in the queue placement policy.
+     if (configuredQueues.get(FSQueueType.LEAF).contains(queueName)) {
+       return "";
+     }
+     return queueName + "." + user;
+   }
+
+   @Override
+   public boolean isTerminal() {
+     return false;
+   }
+ }
+
+ /**
* Places apps in queues by requested queue of the submitter
*/
public static class Specified extends QueuePlacementRule {
@Override
- protected String getQueueForApp(String requestedQueue,
- String user, Groups groups, Collection<String> configuredQueues) {
+ protected String getQueueForApp(String requestedQueue, String user,
+ Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
if (requestedQueue.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)) {
return "";
} else {
@@ -195,7 +277,7 @@ public abstract class QueuePlacementRule
public static class Default extends QueuePlacementRule {
@Override
protected String getQueueForApp(String requestedQueue, String user,
- Groups groups, Collection<String> configuredQueues) {
+ Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
return "root." + YarnConfiguration.DEFAULT_QUEUE_NAME;
}
@@ -211,13 +293,13 @@ public abstract class QueuePlacementRule
public static class Reject extends QueuePlacementRule {
@Override
public String assignAppToQueue(String requestedQueue, String user,
- Groups groups, Collection<String> configuredQueues) {
+ Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
return null;
}
@Override
protected String getQueueForApp(String requestedQueue, String user,
- Groups groups, Collection<String> configuredQueues) {
+ Groups groups, Map<FSQueueType, Set<String>> configuredQueues) {
throw new UnsupportedOperationException();
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java?rev=1593192&r1=1593191&r2=1593192&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java Thu May 8 07:23:19 2014
@@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
import static org.junit.Assert.*;
-import static org.junit.Assert.assertEquals;
import java.io.File;
import java.io.FileWriter;
@@ -28,6 +27,7 @@ import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.NestedUserQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.util.Clock;
@@ -99,9 +99,12 @@ public class TestAllocationFileLoaderSer
assertEquals(1, rules.size());
assertEquals(QueuePlacementRule.Default.class, rules.get(0).getClass());
assertEquals(1, allocConf.getQueueMaxApps("root.queueA"));
- assertEquals(2, allocConf.getQueueNames().size());
- assertTrue(allocConf.getQueueNames().contains("root.queueA"));
- assertTrue(allocConf.getQueueNames().contains("root.queueB"));
+ assertEquals(2, allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
+ .size());
+ assertTrue(allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
+ .contains("root.queueA"));
+ assertTrue(allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
+ .contains("root.queueB"));
confHolder.allocConf = null;
@@ -114,6 +117,9 @@ public class TestAllocationFileLoaderSer
out.println(" </queue>");
out.println(" <queuePlacementPolicy>");
out.println(" <rule name='specified' />");
+ out.println(" <rule name='nestedUserQueue' >");
+ out.println(" <rule name='primaryGroup' />");
+ out.println(" </rule>");
out.println(" <rule name='default' />");
out.println(" </queuePlacementPolicy>");
out.println("</allocations>");
@@ -131,12 +137,18 @@ public class TestAllocationFileLoaderSer
allocConf = confHolder.allocConf;
policy = allocConf.getPlacementPolicy();
rules = policy.getRules();
- assertEquals(2, rules.size());
+ assertEquals(3, rules.size());
assertEquals(QueuePlacementRule.Specified.class, rules.get(0).getClass());
- assertEquals(QueuePlacementRule.Default.class, rules.get(1).getClass());
+ assertEquals(QueuePlacementRule.NestedUserQueue.class, rules.get(1)
+ .getClass());
+ assertEquals(QueuePlacementRule.PrimaryGroup.class,
+ ((NestedUserQueue) (rules.get(1))).nestedRule.getClass());
+ assertEquals(QueuePlacementRule.Default.class, rules.get(2).getClass());
assertEquals(3, allocConf.getQueueMaxApps("root.queueB"));
- assertEquals(1, allocConf.getQueueNames().size());
- assertTrue(allocConf.getQueueNames().contains("root.queueB"));
+ assertEquals(1, allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
+ .size());
+ assertTrue(allocConf.getConfiguredQueues().get(FSQueueType.LEAF)
+ .contains("root.queueB"));
}
@Test
@@ -170,6 +182,14 @@ public class TestAllocationFileLoaderSer
out.println("<queue name=\"queueE\">");
out.println("<minSharePreemptionTimeout>60</minSharePreemptionTimeout>");
out.println("</queue>");
+ //Make queue F a parent queue without configured leaf queues using the 'type' attribute
+ out.println("<queue name=\"queueF\" type=\"parent\" >");
+ out.println("</queue>");
+ //Create hierarchical queues G,H
+ out.println("<queue name=\"queueG\">");
+ out.println(" <queue name=\"queueH\">");
+ out.println(" </queue>");
+ out.println("</queue>");
// Set default limit of apps per queue to 15
out.println("<queueMaxAppsDefault>15</queueMaxAppsDefault>");
// Set default limit of apps per user to 5
@@ -194,7 +214,7 @@ public class TestAllocationFileLoaderSer
allocLoader.reloadAllocations();
AllocationConfiguration queueConf = confHolder.allocConf;
- assertEquals(5, queueConf.getQueueNames().size());
+ assertEquals(6, queueConf.getConfiguredQueues().get(FSQueueType.LEAF).size());
assertEquals(Resources.createResource(0),
queueConf.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(Resources.createResource(0),
@@ -250,6 +270,14 @@ public class TestAllocationFileLoaderSer
assertEquals(60000, queueConf.getMinSharePreemptionTimeout("root.queueE"));
assertEquals(300000, queueConf.getFairSharePreemptionTimeout());
+ assertTrue(queueConf.getConfiguredQueues()
+ .get(FSQueueType.PARENT)
+ .contains("root.queueF"));
+ assertTrue(queueConf.getConfiguredQueues().get(FSQueueType.PARENT)
+ .contains("root.queueG"));
+ assertTrue(queueConf.getConfiguredQueues().get(FSQueueType.LEAF)
+ .contains("root.queueG.queueH"));
+
// Verify existing queues have default scheduling policy
assertEquals(DominantResourceFairnessPolicy.NAME,
queueConf.getSchedulingPolicy("root").getName());
@@ -315,7 +343,7 @@ public class TestAllocationFileLoaderSer
allocLoader.reloadAllocations();
AllocationConfiguration queueConf = confHolder.allocConf;
- assertEquals(5, queueConf.getQueueNames().size());
+ assertEquals(5, queueConf.getConfiguredQueues().get(FSQueueType.LEAF).size());
assertEquals(Resources.createResource(0),
queueConf.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME));
assertEquals(Resources.createResource(0),
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java?rev=1593192&r1=1593191&r2=1593192&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java Thu May 8 07:23:19 2014
@@ -34,6 +34,7 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
@@ -43,7 +44,6 @@ import java.util.Set;
import javax.xml.parsers.ParserConfigurationException;
import org.junit.Assert;
-
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
@@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.server.res
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
+import org.apache.hadoop.yarn.server.resourcemanager.resource.ResourceType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -737,8 +738,11 @@ public class TestFairScheduler {
rules.add(new QueuePlacementRule.Default().initialize(true, null));
Set<String> queues = Sets.newHashSet("root.user1", "root.user3group",
"root.user4subgroup1", "root.user4subgroup2" , "root.user5subgroup2");
+ Map<FSQueueType, Set<String>> configuredQueues = new HashMap<FSQueueType, Set<String>>();
+ configuredQueues.put(FSQueueType.LEAF, queues);
+ configuredQueues.put(FSQueueType.PARENT, new HashSet<String>());
scheduler.getAllocationConfiguration().placementPolicy =
- new QueuePlacementPolicy(rules, queues, conf);
+ new QueuePlacementPolicy(rules, configuredQueues, conf);
appId = createSchedulingRequest(1024, "somequeue", "user1");
assertEquals("root.somequeue", scheduler.getSchedulerApp(appId).getQueueName());
appId = createSchedulingRequest(1024, "default", "user1");
@@ -758,7 +762,7 @@ public class TestFairScheduler {
rules.add(new QueuePlacementRule.Specified().initialize(true, null));
rules.add(new QueuePlacementRule.Default().initialize(true, null));
scheduler.getAllocationConfiguration().placementPolicy =
- new QueuePlacementPolicy(rules, queues, conf);
+ new QueuePlacementPolicy(rules, configuredQueues, conf);
appId = createSchedulingRequest(1024, "somequeue", "user1");
assertEquals("root.user1", scheduler.getSchedulerApp(appId).getQueueName());
appId = createSchedulingRequest(1024, "somequeue", "otheruser");
@@ -809,7 +813,89 @@ public class TestFairScheduler {
}
}
}
+
+ @Test
+ public void testNestedUserQueue() throws IOException {
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+ conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
+ SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<allocations>");
+ out.println("<queue name=\"user1group\" type=\"parent\">");
+ out.println("<minResources>1024mb,0vcores</minResources>");
+ out.println("</queue>");
+ out.println("<queuePlacementPolicy>");
+ out.println("<rule name=\"specified\" create=\"false\" />");
+ out.println("<rule name=\"nestedUserQueue\">");
+ out.println(" <rule name=\"primaryGroup\" create=\"false\" />");
+ out.println("</rule>");
+ out.println("<rule name=\"default\" />");
+ out.println("</queuePlacementPolicy>");
+ out.println("</allocations>");
+ out.close();
+
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+ RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
+
+ FSLeafQueue user1Leaf = scheduler.assignToQueue(rmApp1, "root.default",
+ "user1");
+
+ assertEquals("root.user1group.user1", user1Leaf.getName());
+ }
+
+ @Test
+ public void testFairShareAndWeightsInNestedUserQueueRule() throws Exception {
+ conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
+
+ PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
+ out.println("<?xml version=\"1.0\"?>");
+ out.println("<allocations>");
+ out.println("<queue name=\"parentq\" type=\"parent\">");
+ out.println("<minResources>1024mb,0vcores</minResources>");
+ out.println("</queue>");
+ out.println("<queuePlacementPolicy>");
+ out.println("<rule name=\"nestedUserQueue\">");
+ out.println(" <rule name=\"specified\" create=\"false\" />");
+ out.println("</rule>");
+ out.println("<rule name=\"default\" />");
+ out.println("</queuePlacementPolicy>");
+ out.println("</allocations>");
+ out.close();
+
+ RMApp rmApp1 = new MockRMApp(0, 0, RMAppState.NEW);
+ RMApp rmApp2 = new MockRMApp(1, 1, RMAppState.NEW);
+
+ scheduler.reinitialize(conf, resourceManager.getRMContext());
+
+ int capacity = 16 * 1024;
+ // create node with 16 G
+ RMNode node1 = MockNodes.newNodeInfo(1, Resources.createResource(capacity),
+ 1, "127.0.0.1");
+ NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
+ scheduler.handle(nodeEvent1);
+
+ // user1,user2 submit their apps to parentq and create user queues
+ scheduler.assignToQueue(rmApp1, "root.parentq", "user1");
+ scheduler.assignToQueue(rmApp2, "root.parentq", "user2");
+
+ scheduler.update();
+
+ Collection<FSLeafQueue> leafQueues = scheduler.getQueueManager()
+ .getLeafQueues();
+
+ for (FSLeafQueue leaf : leafQueues) {
+ if (leaf.getName().equals("root.parentq.user1")
+ || leaf.getName().equals("root.parentq.user2")) {
+ // assert that the fair share is 1/4th node1's capacity
+ assertEquals(capacity / 4, leaf.getFairShare().getMemory());
+ // assert weights are equal for both the user queues
+ assertEquals(1.0, leaf.getWeights().getWeight(ResourceType.MEMORY), 0);
+ }
+ }
+ }
+
/**
* Make allocation requests and ensure they are reflected in queue demand.
*/
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java?rev=1593192&r1=1593191&r2=1593192&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueueManager.java Thu May 8 07:23:19 2014
@@ -17,8 +17,7 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
-import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
+import static org.junit.Assert.*;
import static org.mockito.Mockito.*;
import java.util.HashSet;
@@ -57,45 +56,77 @@ public class TestQueueManager {
@Test
public void testReloadTurnsLeafQueueIntoParent() throws Exception {
- updateConfiguredQueues(queueManager, "queue1");
+ updateConfiguredLeafQueues(queueManager, "queue1");
// When no apps are running in the leaf queue, should be fine turning it
// into a parent.
- updateConfiguredQueues(queueManager, "queue1.queue2");
+ updateConfiguredLeafQueues(queueManager, "queue1.queue2");
assertNull(queueManager.getLeafQueue("queue1", false));
assertNotNull(queueManager.getLeafQueue("queue1.queue2", false));
// When leaf queues are empty, should be ok deleting them and
// turning parent into a leaf.
- updateConfiguredQueues(queueManager, "queue1");
+ updateConfiguredLeafQueues(queueManager, "queue1");
assertNull(queueManager.getLeafQueue("queue1.queue2", false));
assertNotNull(queueManager.getLeafQueue("queue1", false));
// When apps exist in leaf queue, we shouldn't be able to create
// children under it, but things should work otherwise.
notEmptyQueues.add(queueManager.getLeafQueue("queue1", false));
- updateConfiguredQueues(queueManager, "queue1.queue2");
+ updateConfiguredLeafQueues(queueManager, "queue1.queue2");
assertNull(queueManager.getLeafQueue("queue1.queue2", false));
assertNotNull(queueManager.getLeafQueue("queue1", false));
// When apps exist in leaf queues under a parent queue, shouldn't be
// able to turn it into a leaf queue, but things should work otherwise.
notEmptyQueues.clear();
- updateConfiguredQueues(queueManager, "queue1.queue2");
+ updateConfiguredLeafQueues(queueManager, "queue1.queue2");
notEmptyQueues.add(queueManager.getQueue("root.queue1"));
- updateConfiguredQueues(queueManager, "queue1");
+ updateConfiguredLeafQueues(queueManager, "queue1");
assertNotNull(queueManager.getLeafQueue("queue1.queue2", false));
assertNull(queueManager.getLeafQueue("queue1", false));
// Should never to be able to create a queue under the default queue
- updateConfiguredQueues(queueManager, "default.queue3");
+ updateConfiguredLeafQueues(queueManager, "default.queue3");
assertNull(queueManager.getLeafQueue("default.queue3", false));
assertNotNull(queueManager.getLeafQueue("default", false));
}
- private void updateConfiguredQueues(QueueManager queueMgr, String... confQueues) {
+ @Test
+ public void testReloadTurnsLeafToParentWithNoLeaf() {
+ AllocationConfiguration allocConf = new AllocationConfiguration(conf);
+ // Create a leaf queue1
+ allocConf.configuredQueues.get(FSQueueType.LEAF).add("root.queue1");
+ queueManager.updateAllocationConfiguration(allocConf);
+ assertNotNull(queueManager.getLeafQueue("root.queue1", false));
+
+ // Let's say later on the admin makes queue1 a parent queue by
+ // specifying "type=parent" in the alloc xml, and let's say apps are running in
+ // queue1
+ notEmptyQueues.add(queueManager.getLeafQueue("root.queue1", false));
+ allocConf = new AllocationConfiguration(conf);
+ allocConf.configuredQueues.get(FSQueueType.PARENT)
+ .add("root.queue1");
+
+ // When allocs are reloaded queue1 shouldn't be converted to a parent
+ queueManager.updateAllocationConfiguration(allocConf);
+ assertNotNull(queueManager.getLeafQueue("root.queue1", false));
+ assertNull(queueManager.getParentQueue("root.queue1", false));
+
+ // Now lets assume apps completed and there are no apps in queue1
+ notEmptyQueues.clear();
+ // We should see queue1 transform from leaf queue to parent queue.
+ queueManager.updateAllocationConfiguration(allocConf);
+ assertNull(queueManager.getLeafQueue("root.queue1", false));
+ assertNotNull(queueManager.getParentQueue("root.queue1", false));
+ // this parent should not have any children
+ assertTrue(queueManager.getParentQueue("root.queue1", false)
+ .getChildQueues().isEmpty());
+ }
+
+ private void updateConfiguredLeafQueues(QueueManager queueMgr, String... confLeafQueues) {
AllocationConfiguration allocConf = new AllocationConfiguration(conf);
- allocConf.queueNames = Sets.newHashSet(confQueues);
+ allocConf.configuredQueues.get(FSQueueType.LEAF).addAll(Sets.newHashSet(confLeafQueues));
queueMgr.updateAllocationConfiguration(allocConf);
}
}
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java?rev=1593192&r1=1593191&r2=1593192&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java Thu May 8 07:23:19 2014
@@ -17,8 +17,11 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair;
-import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.*;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
import java.util.Set;
import javax.xml.parsers.DocumentBuilder;
@@ -28,16 +31,15 @@ import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.security.GroupMappingServiceProvider;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Test;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
-import com.google.common.collect.Sets;
-
public class TestQueuePlacementPolicy {
private final static Configuration conf = new Configuration();
- private final static Set<String> configuredQueues = Sets.newHashSet("root.someuser");
+ private Map<FSQueueType, Set<String>> configuredQueues;
@BeforeClass
public static void setup() {
@@ -45,6 +47,14 @@ public class TestQueuePlacementPolicy {
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
}
+ @Before
+ public void initTest() {
+ configuredQueues = new HashMap<FSQueueType, Set<String>>();
+ for (FSQueueType type : FSQueueType.values()) {
+ configuredQueues.put(type, new HashSet<String>());
+ }
+ }
+
@Test
public void testSpecifiedUserPolicy() throws Exception {
StringBuffer sb = new StringBuffer();
@@ -53,9 +63,12 @@ public class TestQueuePlacementPolicy {
sb.append(" <rule name='user' />");
sb.append("</queuePlacementPolicy>");
QueuePlacementPolicy policy = parse(sb.toString());
- assertEquals("root.specifiedq",policy.assignAppToQueue("specifiedq", "someuser"));
- assertEquals("root.someuser", policy.assignAppToQueue("default", "someuser"));
- assertEquals("root.otheruser", policy.assignAppToQueue("default", "otheruser"));
+ assertEquals("root.specifiedq",
+ policy.assignAppToQueue("specifiedq", "someuser"));
+ assertEquals("root.someuser",
+ policy.assignAppToQueue("default", "someuser"));
+ assertEquals("root.otheruser",
+ policy.assignAppToQueue("default", "otheruser"));
}
@Test
@@ -66,6 +79,8 @@ public class TestQueuePlacementPolicy {
sb.append(" <rule name='user' create=\"false\" />");
sb.append(" <rule name='default' />");
sb.append("</queuePlacementPolicy>");
+
+ configuredQueues.get(FSQueueType.LEAF).add("root.someuser");
QueuePlacementPolicy policy = parse(sb.toString());
assertEquals("root.specifiedq", policy.assignAppToQueue("specifiedq", "someuser"));
assertEquals("root.someuser", policy.assignAppToQueue("default", "someuser"));
@@ -81,7 +96,8 @@ public class TestQueuePlacementPolicy {
sb.append(" <rule name='reject' />");
sb.append("</queuePlacementPolicy>");
QueuePlacementPolicy policy = parse(sb.toString());
- assertEquals("root.specifiedq", policy.assignAppToQueue("specifiedq", "someuser"));
+ assertEquals("root.specifiedq",
+ policy.assignAppToQueue("specifiedq", "someuser"));
assertEquals(null, policy.assignAppToQueue("default", "someuser"));
}
@@ -117,10 +133,188 @@ public class TestQueuePlacementPolicy {
parse(sb.toString());
}
+ @Test
+ public void testNestedUserQueueParsingErrors() {
+ // No nested rule specified in hierarchical user queue
+ StringBuffer sb = new StringBuffer();
+ sb.append("<queuePlacementPolicy>");
+ sb.append(" <rule name='specified' />");
+ sb.append(" <rule name='nestedUserQueue'/>");
+ sb.append(" <rule name='default' />");
+ sb.append("</queuePlacementPolicy>");
+
+ assertIfExceptionThrown(sb);
+
+ // Specified nested rule is not a QueuePlacementRule
+ sb = new StringBuffer();
+ sb.append("<queuePlacementPolicy>");
+ sb.append(" <rule name='specified' />");
+ sb.append(" <rule name='nestedUserQueue'>");
+ sb.append(" <rule name='unknownRule'/>");
+ sb.append(" </rule>");
+ sb.append(" <rule name='default' />");
+ sb.append("</queuePlacementPolicy>");
+
+ assertIfExceptionThrown(sb);
+ }
+
+ private void assertIfExceptionThrown(StringBuffer sb) {
+ Throwable th = null;
+ try {
+ parse(sb.toString());
+ } catch (Exception e) {
+ th = e;
+ }
+
+ assertTrue(th instanceof AllocationConfigurationException);
+ }
+
+ @Test
+ public void testNestedUserQueueParsing() throws Exception {
+ StringBuffer sb = new StringBuffer();
+ sb.append("<queuePlacementPolicy>");
+ sb.append(" <rule name='specified' />");
+ sb.append(" <rule name='nestedUserQueue'>");
+ sb.append(" <rule name='primaryGroup'/>");
+ sb.append(" </rule>");
+ sb.append(" <rule name='default' />");
+ sb.append("</queuePlacementPolicy>");
+
+ Throwable th = null;
+ try {
+ parse(sb.toString());
+ } catch (Exception e) {
+ th = e;
+ }
+
+ assertNull(th);
+ }
+
+ @Test
+ public void testNestedUserQueuePrimaryGroup() throws Exception {
+ StringBuffer sb = new StringBuffer();
+ sb.append("<queuePlacementPolicy>");
+ sb.append(" <rule name='specified' create='false' />");
+ sb.append(" <rule name='nestedUserQueue'>");
+ sb.append(" <rule name='primaryGroup'/>");
+ sb.append(" </rule>");
+ sb.append(" <rule name='default' />");
+ sb.append("</queuePlacementPolicy>");
+
+ // User queue would be created under primary group queue
+ QueuePlacementPolicy policy = parse(sb.toString());
+ assertEquals("root.user1group.user1",
+ policy.assignAppToQueue("root.default", "user1"));
+ // Other rules above and below hierarchical user queue rule should work as
+ // usual
+ configuredQueues.get(FSQueueType.LEAF).add("root.specifiedq");
+ // test if specified rule(above nestedUserQueue rule) works ok
+ assertEquals("root.specifiedq",
+ policy.assignAppToQueue("root.specifiedq", "user2"));
+
+ // test if default rule(below nestedUserQueue rule) works
+ configuredQueues.get(FSQueueType.LEAF).add("root.user3group");
+ assertEquals("root.default",
+ policy.assignAppToQueue("root.default", "user3"));
+ }
+
+ @Test
+ public void testNestedUserQueuePrimaryGroupNoCreate() throws Exception {
+ // Primary group rule has create='false'
+ StringBuffer sb = new StringBuffer();
+ sb.append("<queuePlacementPolicy>");
+ sb.append(" <rule name='nestedUserQueue'>");
+ sb.append(" <rule name='primaryGroup' create='false'/>");
+ sb.append(" </rule>");
+ sb.append(" <rule name='default' />");
+ sb.append("</queuePlacementPolicy>");
+
+ QueuePlacementPolicy policy = parse(sb.toString());
+
+ // Should return root.default since primary group 'root.user1group' is not
+ // configured
+ assertEquals("root.default",
+ policy.assignAppToQueue("root.default", "user1"));
+
+ // Let's configure primary group and check if user queue is created
+ configuredQueues.get(FSQueueType.PARENT).add("root.user1group");
+ policy = parse(sb.toString());
+ assertEquals("root.user1group.user1",
+ policy.assignAppToQueue("root.default", "user1"));
+
+ // Both Primary group and nestedUserQueue rule has create='false'
+ sb = new StringBuffer();
+ sb.append("<queuePlacementPolicy>");
+ sb.append(" <rule name='nestedUserQueue' create='false'>");
+ sb.append(" <rule name='primaryGroup' create='false'/>");
+ sb.append(" </rule>");
+ sb.append(" <rule name='default' />");
+ sb.append("</queuePlacementPolicy>");
+
+ // Should return root.default since primary group and user queue for user 2
+ // are not configured.
+ assertEquals("root.default",
+ policy.assignAppToQueue("root.default", "user2"));
+
+ // Now configure both primary group and the user queue for user2
+ configuredQueues.get(FSQueueType.PARENT).add("root.user2group");
+ configuredQueues.get(FSQueueType.LEAF).add("root.user2group.user2");
+ policy = parse(sb.toString());
+
+ assertEquals("root.user2group.user2",
+ policy.assignAppToQueue("root.default", "user2"));
+ }
+
+ @Test
+ public void testNestedUserQueueSecondaryGroup() throws Exception {
+ StringBuffer sb = new StringBuffer();
+ sb.append("<queuePlacementPolicy>");
+ sb.append(" <rule name='nestedUserQueue'>");
+ sb.append(" <rule name='secondaryGroupExistingQueue'/>");
+ sb.append(" </rule>");
+ sb.append(" <rule name='default' />");
+ sb.append("</queuePlacementPolicy>");
+
+ QueuePlacementPolicy policy = parse(sb.toString());
+ // Should return root.default since secondary groups are not configured
+ assertEquals("root.default",
+ policy.assignAppToQueue("root.default", "user1"));
+
+ // configure secondary group for user1
+ configuredQueues.get(FSQueueType.PARENT).add("root.user1subgroup1");
+ policy = parse(sb.toString());
+ // user queue should be created under secondary group
+ assertEquals("root.user1subgroup1.user1",
+ policy.assignAppToQueue("root.default", "user1"));
+ }
+
+ @Test
+ public void testNestedUserQueueSpecificRule() throws Exception {
+ // This test covers the use case where users can specify different parent
+ // queues and want user queues under those.
+ StringBuffer sb = new StringBuffer();
+ sb.append("<queuePlacementPolicy>");
+ sb.append(" <rule name='nestedUserQueue'>");
+ sb.append(" <rule name='specified' create='false'/>");
+ sb.append(" </rule>");
+ sb.append(" <rule name='default' />");
+ sb.append("</queuePlacementPolicy>");
+
+ // Let's create couple of parent queues
+ configuredQueues.get(FSQueueType.PARENT).add("root.parent1");
+ configuredQueues.get(FSQueueType.PARENT).add("root.parent2");
+
+ QueuePlacementPolicy policy = parse(sb.toString());
+ assertEquals("root.parent1.user1",
+ policy.assignAppToQueue("root.parent1", "user1"));
+ assertEquals("root.parent2.user2",
+ policy.assignAppToQueue("root.parent2", "user2"));
+ }
+
private QueuePlacementPolicy parse(String str) throws Exception {
// Read and parse the allocations file.
- DocumentBuilderFactory docBuilderFactory =
- DocumentBuilderFactory.newInstance();
+ DocumentBuilderFactory docBuilderFactory = DocumentBuilderFactory
+ .newInstance();
docBuilderFactory.setIgnoringComments(true);
DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
Document doc = builder.parse(IOUtils.toInputStream(str));
Modified: hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm
URL: http://svn.apache.org/viewvc/hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm?rev=1593192&r1=1593191&r2=1593192&view=diff
==============================================================================
--- hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm (original)
+++ hadoop/common/branches/branch-2/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/apt/FairScheduler.apt.vm Thu May 8 07:23:19 2014
@@ -206,8 +206,10 @@ Allocation file format
The allocation file must be in XML format. The format contains five types of
elements:
- * <<Queue elements>>, which represent queues. Each may contain the following
- properties:
+ * <<Queue elements>>, which represent queues. Queue elements can take an optional
+ attribute "type", which when set to "parent" makes it a parent queue. This is useful
+ when we want to create a parent queue without configuring any leaf queues.
+ Each queue element may contain the following properties:
* minResources: minimum resources the queue is entitled to, in the form
"X mb, Y vcores". For the single-resource fairness policy, the vcores
@@ -299,6 +301,15 @@ Allocation file format
that matches a secondary group of the user who submitted it. The first
secondary group that matches a configured queue will be selected.
+ * nestedUserQueue : the app is placed into a queue with the name of the user
+ under the queue suggested by the nested rule. This is similar to the "user"
+ rule, the difference being that with the "nestedUserQueue" rule, user queues can be created
+ under any parent queue, while the "user" rule creates user queues only under the root queue.
+ Note that the nestedUserQueue rule is applied only if the nested rule returns a
+ parent queue. One can configure a parent queue either by setting the "type" attribute of the queue
+ to "parent" or by configuring at least one leaf under that queue, which makes it a parent.
+ See example allocation for a sample use case.
+
* default: the app is placed into the queue named "default".
* reject: the app is rejected.
@@ -319,6 +330,12 @@ Allocation file format
<minResources>5000 mb,0vcores</minResources>
</queue>
</queue>
+
+ <!-- Queue 'secondary_group_queue' is a parent queue and may have
+ user queues under it -->
+ <queue name="secondary_group_queue" type="parent">
+ <weight>3.0</weight>
+ </queue>
<user name="sample_user">
<maxRunningApps>30</maxRunningApps>
@@ -328,6 +345,9 @@ Allocation file format
<queuePlacementPolicy>
<rule name="specified" />
<rule name="primaryGroup" create="false" />
+ <rule name="nestedUserQueue">
+ <rule name="secondaryGroupExistingQueue" create="false" />
+ </rule>
<rule name="default" />
</queuePlacementPolicy>
</allocations>