You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by as...@apache.org on 2016/12/02 03:18:29 UTC
[17/24] hadoop git commit: YARN-5761. Separate QueueManager from
Scheduler. (Xuan Gong via gtcarrera9)
YARN-5761. Separate QueueManager from Scheduler. (Xuan Gong via gtcarrera9)
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/69fb70c3
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/69fb70c3
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/69fb70c3
Branch: refs/heads/YARN-5085
Commit: 69fb70c31aa277f7fb14b05c0185ddc5cd90793d
Parents: 3fd844b
Author: Li Lu <gt...@apache.org>
Authored: Wed Nov 30 13:38:42 2016 -0800
Committer: Li Lu <gt...@apache.org>
Committed: Wed Nov 30 13:38:42 2016 -0800
----------------------------------------------------------------------
.../scheduler/SchedulerQueueManager.java | 75 ++++
.../scheduler/capacity/CapacityScheduler.java | 294 +++------------
.../capacity/CapacitySchedulerQueueManager.java | 361 +++++++++++++++++++
.../capacity/TestApplicationLimits.java | 35 +-
.../TestApplicationLimitsByPartition.java | 7 +-
.../scheduler/capacity/TestChildQueueOrder.java | 9 +-
.../scheduler/capacity/TestLeafQueue.java | 9 +-
.../scheduler/capacity/TestParentQueue.java | 39 +-
.../scheduler/capacity/TestReservations.java | 8 +-
.../scheduler/capacity/TestUtils.java | 2 +-
10 files changed, 536 insertions(+), 303 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/69fb70c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java
new file mode 100644
index 0000000..92b989a
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerQueueManager.java
@@ -0,0 +1,75 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import java.io.IOException;
+import java.util.Map;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
+
+/**
+ *
+ * Context of the Queues in Scheduler.
+ *
+ */
+@Private
+@Unstable
+public interface SchedulerQueueManager<T extends Queue,
+ E extends ReservationSchedulerConfiguration> {
+
+ /**
+ * Get the root queue.
+ * @return root queue
+ */
+ T getRootQueue();
+
+ /**
+ * Get all the queues.
+ * @return a map contains all the queues as well as related queue names
+ */
+ Map<String, T> getQueues();
+
+ /**
+ * Remove the queue from the existing queue.
+ * @param queueName the queue name
+ */
+ void removeQueue(String queueName);
+
+ /**
+ * Add a new queue to the existing queues.
+ * @param queueName the queue name
+ * @param queue the queue object
+ */
+ void addQueue(String queueName, T queue);
+
+ /**
+ * Get a queue matching the specified queue name.
+ * @param queueName the queue name
+ * @return a queue object
+ */
+ T getQueue(String queueName);
+
+ /**
+ * Reinitialize the queues.
+ * @param newConf the configuration
+ * @throws IOException if fails to re-initialize queues
+ */
+ void reinitializeQueues(E newConf) throws IOException;
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/69fb70c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index cc8b3b0..e42b20c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -25,7 +25,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.EnumSet;
-import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -68,8 +67,6 @@ import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
-import org.apache.hadoop.yarn.security.Permission;
-import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
@@ -156,9 +153,9 @@ public class CapacityScheduler extends
ResourceAllocationCommitter {
private static final Log LOG = LogFactory.getLog(CapacityScheduler.class);
- private YarnAuthorizationProvider authorizer;
- private CSQueue root;
+ private CapacitySchedulerQueueManager queueManager;
+
// timeout to join when we stop this service
protected final long THREAD_JOIN_TIMEOUT_MS = 1000;
@@ -168,22 +165,6 @@ public class CapacityScheduler extends
private int offswitchPerHeartbeatLimit;
- static final Comparator<CSQueue> nonPartitionedQueueComparator =
- new Comparator<CSQueue>() {
- @Override
- public int compare(CSQueue q1, CSQueue q2) {
- if (q1.getUsedCapacity() < q2.getUsedCapacity()) {
- return -1;
- } else if (q1.getUsedCapacity() > q2.getUsedCapacity()) {
- return 1;
- }
-
- return q1.getQueuePath().compareTo(q2.getQueuePath());
- }
- };
-
- static final PartitionedQueueComparator partitionedQueueComparator =
- new PartitionedQueueComparator();
@Override
public void setConf(Configuration conf) {
@@ -236,8 +217,6 @@ public class CapacityScheduler extends
private CapacitySchedulerConfiguration conf;
private Configuration yarnConf;
- private Map<String, CSQueue> queues = new ConcurrentHashMap<String, CSQueue>();
-
private ResourceCalculator calculator;
private boolean usePortForNodeName;
@@ -261,11 +240,11 @@ public class CapacityScheduler extends
@Override
public QueueMetrics getRootQueueMetrics() {
- return root.getMetrics();
+ return getRootQueue().getMetrics();
}
public CSQueue getRootQueue() {
- return root;
+ return queueManager.getRootQueue();
}
@Override
@@ -290,12 +269,12 @@ public class CapacityScheduler extends
@Override
public Comparator<CSQueue> getNonPartitionedQueueComparator() {
- return nonPartitionedQueueComparator;
+ return CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR;
}
@Override
public PartitionedQueueComparator getPartitionedQueueComparator() {
- return partitionedQueueComparator;
+ return CapacitySchedulerQueueManager.PARTITIONED_QUEUE_COMPARATOR;
}
@Override
@@ -326,7 +305,10 @@ public class CapacityScheduler extends
this.usePortForNodeName = this.conf.getUsePortForNodeName();
this.applications = new ConcurrentHashMap<>();
this.labelManager = rmContext.getNodeLabelManager();
- authorizer = YarnAuthorizationProvider.getInstance(yarnConf);
+ this.queueManager = new CapacitySchedulerQueueManager(yarnConf,
+ this.labelManager);
+ this.queueManager.setCapacitySchedulerContext(this);
+
this.activitiesManager = new ActivitiesManager(rmContext);
activitiesManager.init(conf);
initializeQueues(this.conf);
@@ -554,13 +536,6 @@ public class CapacityScheduler extends
}
}
- static class QueueHook {
- public CSQueue hook(CSQueue queue) {
- return queue;
- }
- }
- private static final QueueHook noop = new QueueHook();
-
@VisibleForTesting
public UserGroupMappingPlacementRule
getUserGroupMappingPlacementRule() throws IOException {
@@ -578,7 +553,7 @@ public class CapacityScheduler extends
if (!mappingQueue.equals(
UserGroupMappingPlacementRule.CURRENT_USER_MAPPING) && !mappingQueue
.equals(UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING)) {
- CSQueue queue = queues.get(mappingQueue);
+ CSQueue queue = getQueue(mappingQueue);
if (queue == null || !(queue instanceof LeafQueue)) {
throw new IOException(
"mapping contains invalid or non-leaf queue " + mappingQueue);
@@ -616,184 +591,29 @@ public class CapacityScheduler extends
private void initializeQueues(CapacitySchedulerConfiguration conf)
throws IOException {
- root =
- parseQueue(this, conf, null, CapacitySchedulerConfiguration.ROOT,
- queues, queues, noop);
- labelManager.reinitializeQueueLabels(getQueueToLabels());
- LOG.info("Initialized root queue " + root);
+ this.queueManager.initializeQueues(conf);
+
updatePlacementRules();
- setQueueAcls(authorizer, queues);
// Notify Preemption Manager
- preemptionManager.refreshQueues(null, root);
+ preemptionManager.refreshQueues(null, this.getRootQueue());
}
@Lock(CapacityScheduler.class)
private void reinitializeQueues(CapacitySchedulerConfiguration newConf)
throws IOException {
- // Parse new queues
- Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
- CSQueue newRoot =
- parseQueue(this, newConf, null, CapacitySchedulerConfiguration.ROOT,
- newQueues, queues, noop);
-
- // Ensure all existing queues are still present
- validateExistingQueues(queues, newQueues);
-
- // Add new queues
- addNewQueues(queues, newQueues);
-
- // Re-configure queues
- root.reinitialize(newRoot, getClusterResource());
+ this.queueManager.reinitializeQueues(newConf);
updatePlacementRules();
- // Re-calculate headroom for active applications
- Resource clusterResource = getClusterResource();
- root.updateClusterResource(clusterResource, new ResourceLimits(
- clusterResource));
-
- labelManager.reinitializeQueueLabels(getQueueToLabels());
- setQueueAcls(authorizer, queues);
-
// Notify Preemption Manager
- preemptionManager.refreshQueues(null, root);
- }
-
- @VisibleForTesting
- public static void setQueueAcls(YarnAuthorizationProvider authorizer,
- Map<String, CSQueue> queues) throws IOException {
- List<Permission> permissions = new ArrayList<>();
- for (CSQueue queue : queues.values()) {
- AbstractCSQueue csQueue = (AbstractCSQueue) queue;
- permissions.add(
- new Permission(csQueue.getPrivilegedEntity(), csQueue.getACLs()));
- }
- authorizer.setPermission(permissions, UserGroupInformation.getCurrentUser());
- }
-
- private Map<String, Set<String>> getQueueToLabels() {
- Map<String, Set<String>> queueToLabels = new HashMap<String, Set<String>>();
- for (CSQueue queue : queues.values()) {
- queueToLabels.put(queue.getQueueName(), queue.getAccessibleNodeLabels());
- }
- return queueToLabels;
- }
-
- /**
- * Ensure all existing queues are present. Queues cannot be deleted
- * @param queues existing queues
- * @param newQueues new queues
- */
- @Lock(CapacityScheduler.class)
- private void validateExistingQueues(
- Map<String, CSQueue> queues, Map<String, CSQueue> newQueues)
- throws IOException {
- // check that all static queues are included in the newQueues list
- for (Map.Entry<String, CSQueue> e : queues.entrySet()) {
- if (!(e.getValue() instanceof ReservationQueue)) {
- String queueName = e.getKey();
- CSQueue oldQueue = e.getValue();
- CSQueue newQueue = newQueues.get(queueName);
- if (null == newQueue) {
- throw new IOException(queueName + " cannot be found during refresh!");
- } else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) {
- throw new IOException(queueName + " is moved from:"
- + oldQueue.getQueuePath() + " to:" + newQueue.getQueuePath()
- + " after refresh, which is not allowed.");
- }
- }
- }
- }
-
- /**
- * Add the new queues (only) to our list of queues...
- * ... be careful, do not overwrite existing queues.
- * @param queues
- * @param newQueues
- */
- @Lock(CapacityScheduler.class)
- private void addNewQueues(
- Map<String, CSQueue> queues, Map<String, CSQueue> newQueues)
- {
- for (Map.Entry<String, CSQueue> e : newQueues.entrySet()) {
- String queueName = e.getKey();
- CSQueue queue = e.getValue();
- if (!queues.containsKey(queueName)) {
- queues.put(queueName, queue);
- }
- }
- }
-
- @Lock(CapacityScheduler.class)
- static CSQueue parseQueue(
- CapacitySchedulerContext csContext,
- CapacitySchedulerConfiguration conf,
- CSQueue parent, String queueName, Map<String, CSQueue> queues,
- Map<String, CSQueue> oldQueues,
- QueueHook hook) throws IOException {
- CSQueue queue;
- String fullQueueName =
- (parent == null) ? queueName
- : (parent.getQueuePath() + "." + queueName);
- String[] childQueueNames =
- conf.getQueues(fullQueueName);
- boolean isReservableQueue = conf.isReservable(fullQueueName);
- if (childQueueNames == null || childQueueNames.length == 0) {
- if (null == parent) {
- throw new IllegalStateException(
- "Queue configuration missing child queue names for " + queueName);
- }
- // Check if the queue will be dynamically managed by the Reservation
- // system
- if (isReservableQueue) {
- queue =
- new PlanQueue(csContext, queueName, parent,
- oldQueues.get(queueName));
- } else {
- queue =
- new LeafQueue(csContext, queueName, parent,
- oldQueues.get(queueName));
-
- // Used only for unit tests
- queue = hook.hook(queue);
- }
- } else {
- if (isReservableQueue) {
- throw new IllegalStateException(
- "Only Leaf Queues can be reservable for " + queueName);
- }
- ParentQueue parentQueue =
- new ParentQueue(csContext, queueName, parent, oldQueues.get(queueName));
-
- // Used only for unit tests
- queue = hook.hook(parentQueue);
-
- List<CSQueue> childQueues = new ArrayList<CSQueue>();
- for (String childQueueName : childQueueNames) {
- CSQueue childQueue =
- parseQueue(csContext, conf, queue, childQueueName,
- queues, oldQueues, hook);
- childQueues.add(childQueue);
- }
- parentQueue.setChildQueues(childQueues);
- }
-
- if (queue instanceof LeafQueue && queues.containsKey(queueName)
- && queues.get(queueName) instanceof LeafQueue) {
- throw new IOException("Two leaf queues were named " + queueName
- + ". Leaf queue names must be distinct");
- }
- queues.put(queueName, queue);
-
- LOG.info("Initialized queue: " + queue);
- return queue;
+ preemptionManager.refreshQueues(null, this.getRootQueue());
}
public CSQueue getQueue(String queueName) {
if (queueName == null) {
return null;
}
- return queues.get(queueName);
+ return this.queueManager.getQueue(queueName);
}
private void addApplicationOnRecovery(
@@ -1047,7 +867,7 @@ public class CapacityScheduler extends
// Inform the queue
String queueName = attempt.getQueue().getQueueName();
- CSQueue queue = queues.get(queueName);
+ CSQueue queue = this.getQueue(queueName);
if (!(queue instanceof LeafQueue)) {
LOG.error(
"Cannot finish application " + "from non-leaf queue: " + queueName);
@@ -1174,7 +994,7 @@ public class CapacityScheduler extends
boolean includeChildQueues, boolean recursive)
throws IOException {
CSQueue queue = null;
- queue = this.queues.get(queueName);
+ queue = this.getQueue(queueName);
if (queue == null) {
throw new IOException("Unknown queue: " + queueName);
}
@@ -1192,7 +1012,7 @@ public class CapacityScheduler extends
return new ArrayList<QueueUserACLInfo>();
}
- return root.getQueueUserAclInfo(user);
+ return getRootQueue().getQueueUserAclInfo(user);
}
@Override
@@ -1235,7 +1055,7 @@ public class CapacityScheduler extends
writeLock.lock();
updateNodeResource(nm, resourceOption);
Resource clusterResource = getClusterResource();
- root.updateClusterResource(clusterResource,
+ getRootQueue().updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
} finally {
writeLock.unlock();
@@ -1471,8 +1291,8 @@ public class CapacityScheduler extends
private CSAssignment allocateOrReserveNewContainers(
PlacementSet<FiCaSchedulerNode> ps, boolean withNodeHeartbeat) {
- CSAssignment assignment = root.assignContainers(getClusterResource(), ps,
- new ResourceLimits(labelManager
+ CSAssignment assignment = getRootQueue().assignContainers(
+ getClusterResource(), ps, new ResourceLimits(labelManager
.getResourceByLabel(ps.getPartition(), getClusterResource())),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
@@ -1506,7 +1326,7 @@ public class CapacityScheduler extends
}
// Try to use NON_EXCLUSIVE
- assignment = root.assignContainers(getClusterResource(), ps,
+ assignment = getRootQueue().assignContainers(getClusterResource(), ps,
// TODO, now we only consider limits for parent for non-labeled
// resources, should consider labeled resources as well.
new ResourceLimits(labelManager
@@ -1526,8 +1346,8 @@ public class CapacityScheduler extends
PlacementSet<FiCaSchedulerNode> ps) {
// When this time look at multiple nodes, try schedule if the
// partition has any available resource or killable resource
- if (root.getQueueCapacities().getUsedCapacity(ps.getPartition()) >= 1.0f
- && preemptionManager.getKillableResource(
+ if (getRootQueue().getQueueCapacities().getUsedCapacity(
+ ps.getPartition()) >= 1.0f && preemptionManager.getKillableResource(
CapacitySchedulerConfiguration.ROOT, ps.getPartition()) == Resources
.none()) {
if (LOG.isDebugEnabled()) {
@@ -1710,7 +1530,7 @@ public class CapacityScheduler extends
updateLabelsOnNode(id, labels);
}
Resource clusterResource = getClusterResource();
- root.updateClusterResource(clusterResource,
+ getRootQueue().updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
} finally {
writeLock.unlock();
@@ -1731,7 +1551,7 @@ public class CapacityScheduler extends
}
Resource clusterResource = getClusterResource();
- root.updateClusterResource(clusterResource,
+ getRootQueue().updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
LOG.info(
@@ -1782,7 +1602,7 @@ public class CapacityScheduler extends
nodeTracker.removeNode(nodeId);
Resource clusterResource = getClusterResource();
- root.updateClusterResource(clusterResource,
+ getRootQueue().updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
int numNodes = nodeTracker.nodeCount();
@@ -2020,7 +1840,7 @@ public class CapacityScheduler extends
@Override
public List<ApplicationAttemptId> getAppsInQueue(String queueName) {
- CSQueue queue = queues.get(queueName);
+ CSQueue queue = getQueue(queueName);
if (queue == null) {
return null;
}
@@ -2030,7 +1850,8 @@ public class CapacityScheduler extends
}
public boolean isSystemAppsLimitReached() {
- if (root.getNumApplications() < conf.getMaximumSystemApplications()) {
+ if (getRootQueue().getNumApplications() < conf
+ .getMaximumSystemApplications()) {
return false;
}
return true;
@@ -2131,7 +1952,7 @@ public class CapacityScheduler extends
}
((PlanQueue) disposableLeafQueue.getParent()).removeChildQueue(q);
- this.queues.remove(queueName);
+ this.queueManager.removeQueue(queueName);
LOG.info("Removal of ReservationQueue " + queueName + " has succeeded");
} finally {
writeLock.unlock();
@@ -2160,7 +1981,7 @@ public class CapacityScheduler extends
PlanQueue parentPlan = (PlanQueue) newQueue.getParent();
String queuename = newQueue.getQueueName();
parentPlan.addChildQueue(newQueue);
- this.queues.put(queuename, newQueue);
+ this.queueManager.addQueue(queuename, newQueue);
LOG.info("Creation of ReservationQueue " + newQueue + " succeeded");
} finally {
writeLock.unlock();
@@ -2172,7 +1993,7 @@ public class CapacityScheduler extends
throws YarnException {
try {
writeLock.lock();
- LeafQueue queue = getAndCheckLeafQueue(inQueue);
+ LeafQueue queue = this.queueManager.getAndCheckLeafQueue(inQueue);
ParentQueue parent = (ParentQueue) queue.getParent();
if (!(queue instanceof ReservationQueue)) {
@@ -2224,9 +2045,10 @@ public class CapacityScheduler extends
FiCaSchedulerApp app = getApplicationAttempt(
ApplicationAttemptId.newInstance(appId, 0));
String sourceQueueName = app.getQueue().getQueueName();
- LeafQueue source = getAndCheckLeafQueue(sourceQueueName);
+ LeafQueue source = this.queueManager.getAndCheckLeafQueue(
+ sourceQueueName);
String destQueueName = handleMoveToPlanQueue(targetQueueName);
- LeafQueue dest = getAndCheckLeafQueue(destQueueName);
+ LeafQueue dest = this.queueManager.getAndCheckLeafQueue(destQueueName);
// Validation check - ACLs, submission limits for user & queue
String user = app.getUser();
checkQueuePartition(app, dest);
@@ -2290,27 +2112,6 @@ public class CapacityScheduler extends
}
}
- /**
- * Check that the String provided in input is the name of an existing,
- * LeafQueue, if successful returns the queue.
- *
- * @param queue
- * @return the LeafQueue
- * @throws YarnException
- */
- private LeafQueue getAndCheckLeafQueue(String queue) throws YarnException {
- CSQueue ret = this.getQueue(queue);
- if (ret == null) {
- throw new YarnException("The specified Queue: " + queue
- + " doesn't exist");
- }
- if (!(ret instanceof LeafQueue)) {
- throw new YarnException("The specified Queue: " + queue
- + " is not a Leaf Queue. Move is supported only for Leaf Queues.");
- }
- return (LeafQueue) ret;
- }
-
/** {@inheritDoc} */
@Override
public EnumSet<SchedulerResourceTypes> getSchedulingResourceTypes() {
@@ -2347,7 +2148,7 @@ public class CapacityScheduler extends
@Override
public Set<String> getPlanQueues() {
Set<String> ret = new HashSet<String>();
- for (Map.Entry<String, CSQueue> l : queues.entrySet()) {
+ for (Map.Entry<String, CSQueue> l : queueManager.getQueues().entrySet()) {
if (l.getValue() instanceof PlanQueue) {
ret.add(l.getKey());
}
@@ -2367,7 +2168,8 @@ public class CapacityScheduler extends
if (null == priorityFromContext) {
// Get the default priority for the Queue. If Queue is non-existent, then
// use default priority
- priorityFromContext = getDefaultPriorityForQueue(queueName);
+ priorityFromContext = this.queueManager.getDefaultPriorityForQueue(
+ queueName);
LOG.info("Application '" + applicationId
+ "' is submitted without priority "
@@ -2391,18 +2193,6 @@ public class CapacityScheduler extends
return appPriority;
}
- private Priority getDefaultPriorityForQueue(String queueName) {
- Queue queue = getQueue(queueName);
- if (null == queue || null == queue.getDefaultApplicationPriority()) {
- // Return with default application priority
- return Priority.newInstance(CapacitySchedulerConfiguration
- .DEFAULT_CONFIGURATION_APPLICATION_PRIORITY);
- }
-
- return Priority.newInstance(queue.getDefaultApplicationPriority()
- .getPriority());
- }
-
@Override
public Priority updateApplicationPriority(Priority newPriority,
ApplicationId applicationId, SettableFuture<Object> future)
@@ -2456,7 +2246,7 @@ public class CapacityScheduler extends
@Override
public ResourceUsage getClusterResourceUsage() {
- return root.getQueueResourceUsage();
+ return getRootQueue().getQueueResourceUsage();
}
private SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> getSchedulerContainer(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/69fb70c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
new file mode 100644
index 0000000..7a6ce56
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
@@ -0,0 +1,361 @@
+/**
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements. See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership. The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License. You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import com.google.common.annotations.VisibleForTesting;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.security.Permission;
+import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
+import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerQueueManager;
+
+/**
+ *
+ * Context of the Queues in Capacity Scheduler.
+ *
+ */
+@Private
+@Unstable
+public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
+ CSQueue, CapacitySchedulerConfiguration>{
+
+ private static final Log LOG = LogFactory.getLog(
+ CapacitySchedulerQueueManager.class);
+
+ static final Comparator<CSQueue> NON_PARTITIONED_QUEUE_COMPARATOR =
+ new Comparator<CSQueue>() {
+ @Override
+ public int compare(CSQueue q1, CSQueue q2) {
+ if (q1.getUsedCapacity() < q2.getUsedCapacity()) {
+ return -1;
+ } else if (q1.getUsedCapacity() > q2.getUsedCapacity()) {
+ return 1;
+ }
+
+ return q1.getQueuePath().compareTo(q2.getQueuePath());
+ }
+ };
+
+ static final PartitionedQueueComparator PARTITIONED_QUEUE_COMPARATOR =
+ new PartitionedQueueComparator();
+
+ static class QueueHook {
+ public CSQueue hook(CSQueue queue) {
+ return queue;
+ }
+ }
+
+ private static final QueueHook NOOP = new QueueHook();
+ private CapacitySchedulerContext csContext;
+ private final YarnAuthorizationProvider authorizer;
+ private final Map<String, CSQueue> queues = new ConcurrentHashMap<>();
+ private CSQueue root;
+ private final RMNodeLabelsManager labelManager;
+
+ /**
+ * Construct the service.
+ * @param conf the configuration
+ * @param labelManager the labelManager
+ */
+ public CapacitySchedulerQueueManager(Configuration conf,
+ RMNodeLabelsManager labelManager) {
+ this.authorizer = YarnAuthorizationProvider.getInstance(conf);
+ this.labelManager = labelManager;
+ }
+
+ @Override
+ public CSQueue getRootQueue() {
+ return this.root;
+ }
+
+ @Override
+ public Map<String, CSQueue> getQueues() {
+ return queues;
+ }
+
+ @Override
+ public void removeQueue(String queueName) {
+ this.queues.remove(queueName);
+ }
+
+ @Override
+ public void addQueue(String queueName, CSQueue queue) {
+ this.queues.put(queueName, queue);
+ }
+
+ @Override
+ public CSQueue getQueue(String queueName) {
+ return queues.get(queueName);
+ }
+
+ /**
+ * Set the CapacitySchedulerContext.
+ * @param capacitySchedulerContext the CapacitySchedulerContext
+ */
+ public void setCapacitySchedulerContext(
+ CapacitySchedulerContext capacitySchedulerContext) {
+ this.csContext = capacitySchedulerContext;
+ }
+
+ /**
+ * Initialized the queues.
+ * @param conf the CapacitySchedulerConfiguration
+ * @throws IOException if fails to initialize queues
+ */
+ public void initializeQueues(CapacitySchedulerConfiguration conf)
+ throws IOException {
+ root = parseQueue(this.csContext, conf, null,
+ CapacitySchedulerConfiguration.ROOT, queues, queues, NOOP);
+ setQueueAcls(authorizer, queues);
+ labelManager.reinitializeQueueLabels(getQueueToLabels());
+ LOG.info("Initialized root queue " + root);
+ }
+
+ @Override
+ public void reinitializeQueues(CapacitySchedulerConfiguration newConf)
+ throws IOException {
+ // Parse new queues
+ Map<String, CSQueue> newQueues = new HashMap<>();
+ CSQueue newRoot = parseQueue(this.csContext, newConf, null,
+ CapacitySchedulerConfiguration.ROOT, newQueues, queues, NOOP);
+
+ // Ensure all existing queues are still present
+ validateExistingQueues(queues, newQueues);
+
+ // Add new queues
+ addNewQueues(queues, newQueues);
+
+ // Re-configure queues
+ root.reinitialize(newRoot, this.csContext.getClusterResource());
+
+ setQueueAcls(authorizer, queues);
+
+ // Re-calculate headroom for active applications
+ Resource clusterResource = this.csContext.getClusterResource();
+ root.updateClusterResource(clusterResource, new ResourceLimits(
+ clusterResource));
+
+ labelManager.reinitializeQueueLabels(getQueueToLabels());
+ }
+
+ /**
+ * Parse the queue from the configuration.
+ * @param csContext the CapacitySchedulerContext
+ * @param conf the CapacitySchedulerConfiguration
+ * @param parent the parent queue
+ * @param queueName the queue name
+ * @param queues all the queues
+ * @param oldQueues the old queues
+ * @param hook the queue hook
+ * @return the CSQueue
+ * @throws IOException
+ */
+ static CSQueue parseQueue(
+ CapacitySchedulerContext csContext,
+ CapacitySchedulerConfiguration conf,
+ CSQueue parent, String queueName, Map<String, CSQueue> queues,
+ Map<String, CSQueue> oldQueues,
+ QueueHook hook) throws IOException {
+ CSQueue queue;
+ String fullQueueName =
+ (parent == null) ? queueName
+ : (parent.getQueuePath() + "." + queueName);
+ String[] childQueueNames = conf.getQueues(fullQueueName);
+ boolean isReservableQueue = conf.isReservable(fullQueueName);
+ if (childQueueNames == null || childQueueNames.length == 0) {
+ if (null == parent) {
+ throw new IllegalStateException(
+ "Queue configuration missing child queue names for " + queueName);
+ }
+ // Check if the queue will be dynamically managed by the Reservation
+ // system
+ if (isReservableQueue) {
+ queue =
+ new PlanQueue(csContext, queueName, parent,
+ oldQueues.get(queueName));
+ } else {
+ queue =
+ new LeafQueue(csContext, queueName, parent,
+ oldQueues.get(queueName));
+
+ // Used only for unit tests
+ queue = hook.hook(queue);
+ }
+ } else {
+ if (isReservableQueue) {
+ throw new IllegalStateException(
+ "Only Leaf Queues can be reservable for " + queueName);
+ }
+ ParentQueue parentQueue =
+ new ParentQueue(csContext, queueName, parent,
+ oldQueues.get(queueName));
+
+ // Used only for unit tests
+ queue = hook.hook(parentQueue);
+
+ List<CSQueue> childQueues = new ArrayList<>();
+ for (String childQueueName : childQueueNames) {
+ CSQueue childQueue =
+ parseQueue(csContext, conf, queue, childQueueName,
+ queues, oldQueues, hook);
+ childQueues.add(childQueue);
+ }
+ parentQueue.setChildQueues(childQueues);
+ }
+
+ if (queue instanceof LeafQueue && queues.containsKey(queueName)
+ && queues.get(queueName) instanceof LeafQueue) {
+ throw new IOException("Two leaf queues were named " + queueName
+ + ". Leaf queue names must be distinct");
+ }
+ queues.put(queueName, queue);
+
+ LOG.info("Initialized queue: " + queue);
+ return queue;
+ }
+
+ /**
+ * Ensure all existing queues are present. Queues cannot be deleted
+ * @param queues existing queues
+ * @param newQueues new queues
+ */
+ private void validateExistingQueues(
+ Map<String, CSQueue> queues, Map<String, CSQueue> newQueues)
+ throws IOException {
+ // check that all static queues are included in the newQueues list
+ for (Map.Entry<String, CSQueue> e : queues.entrySet()) {
+ if (!(e.getValue() instanceof ReservationQueue)) {
+ String queueName = e.getKey();
+ CSQueue oldQueue = e.getValue();
+ CSQueue newQueue = newQueues.get(queueName);
+ if (null == newQueue) {
+ throw new IOException(queueName + " cannot be found during refresh!");
+ } else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) {
+ throw new IOException(queueName + " is moved from:"
+ + oldQueue.getQueuePath() + " to:" + newQueue.getQueuePath()
+ + " after refresh, which is not allowed.");
+ }
+ }
+ }
+ }
+
+ /**
+ * Add the new queues (only) to our list of queues...
+ * ... be careful, do not overwrite existing queues.
+ * @param queues the existing queues
+ * @param newQueues the new queues
+ */
+ private void addNewQueues(
+ Map<String, CSQueue> queues, Map<String, CSQueue> newQueues) {
+ for (Map.Entry<String, CSQueue> e : newQueues.entrySet()) {
+ String queueName = e.getKey();
+ CSQueue queue = e.getValue();
+ if (!queues.containsKey(queueName)) {
+ queues.put(queueName, queue);
+ }
+ }
+ }
+
+ @VisibleForTesting
+ /**
+ * Set the acls for the queues.
+ * @param authorizer the yarnAuthorizationProvider
+ * @param queues the queues
+ * @throws IOException if fails to set queue acls
+ */
+ public static void setQueueAcls(YarnAuthorizationProvider authorizer,
+ Map<String, CSQueue> queues) throws IOException {
+ List<Permission> permissions = new ArrayList<>();
+ for (CSQueue queue : queues.values()) {
+ AbstractCSQueue csQueue = (AbstractCSQueue) queue;
+ permissions.add(
+ new Permission(csQueue.getPrivilegedEntity(), csQueue.getACLs()));
+ }
+ authorizer.setPermission(permissions,
+ UserGroupInformation.getCurrentUser());
+ }
+
+ /**
+ * Check that the String provided in input is the name of an existing,
+ * LeafQueue, if successful returns the queue.
+ *
+ * @param queue the queue name
+ * @return the LeafQueue
+ * @throws YarnException if the queue does not exist or the queue
+ * is not the type of LeafQueue.
+ */
+ public LeafQueue getAndCheckLeafQueue(String queue) throws YarnException {
+ CSQueue ret = this.getQueue(queue);
+ if (ret == null) {
+ throw new YarnException("The specified Queue: " + queue
+ + " doesn't exist");
+ }
+ if (!(ret instanceof LeafQueue)) {
+ throw new YarnException("The specified Queue: " + queue
+ + " is not a Leaf Queue.");
+ }
+ return (LeafQueue) ret;
+ }
+
+ /**
+ * Get the default priority of the queue.
+ * @param queueName the queue name
+ * @return the default priority of the queue
+ */
+ public Priority getDefaultPriorityForQueue(String queueName) {
+ Queue queue = getQueue(queueName);
+ if (null == queue || null == queue.getDefaultApplicationPriority()) {
+ // Return with default application priority
+ return Priority.newInstance(CapacitySchedulerConfiguration
+ .DEFAULT_CONFIGURATION_APPLICATION_PRIORITY);
+ }
+ return Priority.newInstance(queue.getDefaultApplicationPriority()
+ .getPriority());
+ }
+
+ /**
+ * Get a map of queueToLabels.
+ * @return the map of queueToLabels
+ */
+ private Map<String, Set<String>> getQueueToLabels() {
+ Map<String, Set<String>> queueToLabels = new HashMap<>();
+ for (CSQueue queue : getQueues().values()) {
+ queueToLabels.put(queue.getQueueName(), queue.getAccessibleNodeLabels());
+ }
+ return queueToLabels;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/69fb70c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
index 11e94b9..7382f3d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java
@@ -111,7 +111,8 @@ public class TestApplicationLimits {
when(csContext.getClusterResource()).
thenReturn(Resources.createResource(10 * 16 * GB, 10 * 32));
when(csContext.getNonPartitionedQueueComparator()).
- thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
+ thenReturn(
+ CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
when(csContext.getResourceCalculator()).
thenReturn(resourceCalculator);
when(csContext.getRMContext()).thenReturn(rmContext);
@@ -123,9 +124,9 @@ public class TestApplicationLimits {
containerTokenSecretManager);
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
- CSQueue root =
- CapacityScheduler.parseQueue(csContext, csConf, null, "root",
- queues, queues,
+ CSQueue root = CapacitySchedulerQueueManager
+ .parseQueue(csContext, csConf, null, "root",
+ queues, queues,
TestUtils.spyHook);
@@ -276,7 +277,8 @@ public class TestApplicationLimits {
when(csContext.getMaximumResourceCapability()).
thenReturn(Resources.createResource(16*GB, 16));
when(csContext.getNonPartitionedQueueComparator()).
- thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
+ thenReturn(
+ CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
when(csContext.getRMContext()).thenReturn(rmContext);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
@@ -288,8 +290,8 @@ public class TestApplicationLimits {
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CSQueue root =
- CapacityScheduler.parseQueue(csContext, csConf, null, "root",
- queues, queues, TestUtils.spyHook);
+ CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
+ "root", queues, queues, TestUtils.spyHook);
LeafQueue queue = (LeafQueue)queues.get(A);
@@ -356,9 +358,9 @@ public class TestApplicationLimits {
+ ".maximum-am-resource-percent", 0.5f);
// Re-create queues to get new configs.
queues = new HashMap<String, CSQueue>();
- root =
- CapacityScheduler.parseQueue(csContext, csConf, null, "root",
- queues, queues, TestUtils.spyHook);
+ root = CapacitySchedulerQueueManager.parseQueue(
+ csContext, csConf, null, "root",
+ queues, queues, TestUtils.spyHook);
clusterResource = Resources.createResource(100 * 16 * GB);
queue = (LeafQueue)queues.get(A);
@@ -378,9 +380,9 @@ public class TestApplicationLimits {
9999);
// Re-create queues to get new configs.
queues = new HashMap<String, CSQueue>();
- root =
- CapacityScheduler.parseQueue(csContext, csConf, null, "root",
- queues, queues, TestUtils.spyHook);
+ root = CapacitySchedulerQueueManager.parseQueue(
+ csContext, csConf, null, "root",
+ queues, queues, TestUtils.spyHook);
queue = (LeafQueue)queues.get(A);
assertEquals(9999, (int)csConf.getMaximumApplicationsPerQueue(queue.getQueuePath()));
@@ -580,7 +582,8 @@ public class TestApplicationLimits {
when(csContext.getMaximumResourceCapability()).
thenReturn(Resources.createResource(16*GB));
when(csContext.getNonPartitionedQueueComparator()).
- thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
+ thenReturn(
+ CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
when(csContext.getRMContext()).thenReturn(rmContext);
@@ -589,8 +592,8 @@ public class TestApplicationLimits {
when(csContext.getClusterResource()).thenReturn(clusterResource);
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
- CSQueue rootQueue = CapacityScheduler.parseQueue(csContext, csConf, null,
- "root", queues, queues, TestUtils.spyHook);
+ CSQueue rootQueue = CapacitySchedulerQueueManager.parseQueue(csContext,
+ csConf, null, "root", queues, queues, TestUtils.spyHook);
ResourceUsage queueCapacities = rootQueue.getQueueResourceUsage();
when(csContext.getClusterResourceUsage())
http://git-wip-us.apache.org/repos/asf/hadoop/blob/69fb70c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
index d335552..5c53fda 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimitsByPartition.java
@@ -595,7 +595,8 @@ public class TestApplicationLimitsByPartition {
when(csContext.getMaximumResourceCapability())
.thenReturn(Resources.createResource(16 * GB));
when(csContext.getNonPartitionedQueueComparator())
- .thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
+ .thenReturn(
+ CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
RMContext rmContext = TestUtils.getMockRMContext();
RMContext spyRMContext = spy(rmContext);
@@ -614,8 +615,8 @@ public class TestApplicationLimitsByPartition {
when(csContext.getClusterResource()).thenReturn(clusterResource);
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
- CSQueue rootQueue = CapacityScheduler.parseQueue(csContext, csConf, null,
- "root", queues, queues, TestUtils.spyHook);
+ CSQueue rootQueue = CapacitySchedulerQueueManager.parseQueue(csContext,
+ csConf, null, "root", queues, queues, TestUtils.spyHook);
ResourceUsage queueResUsage = rootQueue.getQueueResourceUsage();
when(csContext.getClusterResourceUsage())
http://git-wip-us.apache.org/repos/asf/hadoop/blob/69fb70c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
index 40e5d2a..a6ae0c2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestChildQueueOrder.java
@@ -95,11 +95,12 @@ public class TestChildQueueOrder {
when(csContext.getMaximumResourceCapability()).thenReturn(
Resources.createResource(16*GB, 32));
when(csContext.getClusterResource()).
- thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
+ thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
when(csContext.getNonPartitionedQueueComparator()).
- thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
+ thenReturn(
+ CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
when(csContext.getResourceCalculator()).
- thenReturn(resourceComparator);
+ thenReturn(resourceComparator);
when(csContext.getRMContext()).thenReturn(rmContext);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
}
@@ -222,7 +223,7 @@ public class TestChildQueueOrder {
setupSortedQueues(csConf);
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CSQueue root =
- CapacityScheduler.parseQueue(csContext, csConf, null,
+ CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/69fb70c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
index 8694efb..2ce5fcb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java
@@ -175,7 +175,8 @@ public class TestLeafQueue {
when(csContext.getClusterResource()).
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
when(csContext.getNonPartitionedQueueComparator()).
- thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
+ thenReturn(
+ CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
when(csContext.getResourceCalculator()).
thenReturn(resourceCalculator);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
@@ -188,7 +189,7 @@ public class TestLeafQueue {
containerTokenSecretManager);
root =
- CapacityScheduler.parseQueue(csContext, csConf, null,
+ CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT,
queues, queues,
TestUtils.spyHook);
@@ -2380,7 +2381,7 @@ public class TestLeafQueue {
.DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT * 2);
Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
CSQueue newRoot =
- CapacityScheduler.parseQueue(csContext, csConf, null,
+ CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT,
newQueues, queues,
TestUtils.spyHook);
@@ -2405,7 +2406,7 @@ public class TestLeafQueue {
.NODE_LOCALITY_DELAY, 60);
Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
CSQueue newRoot =
- CapacityScheduler.parseQueue(csContext, csConf, null,
+ CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT,
newQueues, queues,
TestUtils.spyHook);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/69fb70c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
index d875969..a36db44 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java
@@ -97,10 +97,11 @@ public class TestParentQueue {
when(csContext.getClusterResource()).
thenReturn(Resources.createResource(100 * 16 * GB, 100 * 32));
when(csContext.getNonPartitionedQueueComparator()).
- thenReturn(CapacityScheduler.nonPartitionedQueueComparator);
+ thenReturn(
+ CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
when(csContext.getResourceCalculator()).
- thenReturn(resourceComparator);
+ thenReturn(resourceComparator);
when(csContext.getRMContext()).thenReturn(rmContext);
}
@@ -231,7 +232,7 @@ public class TestParentQueue {
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CSQueue root =
- CapacityScheduler.parseQueue(csContext, csConf, null,
+ CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
@@ -346,7 +347,7 @@ public class TestParentQueue {
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
boolean exceptionOccured = false;
try {
- CapacityScheduler.parseQueue(csContext, csConf, null,
+ CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
} catch (IllegalArgumentException ie) {
@@ -360,7 +361,7 @@ public class TestParentQueue {
exceptionOccured = false;
queues.clear();
try {
- CapacityScheduler.parseQueue(csContext, csConf, null,
+ CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
} catch (IllegalArgumentException ie) {
@@ -374,7 +375,7 @@ public class TestParentQueue {
exceptionOccured = false;
queues.clear();
try {
- CapacityScheduler.parseQueue(csContext, csConf, null,
+ CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
} catch (IllegalArgumentException ie) {
@@ -467,7 +468,7 @@ public class TestParentQueue {
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CSQueue root =
- CapacityScheduler.parseQueue(csContext, csConf, null,
+ CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
@@ -623,8 +624,8 @@ public class TestParentQueue {
csConf.setCapacity(Q_B + "." + B3, 0);
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
- CapacityScheduler.parseQueue(csContext, csConf, null,
- CapacitySchedulerConfiguration.ROOT, queues, queues,
+ CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
+ CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
}
@@ -640,8 +641,8 @@ public class TestParentQueue {
csConf.setCapacity(Q_A, 60);
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
- CapacityScheduler.parseQueue(csContext, csConf, null,
- CapacitySchedulerConfiguration.ROOT, queues, queues,
+ CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
+ CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
}
@@ -662,8 +663,8 @@ public class TestParentQueue {
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
try {
- CapacityScheduler.parseQueue(csContext, csConf, null,
- CapacitySchedulerConfiguration.ROOT, queues, queues,
+ CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
+ CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
} catch (IllegalArgumentException e) {
fail("Failed to create queues with 0 capacity: " + e);
@@ -678,7 +679,7 @@ public class TestParentQueue {
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CSQueue root =
- CapacityScheduler.parseQueue(csContext, csConf, null,
+ CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
@@ -754,8 +755,8 @@ public class TestParentQueue {
//B3
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CSQueue root =
- CapacityScheduler.parseQueue(csContext, csConf, null,
- CapacitySchedulerConfiguration.ROOT, queues, queues,
+ CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
+ CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
// Setup some nodes
@@ -850,12 +851,12 @@ public class TestParentQueue {
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
CSQueue root =
- CapacityScheduler.parseQueue(csContext, csConf, null,
- CapacitySchedulerConfiguration.ROOT, queues, queues,
+ CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
+ CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
YarnAuthorizationProvider authorizer =
YarnAuthorizationProvider.getInstance(conf);
- CapacityScheduler.setQueueAcls(authorizer, queues);
+ CapacitySchedulerQueueManager.setQueueAcls(authorizer, queues);
UserGroupInformation user = UserGroupInformation.getCurrentUser();
// Setup queue configs
http://git-wip-us.apache.org/repos/asf/hadoop/blob/69fb70c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
index f6caa50..3e05456 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java
@@ -134,7 +134,7 @@ public class TestReservations {
when(csContext.getClusterResource()).thenReturn(
Resources.createResource(100 * 16 * GB, 100 * 12));
when(csContext.getNonPartitionedQueueComparator()).thenReturn(
- CapacityScheduler.nonPartitionedQueueComparator);
+ CapacitySchedulerQueueManager.NON_PARTITIONED_QUEUE_COMPARATOR);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
when(csContext.getRMContext()).thenReturn(rmContext);
@@ -144,7 +144,7 @@ public class TestReservations {
when(csContext.getContainerTokenSecretManager()).thenReturn(
containerTokenSecretManager);
- root = CapacityScheduler.parseQueue(csContext, csConf, null,
+ root = CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues, TestUtils.spyHook);
ResourceUsage queueResUsage = root.getQueueResourceUsage();
@@ -1180,8 +1180,8 @@ public class TestReservations {
csConf.setBoolean(
CapacitySchedulerConfiguration.RESERVE_CONT_LOOK_ALL_NODES, false);
Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
- CSQueue newRoot = CapacityScheduler.parseQueue(csContext, csConf, null,
- CapacitySchedulerConfiguration.ROOT, newQueues, queues,
+ CSQueue newRoot = CapacitySchedulerQueueManager.parseQueue(csContext,
+ csConf, null, CapacitySchedulerConfiguration.ROOT, newQueues, queues,
TestUtils.spyHook);
queues = newQueues;
root.reinitialize(newRoot, cs.getClusterResource());
http://git-wip-us.apache.org/repos/asf/hadoop/blob/69fb70c3/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
index e34ee34..b982fab 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUtils.java
@@ -141,7 +141,7 @@ public class TestUtils {
/**
* Hook to spy on queues.
*/
- static class SpyHook extends CapacityScheduler.QueueHook {
+ static class SpyHook extends CapacitySchedulerQueueManager.QueueHook {
@Override
public CSQueue hook(CSQueue queue) {
return spy(queue);
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org