You are viewing a plain text version of this content. The canonical link for it is here.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2017/12/08 23:23:42 UTC
[5/9] hadoop git commit: YARN-7473. Implement Framework and policy
for capacity management of auto created queues. (Suma Shivaprasad via wangda)
YARN-7473. Implement Framework and policy for capacity management of auto created queues. (Suma Shivaprasad via wangda)
Change-Id: Icca7805fe12f6f7fb335effff4b121b6f7f6337b
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/b38643c9
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/b38643c9
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/b38643c9
Branch: refs/heads/trunk
Commit: b38643c9a8dd2c53024ae830b9565a550d0ec39c
Parents: 74665e3
Author: Wangda Tan <wa...@apache.org>
Authored: Fri Dec 8 15:10:16 2017 -0800
Committer: Wangda Tan <wa...@apache.org>
Committed: Fri Dec 8 15:10:16 2017 -0800
----------------------------------------------------------------------
.../monitor/SchedulingMonitor.java | 4 +-
.../ProportionalCapacityPreemptionPolicy.java | 7 +-
.../monitor/capacity/TempQueuePerPartition.java | 8 +
.../CapacitySchedulerPlanFollower.java | 12 +-
.../scheduler/AbstractYarnScheduler.java | 2 +-
.../scheduler/YarnScheduler.java | 2 +-
.../capacity/AbstractAutoCreatedLeafQueue.java | 113 +++
.../scheduler/capacity/AbstractCSQueue.java | 101 ++-
.../capacity/AbstractManagedParentQueue.java | 162 ++--
.../capacity/AutoCreatedLeafQueue.java | 136 ++--
.../capacity/AutoCreatedLeafQueueConfig.java | 66 ++
.../AutoCreatedQueueManagementPolicy.java | 64 ++
.../scheduler/capacity/CSQueue.java | 2 +-
.../scheduler/capacity/CSQueueUtils.java | 33 +-
.../scheduler/capacity/CapacityScheduler.java | 104 +--
.../CapacitySchedulerConfiguration.java | 112 ++-
.../capacity/CapacitySchedulerContext.java | 8 +
.../capacity/CapacitySchedulerQueueManager.java | 6 +-
.../scheduler/capacity/LeafQueue.java | 75 +-
.../scheduler/capacity/ManagedParentQueue.java | 294 +++++++-
.../scheduler/capacity/PlanQueue.java | 150 +++-
.../capacity/QueueManagementChange.java | 148 ++++
.../QueueManagementDynamicEditPolicy.java | 272 +++++++
.../scheduler/capacity/ReservationQueue.java | 91 +++
.../GuaranteedOrZeroCapacityOverTimePolicy.java | 745 +++++++++++++++++++
.../scheduler/common/QueueEntitlement.java | 22 +
.../event/QueueManagementChangeEvent.java | 49 ++
.../scheduler/event/SchedulerEventType.java | 5 +-
.../capacity/TestAutoCreatedLeafQueue.java | 113 ---
...stCapacitySchedulerAutoCreatedQueueBase.java | 579 ++++++++++++++
.../TestCapacitySchedulerAutoQueueCreation.java | 611 ++++++---------
.../TestCapacitySchedulerDynamicBehavior.java | 32 +-
...tGuaranteedOrZeroCapacityOverTimePolicy.java | 40 +
.../scheduler/capacity/TestLeafQueue.java | 125 +++-
.../TestQueueManagementDynamicEditPolicy.java | 121 +++
.../capacity/TestReservationQueue.java | 114 +++
36 files changed, 3629 insertions(+), 899 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
index 09edb98..d1cc850 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/SchedulingMonitor.java
@@ -79,7 +79,7 @@ public class SchedulingMonitor extends AbstractService {
}
private void schedulePreemptionChecker() {
- handler = ses.scheduleAtFixedRate(new PreemptionChecker(),
+ handler = ses.scheduleAtFixedRate(new PolicyInvoker(),
0, monitorInterval, TimeUnit.MILLISECONDS);
}
@@ -99,7 +99,7 @@ public class SchedulingMonitor extends AbstractService {
scheduleEditPolicy.editSchedule();
}
- private class PreemptionChecker implements Runnable {
+ private class PolicyInvoker implements Runnable {
@Override
public void run() {
try {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
index 8327cb9..304d204 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/ProportionalCapacityPreemptionPolicy.java
@@ -37,6 +37,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuot
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
+ .ManagedParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptableQueue;
@@ -377,7 +380,9 @@ public class ProportionalCapacityPreemptionPolicy
}
private Set<String> getLeafQueueNames(TempQueuePerPartition q) {
- if (q.children == null || q.children.isEmpty()) {
+ // If it's a ManagedParentQueue, it might not have any children
+ if ((q.children == null || q.children.isEmpty())
+ && !(q.parentQueue instanceof ManagedParentQueue)) {
return ImmutableSet.of(q.queueName);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
index 4d71223..fdeee52 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
@@ -21,6 +21,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
+ .ParentQueue;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -56,6 +59,7 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
final ArrayList<TempQueuePerPartition> children;
private Collection<TempAppPerPartition> apps;
LeafQueue leafQueue;
+ ParentQueue parentQueue;
boolean preemptionDisabled;
protected Resource pendingDeductReserved;
@@ -90,6 +94,10 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
pendingDeductReserved = Resources.createResource(0);
}
+ if (ParentQueue.class.isAssignableFrom(queue.getClass())) {
+ parentQueue = (ParentQueue) queue;
+ }
+
this.normalizedGuarantee = new double[ResourceUtils
.getNumberOfKnownResourceTypes()];
this.children = new ArrayList<>();
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
index 2e16689..7962d8e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/CapacitySchedulerPlanFollower.java
@@ -28,10 +28,12 @@ import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.PlanQueue;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
+ .ReservationQueue;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
@@ -92,8 +94,8 @@ public class CapacitySchedulerPlanFollower extends AbstractSchedulerPlanFollower
String planQueueName, Queue queue, String currResId) {
PlanQueue planQueue = (PlanQueue)queue;
try {
- AutoCreatedLeafQueue resQueue =
- new AutoCreatedLeafQueue(cs, currResId, planQueue);
+ ReservationQueue resQueue =
+ new ReservationQueue(cs, currResId, planQueue);
cs.addQueue(resQueue);
} catch (SchedulerDynamicEditException e) {
LOG.warn(
@@ -112,8 +114,8 @@ public class CapacitySchedulerPlanFollower extends AbstractSchedulerPlanFollower
PlanQueue planQueue = (PlanQueue)queue;
if (cs.getQueue(defReservationId) == null) {
try {
- AutoCreatedLeafQueue defQueue =
- new AutoCreatedLeafQueue(cs, defReservationId, planQueue);
+ ReservationQueue defQueue =
+ new ReservationQueue(cs, defReservationId, planQueue);
cs.addQueue(defQueue);
} catch (SchedulerDynamicEditException e) {
LOG.warn(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index d94efb1..cf5e13b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -457,7 +457,7 @@ public abstract class AbstractYarnScheduler
}
@Override
- public void addQueue(Queue newQueue) throws YarnException {
+ public void addQueue(Queue newQueue) throws YarnException, IOException {
throw new YarnException(getClass().getSimpleName()
+ " does not support this operation");
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
index 111998b..93ca7c2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
@@ -272,7 +272,7 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
* @param newQueue the queue being added.
* @throws YarnException
*/
- void addQueue(Queue newQueue) throws YarnException;
+ void addQueue(Queue newQueue) throws YarnException, IOException;
/**
* This method increase the entitlement for current queue (must respect
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java
new file mode 100644
index 0000000..ac97d72
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler
+ .SchedulerDynamicEditException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common
+ .QueueEntitlement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager
+ .NO_LABEL;
+
+/**
+ * Base class for dynamically auto-created leaf queues that are managed by an
+ * implementation of {@link AbstractManagedParentQueue}.
+ */
+public class AbstractAutoCreatedLeafQueue extends LeafQueue {
+
+ protected AbstractManagedParentQueue parent;
+
+ public AbstractAutoCreatedLeafQueue(CapacitySchedulerContext cs,
+ String queueName, AbstractManagedParentQueue parent, CSQueue old)
+ throws IOException {
+ super(cs, queueName, parent, old);
+ this.parent = parent;
+ }
+
+ private static final Logger LOG = LoggerFactory.getLogger(
+ AbstractAutoCreatedLeafQueue.class);
+
+ public AbstractAutoCreatedLeafQueue(CapacitySchedulerContext cs,
+ CapacitySchedulerConfiguration leafQueueConfigs, String queueName,
+ AbstractManagedParentQueue parent, CSQueue old) throws IOException {
+ super(cs, leafQueueConfigs, queueName, parent, old);
+ this.parent = parent;
+ }
+
+ /**
+ * Changes the capacity of this queue for NO_LABEL by delegating to
+ * {@link #setEntitlement(String, QueueEntitlement)}.
+ *
+ * @param entitlement the new entitlement for the queue (capacity,
+ * maxCapacity, etc..)
+ * @throws SchedulerDynamicEditException if the capacity is outside [0,1]
+ */
+ public void setEntitlement(QueueEntitlement entitlement)
+ throws SchedulerDynamicEditException {
+ setEntitlement(NO_LABEL, entitlement);
+ }
+
+ /**
+ * Changes the capacity of this queue for the given node label, recomputes
+ * its absoluteCapacity from the parent and applies the new maxCapacity.
+ * @param nodeLabel the node label whose capacities are updated
+ * @param entitlement the new entitlement for the queue (capacity,
+ * maxCapacity, etc..)
+ * @throws SchedulerDynamicEditException if the capacity is outside [0,1]
+ */
+ public void setEntitlement(String nodeLabel, QueueEntitlement entitlement)
+ throws SchedulerDynamicEditException {
+ try {
+ writeLock.lock();
+ float capacity = entitlement.getCapacity();
+ if (capacity < 0 || capacity > 1.0f) {
+ throw new SchedulerDynamicEditException(
+ "Capacity demand is not in the [0,1] range: " + capacity);
+ }
+ setCapacity(nodeLabel, capacity);
+ setAbsoluteCapacity(nodeLabel,
+ getParent().getQueueCapacities().
+ getAbsoluteCapacity(nodeLabel)
+ * getQueueCapacities().getCapacity(nodeLabel));
+ // apply the maxCapacity carried by the entitlement (it is not simply
+ // set to capacity)
+ setMaxCapacity(nodeLabel, entitlement.getMaxCapacity());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("successfully changed to " + capacity + " for queue " + this
+ .getQueueName());
+ }
+
+ // update queue used capacity etc
+ CSQueueUtils.updateQueueStatistics(resourceCalculator,
+ csContext.getClusterResource(),
+ this, labelManager, nodeLabel);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+ // Delegates to CSQueueUtils, passing the parent's capacities (or null).
+ protected void setupConfigurableCapacities(QueueCapacities queueCapacities) {
+ CSQueueUtils.updateAndCheckCapacitiesByLabel(getQueuePath(),
+ queueCapacities, parent == null ? null : parent.getQueueCapacities());
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 140ea5d..4df4cf2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -128,27 +128,34 @@ public abstract class AbstractCSQueue implements CSQueue {
public AbstractCSQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
+ this(cs, cs.getConfiguration(), queueName, parent, old);
+ }
+
+ public AbstractCSQueue(CapacitySchedulerContext cs,
+ CapacitySchedulerConfiguration configuration, String queueName,
+ CSQueue parent, CSQueue old) {
+
this.labelManager = cs.getRMContext().getNodeLabelManager();
this.parent = parent;
this.queueName = queueName;
- this.queuePath =
- ((parent == null) ? "" : (parent.getQueuePath() + ".")) + this.queueName;
+ this.queuePath = ((parent == null) ? "" : (parent.getQueuePath() + "."))
+ + this.queueName;
this.resourceCalculator = cs.getResourceCalculator();
this.activitiesManager = cs.getActivitiesManager();
-
+
// must be called after parent and queueName is set
- this.metrics =
- old != null ? (CSQueueMetrics) old.getMetrics() : CSQueueMetrics
- .forQueue(getQueuePath(), parent, cs.getConfiguration()
- .getEnableUserMetrics(), cs.getConf());
+ this.metrics = old != null ?
+ (CSQueueMetrics) old.getMetrics() :
+ CSQueueMetrics.forQueue(getQueuePath(), parent,
+ configuration.getEnableUserMetrics(), cs.getConf());
this.csContext = cs;
this.minimumAllocation = csContext.getMinimumResourceCapability();
-
+
// initialize ResourceUsage
queueUsage = new ResourceUsage();
queueEntity = new PrivilegedEntity(EntityType.QUEUE, getQueuePath());
-
+
// initialize QueueCapacities
queueCapacities = new QueueCapacities(parent == null);
@@ -159,11 +166,16 @@ public abstract class AbstractCSQueue implements CSQueue {
readLock = lock.readLock();
writeLock = lock.writeLock();
}
-
+
protected void setupConfigurableCapacities() {
+ setupConfigurableCapacities(csContext.getConfiguration());
+ }
+
+ protected void setupConfigurableCapacities(
+ CapacitySchedulerConfiguration configuration) {
CSQueueUtils.loadUpdateAndCheckCapacities(
getQueuePath(),
- csContext.getConfiguration(),
+ configuration,
queueCapacities,
parent == null ? null : parent.getQueueCapacities());
}
@@ -275,6 +287,29 @@ public abstract class AbstractCSQueue implements CSQueue {
}
}
+ /**
+ * Set maximum capacity, after checking it against the label's capacity.
+ * @param maximumCapacity new max capacity (nodeLabel is used for the checks)
+ */
+ void setMaxCapacity(String nodeLabel, float maximumCapacity) {
+ try {
+ writeLock.lock();
+ // Sanity check against the capacity currently stored for nodeLabel
+ CSQueueUtils.checkMaxCapacity(getQueueName(),
+ queueCapacities.getCapacity(nodeLabel), maximumCapacity);
+ float absMaxCapacity = CSQueueUtils.computeAbsoluteMaximumCapacity(
+ maximumCapacity, parent);
+ CSQueueUtils.checkAbsoluteCapacity(getQueueName(),
+ queueCapacities.getAbsoluteCapacity(nodeLabel), absMaxCapacity);
+ // NOTE(review): stored via the label-less setters - confirm intended
+ queueCapacities.setMaximumCapacity(maximumCapacity);
+ queueCapacities.setAbsoluteMaximumCapacity(absMaxCapacity);
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+
@Override
public String getDefaultNodeLabelExpression() {
return defaultLabelExpression;
@@ -282,13 +317,20 @@ public abstract class AbstractCSQueue implements CSQueue {
void setupQueueConfigs(Resource clusterResource)
throws IOException {
+ setupQueueConfigs(clusterResource, csContext.getConfiguration());
+ }
+
+ protected void setupQueueConfigs(Resource clusterResource,
+ CapacitySchedulerConfiguration configuration) throws
+ IOException {
+
try {
writeLock.lock();
// get labels
this.accessibleLabels =
- csContext.getConfiguration().getAccessibleNodeLabels(getQueuePath());
+ configuration.getAccessibleNodeLabels(getQueuePath());
this.defaultLabelExpression =
- csContext.getConfiguration().getDefaultNodeLabelExpression(
+ configuration.getDefaultNodeLabelExpression(
getQueuePath());
this.resourceTypes = new HashSet<String>();
for (AbsoluteResourceType type : AbsoluteResourceType.values()) {
@@ -308,7 +350,7 @@ public abstract class AbstractCSQueue implements CSQueue {
}
// After we setup labels, we can setup capacities
- setupConfigurableCapacities();
+ setupConfigurableCapacities(configuration);
// Also fetch minimum/maximum resource constraint for this queue if
// configured.
@@ -316,20 +358,20 @@ public abstract class AbstractCSQueue implements CSQueue {
updateConfigurableResourceRequirement(getQueuePath(), clusterResource);
this.maximumAllocation =
- csContext.getConfiguration().getMaximumAllocationPerQueue(
+ configuration.getMaximumAllocationPerQueue(
getQueuePath());
// initialized the queue state based on previous state, configured state
// and its parent state.
QueueState previous = getState();
- QueueState configuredState = csContext.getConfiguration()
+ QueueState configuredState = configuration
.getConfiguredState(getQueuePath());
QueueState parentState = (parent == null) ? null : parent.getState();
initializeQueueState(previous, configuredState, parentState);
authorizer = YarnAuthorizationProvider.getInstance(csContext.getConf());
- this.acls = csContext.getConfiguration().getAcls(getQueuePath());
+ this.acls = configuration.getAcls(getQueuePath());
// Update metrics
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
@@ -361,18 +403,21 @@ public abstract class AbstractCSQueue implements CSQueue {
this.reservationsContinueLooking =
csContext.getConfiguration().getReservationContinueLook();
- this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this);
+ this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this,
+ configuration);
- this.priority = csContext.getConfiguration().getQueuePriority(
+ this.priority = configuration.getQueuePriority(
getQueuePath());
- this.userWeights = getUserWeightsFromHierarchy();
+ this.userWeights = getUserWeightsFromHierarchy(configuration);
} finally {
writeLock.unlock();
}
}
- private Map<String, Float> getUserWeightsFromHierarchy() throws IOException {
+ private Map<String, Float> getUserWeightsFromHierarchy
+ (CapacitySchedulerConfiguration configuration) throws
+ IOException {
Map<String, Float> unionInheritedWeights = new HashMap<String, Float>();
CSQueue parentQ = getParent();
if (parentQ != null) {
@@ -381,9 +426,8 @@ public abstract class AbstractCSQueue implements CSQueue {
}
// Insert this queue's user's weights, overriding parent's user's weights if
// there is overlap.
- CapacitySchedulerConfiguration csConf = csContext.getConfiguration();
unionInheritedWeights.putAll(
- csConf.getAllUserWeightsForQueue(getQueuePath()));
+ configuration.getAllUserWeightsForQueue(getQueuePath()));
return unionInheritedWeights;
}
@@ -720,10 +764,11 @@ public abstract class AbstractCSQueue implements CSQueue {
*
* @return true if queue has preemption disabled, false otherwise
*/
- private boolean isQueueHierarchyPreemptionDisabled(CSQueue q) {
- CapacitySchedulerConfiguration csConf = csContext.getConfiguration();
+ private boolean isQueueHierarchyPreemptionDisabled(CSQueue q,
+ CapacitySchedulerConfiguration configuration) {
boolean systemWidePreemption =
- csConf.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,
+ csContext.getConfiguration()
+ .getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS);
CSQueue parentQ = q.getParent();
@@ -735,14 +780,14 @@ public abstract class AbstractCSQueue implements CSQueue {
// on, then q does not have preemption disabled (default=false, below)
// unless the preemption_disabled property is explicitly set.
if (parentQ == null) {
- return csConf.getPreemptionDisabled(q.getQueuePath(), false);
+ return configuration.getPreemptionDisabled(q.getQueuePath(), false);
}
// If this is not the root queue, inherit the default value for the
// preemption_disabled property from the parent. Preemptability will be
// inherited from the parent's hierarchy unless explicitly overridden at
// this level.
- return csConf.getPreemptionDisabled(q.getQueuePath(),
+ return configuration.getPreemptionDisabled(q.getQueuePath(),
parentQ.getPreemptionDisabled());
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java
index 46f5cf1..9d38f79 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java
@@ -17,13 +17,21 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common
+ .QueueEntitlement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
+import java.util.Comparator;
import java.util.Iterator;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
/**
* A container class for automatically created child leaf queues.
@@ -35,13 +43,12 @@ public abstract class AbstractManagedParentQueue extends ParentQueue {
private static final Logger LOG = LoggerFactory.getLogger(
AbstractManagedParentQueue.class);
- protected AutoCreatedLeafQueueTemplate leafQueueTemplate;
+ protected AutoCreatedLeafQueueConfig leafQueueTemplate;
+ protected AutoCreatedQueueManagementPolicy queueManagementPolicy = null;
public AbstractManagedParentQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
super(cs, queueName, parent, old);
-
- super.setupQueueConfigs(csContext.getClusterResource());
}
@Override
@@ -53,52 +60,18 @@ public abstract class AbstractManagedParentQueue extends ParentQueue {
// Set new configs
setupQueueConfigs(clusterResource);
- // run reinitialize on each existing queue, to trigger absolute cap
- // recomputations
- for (CSQueue res : this.getChildQueues()) {
- res.reinitialize(res, clusterResource);
- }
} finally {
writeLock.unlock();
}
}
/**
- * Initialize leaf queue configs from template configurations specified on
- * parent queue.
- */
- protected AutoCreatedLeafQueueTemplate.Builder initializeLeafQueueConfigs
- (String queuePath) {
-
- CapacitySchedulerConfiguration conf = csContext.getConfiguration();
-
- AutoCreatedLeafQueueTemplate.Builder leafQueueTemplateBuilder = new
- AutoCreatedLeafQueueTemplate.Builder();
- int maxApps = conf.getMaximumApplicationsPerQueue(queuePath);
- if (maxApps < 0) {
- maxApps = (int) (
- CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS
- * getAbsoluteCapacity());
- }
-
- int userLimit = conf.getUserLimit(queuePath);
- float userLimitFactor = conf.getUserLimitFactor(queuePath);
- leafQueueTemplateBuilder.userLimit(userLimit)
- .userLimitFactor(userLimitFactor)
- .maxApps(maxApps)
- .maxAppsPerUser(
- (int) (maxApps * (userLimit / 100.0f) * userLimitFactor));
-
- return leafQueueTemplateBuilder;
- }
-
- /**
* Add the specified child queue.
* @param childQueue reference to the child queue to be added
* @throws SchedulerDynamicEditException
*/
public void addChildQueue(CSQueue childQueue)
- throws SchedulerDynamicEditException {
+ throws SchedulerDynamicEditException, IOException {
try {
writeLock.lock();
if (childQueue.getCapacity() > 0) {
@@ -193,84 +166,69 @@ public abstract class AbstractManagedParentQueue extends ParentQueue {
}
}
- public static class AutoCreatedLeafQueueTemplate {
-
- private QueueCapacities queueCapacities;
-
- private int maxApps;
- private int maxAppsPerUser;
- private int userLimit;
- private float userLimitFactor;
-
- AutoCreatedLeafQueueTemplate(Builder builder) {
- this.maxApps = builder.maxApps;
- this.maxAppsPerUser = builder.maxAppsPerUser;
- this.userLimit = builder.userLimit;
- this.userLimitFactor = builder.userLimitFactor;
- this.queueCapacities = builder.queueCapacities;
- }
-
- public static class Builder {
- private int maxApps;
- private int maxAppsPerUser;
+ public AutoCreatedLeafQueueConfig getLeafQueueTemplate() {
+ return leafQueueTemplate;
+ }
- private int userLimit;
- private float userLimitFactor;
+ public AutoCreatedQueueManagementPolicy
+ getAutoCreatedQueueManagementPolicy() {
+ return queueManagementPolicy;
+ }
- private QueueCapacities queueCapacities;
+ protected SortedMap<String, String> getConfigurationsWithPrefix
+ (SortedMap<String, String> sortedConfigs, String prefix) {
+ return sortedConfigs.subMap( prefix, prefix + Character.MAX_VALUE );
+ }
- Builder maxApps(int maxApplications) {
- this.maxApps = maxApplications;
- return this;
- }
+ protected SortedMap<String, String> sortCSConfigurations() {
+ SortedMap<String, String> sortedConfigs = new TreeMap(
+ new Comparator<String>() {
+ public int compare(String s1, String s2) {
+ return s1.compareToIgnoreCase(s2);
+ }
- Builder maxAppsPerUser(int maxApplicationsPerUser) {
- this.maxAppsPerUser = maxApplicationsPerUser;
- return this;
- }
+ });
- Builder userLimit(int usrLimit) {
- this.userLimit = usrLimit;
- return this;
- }
+ for (final Iterator<Map.Entry<String, String>> iterator =
+ csContext.getConfiguration().iterator(); iterator.hasNext(); ) {
+ final Map.Entry<String, String> confKeyValuePair = iterator.next();
+ sortedConfigs.put(confKeyValuePair.getKey(), confKeyValuePair.getValue());
+ }
+ return sortedConfigs;
+ }
- Builder userLimitFactor(float ulf) {
- this.userLimitFactor = ulf;
- return this;
- }
+ protected CapacitySchedulerConfiguration initializeLeafQueueConfigs(String
+ configPrefix) {
- Builder capacities(QueueCapacities capacities) {
- this.queueCapacities = capacities;
- return this;
- }
+ CapacitySchedulerConfiguration leafQueueConfigs = new
+ CapacitySchedulerConfiguration(new Configuration(false), false);
- AutoCreatedLeafQueueTemplate build() {
- return new AutoCreatedLeafQueueTemplate(this);
- }
- }
+ SortedMap<String, String> sortedConfigs = sortCSConfigurations();
+ SortedMap<String, String> templateConfigs = getConfigurationsWithPrefix
+ (sortedConfigs, configPrefix);
- public int getUserLimit() {
- return userLimit;
+ for (final Iterator<Map.Entry<String, String>> iterator =
+ templateConfigs.entrySet().iterator(); iterator.hasNext(); ) {
+ Map.Entry<String, String> confKeyValuePair = iterator.next();
+ leafQueueConfigs.set(confKeyValuePair.getKey(),
+ confKeyValuePair.getValue());
}
- public float getUserLimitFactor() {
- return userLimitFactor;
- }
+ return leafQueueConfigs;
+ }
- public QueueCapacities getQueueCapacities() {
- return queueCapacities;
- }
+ protected void validateQueueEntitlementChange(AbstractAutoCreatedLeafQueue
+ leafQueue, QueueEntitlement entitlement)
+ throws SchedulerDynamicEditException {
- public int getMaxApps() {
- return maxApps;
- }
+ float sumChilds = sumOfChildCapacities();
+ float newChildCap =
+ sumChilds - leafQueue.getCapacity() + entitlement.getCapacity();
- public int getMaxAppsPerUser() {
- return maxAppsPerUser;
+ if (!(newChildCap >= 0 && newChildCap < 1.0f + CSQueueUtils.EPSILON)) {
+ throw new SchedulerDynamicEditException(
+ "Sum of child queues should exceed 100% for auto creating parent "
+ + "queue : " + queueName);
}
}
-
- public AutoCreatedLeafQueueTemplate getLeafQueueTemplate() {
- return leafQueueTemplate;
- }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java
index bc206d4..1d796ad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java
@@ -21,36 +21,27 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
- .AbstractManagedParentQueue.AutoCreatedLeafQueueTemplate;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
/**
- * Leaf queues which are auto created by an underkying implementation of
+ * Leaf queues which are auto created by an underlying implementation of
* AbstractManagedParentQueue. Eg: PlanQueue for reservations or
* ManagedParentQueue for auto created dynamic queues
*/
-public class AutoCreatedLeafQueue extends LeafQueue {
+public class AutoCreatedLeafQueue extends AbstractAutoCreatedLeafQueue {
private static final Logger LOG = LoggerFactory
.getLogger(AutoCreatedLeafQueue.class);
- private AbstractManagedParentQueue parent;
-
public AutoCreatedLeafQueue(CapacitySchedulerContext cs, String queueName,
- AbstractManagedParentQueue parent) throws IOException {
- super(cs, queueName, parent, null);
-
- AutoCreatedLeafQueueTemplate leafQueueTemplate =
- parent.getLeafQueueTemplate();
- updateApplicationAndUserLimits(leafQueueTemplate.getUserLimit(),
- leafQueueTemplate.getUserLimitFactor(),
- leafQueueTemplate.getMaxApps(),
- leafQueueTemplate.getMaxAppsPerUser());
- this.parent = parent;
+ ManagedParentQueue parent) throws IOException {
+ super(cs, parent.getLeafQueueConfigs(queueName),
+ queueName,
+ parent, null);
+ updateCapacitiesToZero();
}
@Override
@@ -61,48 +52,75 @@ public class AutoCreatedLeafQueue extends LeafQueue {
validate(newlyParsedQueue);
- super.reinitialize(newlyParsedQueue, clusterResource);
- CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
- this, labelManager, null);
+ ManagedParentQueue managedParentQueue = (ManagedParentQueue) parent;
- AutoCreatedLeafQueueTemplate leafQueueTemplate =
- parent.getLeafQueueTemplate();
- updateApplicationAndUserLimits(leafQueueTemplate.getUserLimit(),
- leafQueueTemplate.getUserLimitFactor(),
- leafQueueTemplate.getMaxApps(),
- leafQueueTemplate.getMaxAppsPerUser());
+ super.reinitialize(newlyParsedQueue, clusterResource, managedParentQueue
+ .getLeafQueueConfigs(newlyParsedQueue.getQueueName()));
+
+ //Reset capacities to 0 since the reinitialize above causes
+ // queueCapacities to be set to the configured capacity, which might
+ // overcommit resources from parent queue
+ updateCapacitiesToZero();
} finally {
writeLock.unlock();
}
}
- /**
- * This methods to change capacity for a queue and adjusts its
- * absoluteCapacity.
- *
- * @param entitlement the new entitlement for the queue (capacity,
- * maxCapacity)
- * @throws SchedulerDynamicEditException
- */
- public void setEntitlement(QueueEntitlement entitlement)
- throws SchedulerDynamicEditException {
+ public void reinitializeFromTemplate(AutoCreatedLeafQueueConfig
+ leafQueueTemplate) throws SchedulerDynamicEditException, IOException {
+
try {
writeLock.lock();
- float capacity = entitlement.getCapacity();
+
+ // TODO:
+ // reinitialize only capacities for now since 0 capacity updates
+ // can cause
+ // abs capacity related config computations to be incorrect if we go
+ // through reinitialize
+ QueueCapacities capacities = leafQueueTemplate.getQueueCapacities();
+
+ //update abs capacities
+ setupConfigurableCapacities(capacities);
+
+ //reset capacities for the leaf queue
+ mergeCapacities(capacities);
+
+ //update queue used capacity for all the node labels
+ CSQueueUtils.updateQueueStatistics(resourceCalculator,
+ csContext.getClusterResource(),
+ this, labelManager, null);
+
+ //activate applications if any are pending
+ activateApplications();
+
+ } finally {
+ writeLock.unlock();
+ }
+ }
+
+ private void mergeCapacities(QueueCapacities capacities) {
+ for ( String nodeLabel : capacities.getExistingNodeLabels()) {
+ this.queueCapacities.setCapacity(nodeLabel,
+ capacities.getCapacity(nodeLabel));
+ this.queueCapacities.setAbsoluteCapacity(nodeLabel, capacities
+ .getAbsoluteCapacity(nodeLabel));
+ this.queueCapacities.setMaximumCapacity(nodeLabel, capacities
+ .getMaximumCapacity(nodeLabel));
+ this.queueCapacities.setAbsoluteMaximumCapacity(nodeLabel, capacities
+ .getAbsoluteMaximumCapacity(nodeLabel));
+ }
+ }
+
+ public void validateConfigurations(AutoCreatedLeafQueueConfig template)
+ throws SchedulerDynamicEditException {
+ QueueCapacities capacities = template.getQueueCapacities();
+ for (String label : capacities.getExistingNodeLabels()) {
+ float capacity = capacities.getCapacity(label);
if (capacity < 0 || capacity > 1.0f) {
throw new SchedulerDynamicEditException(
"Capacity demand is not in the [0,1] range: " + capacity);
}
- setCapacity(capacity);
- setAbsoluteCapacity(getParent().getAbsoluteCapacity() * getCapacity());
- setMaxCapacity(entitlement.getMaxCapacity());
- if (LOG.isDebugEnabled()) {
- LOG.debug("successfully changed to " + capacity + " for queue " + this
- .getQueueName());
- }
- } finally {
- writeLock.unlock();
}
}
@@ -113,22 +131,20 @@ public class AutoCreatedLeafQueue extends LeafQueue {
"Error trying to reinitialize " + getQueuePath() + " from "
+ newlyParsedQueue.getQueuePath());
}
-
- }
-
- @Override
- protected void setupConfigurableCapacities() {
- CSQueueUtils.updateAndCheckCapacitiesByLabel(getQueuePath(),
- queueCapacities, parent == null ? null : parent.getQueueCapacities());
}
- private void updateApplicationAndUserLimits(int userLimit,
- float userLimitFactor,
- int maxAppsForAutoCreatedQueues,
- int maxAppsPerUserForAutoCreatedQueues) {
- setUserLimit(userLimit);
- setUserLimitFactor(userLimitFactor);
- setMaxApplications(maxAppsForAutoCreatedQueues);
- setMaxApplicationsPerUser(maxAppsPerUserForAutoCreatedQueues);
+ private void updateCapacitiesToZero() throws IOException {
+ try {
+ for( String nodeLabel : parent.getQueueCapacities().getExistingNodeLabels
+ ()) {
+ //TODO - update to use getMaximumCapacity(nodeLabel) in YARN-7574
+ setEntitlement(nodeLabel, new QueueEntitlement(0.0f,
+ parent.getLeafQueueTemplate()
+ .getQueueCapacities()
+ .getMaximumCapacity()));
+ }
+ } catch (SchedulerDynamicEditException e) {
+ throw new IOException(e);
+ }
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueueConfig.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueueConfig.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueueConfig.java
new file mode 100644
index 0000000..5952250
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueueConfig.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+/**
+ * Auto created leaf queue configurations and capacities.
+ */
+public class AutoCreatedLeafQueueConfig {
+
+ /**
+ * Template queue capacities - contains configured and derived capacities
+ * like abs capacity which are used by auto queue creation policy to manage
+ * leaf queue capacities
+ */
+ private QueueCapacities queueCapacities;
+
+ private CapacitySchedulerConfiguration leafQueueConfigs;
+
+ public AutoCreatedLeafQueueConfig(Builder builder) {
+ this.queueCapacities = builder.queueCapacities;
+ this.leafQueueConfigs = builder.leafQueueConfigs;
+ }
+
+ public static class Builder {
+
+ private QueueCapacities queueCapacities;
+ private CapacitySchedulerConfiguration leafQueueConfigs;
+
+ public Builder capacities(QueueCapacities capacities) {
+ this.queueCapacities = capacities;
+ return this;
+ }
+
+ public Builder configuration(CapacitySchedulerConfiguration conf) {
+ this.leafQueueConfigs = conf;
+ return this;
+ }
+
+ public AutoCreatedLeafQueueConfig build() {
+ return new AutoCreatedLeafQueueConfig(this);
+ }
+ }
+
+ public QueueCapacities getQueueCapacities() {
+ return queueCapacities;
+ }
+
+ public CapacitySchedulerConfiguration getLeafQueueConfigs() {
+ return leafQueueConfigs;
+ }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueManagementPolicy.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueManagementPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueManagementPolicy.java
new file mode 100644
index 0000000..f7a4bbd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedQueueManagementPolicy.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
+
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
+import java.util.List;
+
+public interface AutoCreatedQueueManagementPolicy {
+
+ /**
+ * Initialize policy
+ * @param schedulerContext Capacity Scheduler context
+ */
+ void init(CapacitySchedulerContext schedulerContext, ParentQueue parentQueue);
+
+ /**
+ * Reinitialize policy state ( if required )
+ * @param schedulerContext Capacity Scheduler context
+ */
+ void reinitialize(CapacitySchedulerContext schedulerContext,
+ ParentQueue parentQueue);
+
+ /**
+ * Get initial template for the specified leaf queue
+ * @param leafQueue the leaf queue
+ * @return initial leaf queue template configurations and capacities for
+ * auto created queue
+ */
+ AutoCreatedLeafQueueConfig getInitialLeafQueueConfiguration(
+ AbstractAutoCreatedLeafQueue leafQueue)
+ throws SchedulerDynamicEditException;
+
+ /**
+ * Compute/Adjust child queue capacities
+ * for auto created leaf queues
+ *
+ * @return returns a list of suggested QueueEntitlementChange(s) which may
+ * or may not be enforced by the scheduler
+ */
+ List<QueueManagementChange> computeQueueManagementChanges()
+ throws SchedulerDynamicEditException;
+
+ /**
+ * Commit/Update state for the specified queue management changes.
+ */
+ void commitQueueManagementChanges(
+ List<QueueManagementChange> queueManagementChanges)
+ throws SchedulerDynamicEditException;
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index 6d79b6a..5dd307c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -230,7 +230,7 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
* @param newlyParsedQueue new queue to re-initalize from
* @param clusterResource resources in the cluster
*/
- public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
+ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
throws IOException;
/**
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
index 51e5b17..3901398 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java
@@ -29,9 +29,9 @@ import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.collect.Sets;
-class CSQueueUtils {
+public class CSQueueUtils {
- final static float EPSILON = 0.0001f;
+ public final static float EPSILON = 0.0001f;
/*
* Used only by tests
@@ -123,12 +123,12 @@ class CSQueueUtils {
for (String label : configuredNodelabels) {
if (label.equals(CommonNodeLabelsManager.NO_LABEL)) {
- queueCapacities.setCapacity(CommonNodeLabelsManager.NO_LABEL,
+ queueCapacities.setCapacity(label,
csConf.getNonLabeledQueueCapacity(queuePath) / 100);
- queueCapacities.setMaximumCapacity(CommonNodeLabelsManager.NO_LABEL,
+ queueCapacities.setMaximumCapacity(label,
csConf.getNonLabeledQueueMaximumCapacity(queuePath) / 100);
queueCapacities.setMaxAMResourcePercentage(
- CommonNodeLabelsManager.NO_LABEL,
+ label,
csConf.getMaximumAMResourcePercentPerPartition(queuePath, label));
} else {
queueCapacities.setCapacity(label,
@@ -183,9 +183,32 @@ class CSQueueUtils {
if (Resources.greaterThan(rc, totalPartitionResource,
totalPartitionResource, Resources.none())) {
+
Resource queueGuranteedResource = childQueue
.getEffectiveCapacity(nodePartition);
+ //TODO : Modify below code to support Absolute Resource configurations
+ // (YARN-5881) for AutoCreatedLeafQueue
+ if (Float.compare(queueCapacities.getAbsoluteCapacity
+ (nodePartition), 0f) == 0
+ && childQueue instanceof AutoCreatedLeafQueue) {
+
+ //If absolute capacity is 0 for a leaf queue (could be a managed leaf
+ // queue), then use the leaf queue's template capacity to compute
+ // guaranteed resource for used capacity
+
+ // queueGuaranteed = totalPartitionedResource *
+ // absolute_capacity(partition)
+ ManagedParentQueue parentQueue = (ManagedParentQueue)
+ childQueue.getParent();
+ QueueCapacities leafQueueTemplateCapacities = parentQueue
+ .getLeafQueueTemplate()
+ .getQueueCapacities();
+ queueGuranteedResource = Resources.multiply(totalPartitionResource,
+ leafQueueTemplateCapacities.getAbsoluteCapacity
+ (nodePartition));
+ }
+
// make queueGuranteed >= minimum_allocation to avoid divided by 0.
queueGuranteedResource =
Resources.max(rc, totalPartitionResource, queueGuranteedResource,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index de93a6a..a5efd9f 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -70,6 +70,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
+
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -129,6 +130,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemoved
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
+ .QueueManagementChangeEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
@@ -138,6 +141,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleC
import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.Lock;
+import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -175,6 +179,8 @@ public class CapacityScheduler extends
private CSConfigurationProvider csConfProvider;
+ protected Clock monotonicClock;
+
@Override
public void setConf(Configuration conf) {
yarnConf = conf;
@@ -1501,7 +1507,7 @@ public class CapacityScheduler extends
{
NodeLabelsUpdateSchedulerEvent labelUpdateEvent =
(NodeLabelsUpdateSchedulerEvent) event;
-
+
updateNodeLabelsAndQueueResource(labelUpdateEvent);
}
break;
@@ -1613,6 +1619,25 @@ public class CapacityScheduler extends
}
}
break;
+ case MANAGE_QUEUE:
+ {
+ QueueManagementChangeEvent queueManagementChangeEvent =
+ (QueueManagementChangeEvent) event;
+ ParentQueue parentQueue = queueManagementChangeEvent.getParentQueue();
+ try {
+ final List<QueueManagementChange> queueManagementChanges =
+ queueManagementChangeEvent.getQueueManagementChanges();
+ ((ManagedParentQueue) parentQueue)
+ .validateAndApplyQueueManagementChanges(queueManagementChanges);
+ } catch (SchedulerDynamicEditException sde) {
+ LOG.error("Queue Management Change event cannot be applied for "
+ + "parent queue : " + parentQueue.getQueueName(), sde);
+ } catch (IOException ioe) {
+ LOG.error("Queue Management Change event cannot be applied for "
+ + "parent queue : " + parentQueue.getQueueName(), ioe);
+ }
+ }
+ break;
default:
LOG.error("Invalid eventtype " + event.getType() + ". Ignoring!");
}
@@ -1976,12 +2001,14 @@ public class CapacityScheduler extends
writeLock.lock();
LOG.info("Removing queue: " + queueName);
CSQueue q = this.getQueue(queueName);
- if (!(q instanceof AutoCreatedLeafQueue)) {
+ if (!(AbstractAutoCreatedLeafQueue.class.isAssignableFrom(
+ q.getClass()))) {
throw new SchedulerDynamicEditException(
"The queue that we are asked " + "to remove (" + queueName
- + ") is not a AutoCreatedLeafQueue");
+ + ") is not a AutoCreatedLeafQueue or ReservationQueue");
}
- AutoCreatedLeafQueue disposableLeafQueue = (AutoCreatedLeafQueue) q;
+ AbstractAutoCreatedLeafQueue disposableLeafQueue =
+ (AbstractAutoCreatedLeafQueue) q;
// at this point we should have no more apps
if (disposableLeafQueue.getNumApplications() > 0) {
throw new SchedulerDynamicEditException(
@@ -1994,8 +2021,8 @@ public class CapacityScheduler extends
((AbstractManagedParentQueue) disposableLeafQueue.getParent())
.removeChildQueue(q);
this.queueManager.removeQueue(queueName);
- LOG.info("Removal of AutoCreatedLeafQueue "
- + queueName + " has succeeded");
+ LOG.info(
+ "Removal of AutoCreatedLeafQueue " + queueName + " has succeeded");
} finally {
writeLock.unlock();
}
@@ -2003,22 +2030,27 @@ public class CapacityScheduler extends
@Override
public void addQueue(Queue queue)
- throws SchedulerDynamicEditException {
+ throws SchedulerDynamicEditException, IOException {
try {
writeLock.lock();
- if (!(queue instanceof AutoCreatedLeafQueue)) {
+ if (queue == null) {
+ throw new SchedulerDynamicEditException(
+ "Queue specified is null. Should be an implementation of "
+ + "AbstractAutoCreatedLeafQueue");
+ } else if (!(AbstractAutoCreatedLeafQueue.class
+ .isAssignableFrom(queue.getClass()))) {
throw new SchedulerDynamicEditException(
- "Queue " + queue.getQueueName() + " is not a AutoCreatedLeafQueue");
+ "Queue is not an implementation of "
+ + "AbstractAutoCreatedLeafQueue : " + queue.getClass());
}
- AutoCreatedLeafQueue newQueue = (AutoCreatedLeafQueue) queue;
+ AbstractAutoCreatedLeafQueue newQueue =
+ (AbstractAutoCreatedLeafQueue) queue;
- if (newQueue.getParent() == null
- || !(AbstractManagedParentQueue.class.
+ if (newQueue.getParent() == null || !(AbstractManagedParentQueue.class.
isAssignableFrom(newQueue.getParent().getClass()))) {
throw new SchedulerDynamicEditException(
- "ParentQueue for " + newQueue.getQueueName()
- + " is not properly set"
+ "ParentQueue for " + newQueue + " is not properly set"
+ " (should be set and be a PlanQueue or ManagedParentQueue)");
}
@@ -2027,6 +2059,7 @@ public class CapacityScheduler extends
String queuename = newQueue.getQueueName();
parentPlan.addChildQueue(newQueue);
this.queueManager.addQueue(queuename, newQueue);
+
LOG.info("Creation of AutoCreatedLeafQueue " + newQueue + " succeeded");
} finally {
writeLock.unlock();
@@ -2039,48 +2072,32 @@ public class CapacityScheduler extends
try {
writeLock.lock();
LeafQueue queue = this.queueManager.getAndCheckLeafQueue(inQueue);
- AbstractManagedParentQueue parent = (AbstractManagedParentQueue) queue
- .getParent();
+ AbstractManagedParentQueue parent =
+ (AbstractManagedParentQueue) queue.getParent();
- if (!(queue instanceof AutoCreatedLeafQueue)) {
+ if (!(AbstractAutoCreatedLeafQueue.class.isAssignableFrom(
+ queue.getClass()))) {
throw new SchedulerDynamicEditException(
"Entitlement can not be" + " modified dynamically since queue "
+ inQueue + " is not a AutoCreatedLeafQueue");
}
- if (parent == null
- || !(AbstractManagedParentQueue.class.isAssignableFrom(
- parent.getClass()))) {
+ if (parent == null || !(AbstractManagedParentQueue.class.isAssignableFrom(
+ parent.getClass()))) {
throw new SchedulerDynamicEditException(
"The parent of AutoCreatedLeafQueue " + inQueue
+ " must be a PlanQueue/ManagedParentQueue");
}
- AutoCreatedLeafQueue newQueue = (AutoCreatedLeafQueue) queue;
+ AbstractAutoCreatedLeafQueue newQueue =
+ (AbstractAutoCreatedLeafQueue) queue;
+ parent.validateQueueEntitlementChange(newQueue, entitlement);
- float sumChilds = parent.sumOfChildCapacities();
- float newChildCap =
- sumChilds - queue.getCapacity() + entitlement.getCapacity();
+ newQueue.setEntitlement(entitlement);
- if (newChildCap >= 0 && newChildCap < 1.0f + CSQueueUtils.EPSILON) {
- // note: epsilon checks here are not ok, as the epsilons might
- // accumulate and become a problem in aggregate
- if (Math.abs(entitlement.getCapacity() - queue.getCapacity()) == 0
- && Math.abs(
- entitlement.getMaxCapacity() - queue.getMaximumCapacity()) == 0) {
- return;
- }
- newQueue.setEntitlement(entitlement);
- } else{
- throw new SchedulerDynamicEditException(
- "Sum of child queues should exceed 100% for auto creating parent "
- + "queue : " + parent.getQueueName());
- }
- LOG.info(
- "Set entitlement for AutoCreatedLeafQueue " + inQueue
- + " to " + queue.getCapacity() +
- " request was (" + entitlement.getCapacity()
- + ")");
+ LOG.info("Set entitlement for AutoCreatedLeafQueue " + inQueue + " to "
+ + queue.getCapacity() + " request was (" + entitlement.getCapacity()
+ + ")");
} finally {
writeLock.unlock();
}
@@ -2718,7 +2735,6 @@ public class CapacityScheduler extends
addQueue(autoCreatedLeafQueue);
- //TODO - Set entitlement through capacity management policy
} else{
throw new SchedulerDynamicEditException(
"Could not auto-create leaf queue for " + leafQueueName
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
index a33d81a..8aa41ee 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java
@@ -923,6 +923,11 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
return getInt(NODE_LOCALITY_DELAY, DEFAULT_NODE_LOCALITY_DELAY);
}
+ @VisibleForTesting
+ public void setNodeLocalityDelay(int nodeLocalityDelay) {
+ setInt(NODE_LOCALITY_DELAY, nodeLocalityDelay);
+ }
+
public int getRackLocalityAdditionalDelay() {
return getInt(RACK_LOCALITY_ADDITIONAL_DELAY,
DEFAULT_RACK_LOCALITY_ADDITIONAL_DELAY);
@@ -1401,6 +1406,10 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
return maxApplicationsPerQueue;
}
+ public void setGlobalMaximumApplicationsPerQueue(int val) {
+ setInt(QUEUE_GLOBAL_MAX_APPLICATION, val);
+ }
+
/**
* Ordering policy inside a parent queue to sort queues
*/
@@ -1621,8 +1630,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
public static final boolean DEFAULT_AUTO_CREATE_CHILD_QUEUE_ENABLED = false;
@Private
+ private static final String AUTO_CREATE_CHILD_QUEUE_PREFIX =
+ "auto-create-child-queue.";
+
+ @Private
public static final String AUTO_CREATE_CHILD_QUEUE_ENABLED =
- "auto-create-child-queue.enabled";
+ AUTO_CREATE_CHILD_QUEUE_PREFIX + "enabled";
@Private
public static final String AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX =
@@ -1722,8 +1735,83 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
}
@Private
+ public static final String AUTO_CREATED_QUEUE_MANAGEMENT_POLICY =
+ AUTO_CREATE_CHILD_QUEUE_PREFIX + "management-policy";
+
+ @Private
+ public static final String DEFAULT_AUTO_CREATED_QUEUE_MANAGEMENT_POLICY =
+ "org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity"
+ + ".queuemanagement."
+ + "GuaranteedOrZeroCapacityOverTimePolicy";
+
+ @Private
+ private static final String QUEUE_MANAGEMENT_CONFIG_PREFIX =
+ "yarn.resourcemanager.monitor.capacity.queue-management.";
+
+ /**
+ * Time in milliseconds between invocations of this policy
+ */
+ @Private
+ public static final String QUEUE_MANAGEMENT_MONITORING_INTERVAL =
+ QUEUE_MANAGEMENT_CONFIG_PREFIX + "monitoring-interval";
+
+ @Private
+ public static final long DEFAULT_QUEUE_MANAGEMENT_MONITORING_INTERVAL =
+ 1500L;
+
+ /**
+ * Queue Management computation policy for Auto Created queues
+ * @param queue The queue's path
+ * @return Configured policy class name
+ */
+ @Private
+ public String getAutoCreatedQueueManagementPolicy(String queue) {
+ String autoCreatedQueueManagementPolicy =
+ get(getQueuePrefix(queue) + AUTO_CREATED_QUEUE_MANAGEMENT_POLICY,
+ DEFAULT_AUTO_CREATED_QUEUE_MANAGEMENT_POLICY);
+ return autoCreatedQueueManagementPolicy;
+ }
+
+ /**
+ * Get The policy class configured to manage capacities for auto created leaf
+ * queues under the specified parent
+ *
+ * @param queueName The parent queue's name
+ * @return The policy class configured to manage capacities for auto created
+ * leaf queues under the specified parent queue
+ */
+ @Private
+ protected AutoCreatedQueueManagementPolicy
+ getAutoCreatedQueueManagementPolicyClass(
+ String queueName) {
+
+ String queueManagementPolicyClassName =
+ getAutoCreatedQueueManagementPolicy(queueName);
+ LOG.info("Using Auto Created Queue Management Policy: "
+ + queueManagementPolicyClassName + " for queue: " + queueName);
+ try {
+ Class<?> queueManagementPolicyClazz = getClassByName(
+ queueManagementPolicyClassName);
+ if (AutoCreatedQueueManagementPolicy.class.isAssignableFrom(
+ queueManagementPolicyClazz)) {
+ return (AutoCreatedQueueManagementPolicy) ReflectionUtils.newInstance(
+ queueManagementPolicyClazz, this);
+ } else{
+ throw new YarnRuntimeException(
+ "Class: " + queueManagementPolicyClassName + " not instance of "
+ + AutoCreatedQueueManagementPolicy.class.getCanonicalName());
+ }
+ } catch (ClassNotFoundException e) {
+ throw new YarnRuntimeException(
+ "Could not instantiate " + "AutoCreatedQueueManagementPolicy: "
+ + queueManagementPolicyClassName + " for queue: " + queueName,
+ e);
+ }
+ }
+
@VisibleForTesting
- public void setAutoCreatedLeafQueueTemplateCapacity(String queuePath,
+ @Private
+ public void setAutoCreatedLeafQueueConfigCapacity(String queuePath,
float val) {
String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
queuePath);
@@ -1732,13 +1820,31 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
@Private
@VisibleForTesting
- public void setAutoCreatedLeafQueueTemplateMaxCapacity(String queuePath,
+ public void setAutoCreatedLeafQueueConfigMaxCapacity(String queuePath,
float val) {
String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
queuePath);
setMaximumCapacity(leafQueueConfPrefix, val);
}
+ @VisibleForTesting
+ @Private
+ public void setAutoCreatedLeafQueueConfigUserLimit(String queuePath,
+ int val) {
+ String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
+ queuePath);
+ setUserLimit(leafQueueConfPrefix, val);
+ }
+
+ @VisibleForTesting
+ @Private
+ public void setAutoCreatedLeafQueueConfigUserLimitFactor(String queuePath,
+ float val) {
+ String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
+ queuePath);
+ setUserLimitFactor(leafQueueConfPrefix, val);
+ }
+
public static String getUnits(String resourceValue) {
String units;
for (int i = 0; i < resourceValue.length(); i++) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
index 7c918a5..ae74989 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerContext.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preempti
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
+import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
/**
@@ -94,4 +95,11 @@ public interface CapacitySchedulerContext {
* @return if configuration is mutable
*/
boolean isConfigurationMutable();
+
+ /**
+ * Get clock from scheduler
+ * @return Clock
+ */
+ Clock getClock();
+
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
index eb50123..30ecd40 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
@@ -239,7 +239,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
queueName + ReservationConstants.DEFAULT_QUEUE_SUFFIX;
List<CSQueue> childQueues = new ArrayList<>();
- AutoCreatedLeafQueue resQueue = new AutoCreatedLeafQueue(csContext,
+ ReservationQueue resQueue = new ReservationQueue(csContext,
defReservationId, (PlanQueue) queue);
try {
resQueue.setEntitlement(new QueueEntitlement(1.0f, 1.0f));
@@ -312,7 +312,8 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
Map<String, CSQueue> newQueues) throws IOException {
// check that all static queues are included in the newQueues list
for (Map.Entry<String, CSQueue> e : queues.entrySet()) {
- if (!(e.getValue() instanceof AutoCreatedLeafQueue)) {
+ if (!(AbstractAutoCreatedLeafQueue.class.isAssignableFrom(e.getValue()
+ .getClass()))) {
String queueName = e.getKey();
CSQueue oldQueue = e.getValue();
CSQueue newQueue = newQueues.get(queueName);
@@ -394,7 +395,6 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
String queueName = e.getKey();
CSQueue existingQueue = e.getValue();
- //TODO - Handle case when auto create is disabled on parent queues
if (!newQueues.containsKey(queueName) && !(
existingQueue instanceof AutoCreatedLeafQueue && conf
.isAutoCreateChildQueueEnabled(
http://git-wip-us.apache.org/repos/asf/hadoop/blob/b38643c9/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 41ec4ba..86fcbc9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -138,7 +138,14 @@ public class LeafQueue extends AbstractCSQueue {
@SuppressWarnings({ "unchecked", "rawtypes" })
public LeafQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
- super(cs, queueName, parent, old);
+ this(cs, cs.getConfiguration(), queueName, parent, old);
+ }
+
+ public LeafQueue(CapacitySchedulerContext cs,
+ CapacitySchedulerConfiguration configuration,
+ String queueName, CSQueue parent, CSQueue old) throws
+ IOException {
+ super(cs, configuration, queueName, parent, old);
this.scheduler = cs;
this.usersManager = new UsersManager(metrics, this, labelManager, scheduler,
@@ -149,17 +156,25 @@ public class LeafQueue extends AbstractCSQueue {
if(LOG.isDebugEnabled()) {
LOG.debug("LeafQueue:" + " name=" + queueName
- + ", fullname=" + getQueuePath());
+ + ", fullname=" + getQueuePath());
}
- setupQueueConfigs(cs.getClusterResource());
+ setupQueueConfigs(cs.getClusterResource(), configuration);
+
}
protected void setupQueueConfigs(Resource clusterResource)
throws IOException {
+ setupQueueConfigs(clusterResource, csContext.getConfiguration());
+ }
+
+ protected void setupQueueConfigs(Resource clusterResource,
+ CapacitySchedulerConfiguration conf) throws
+ IOException {
try {
writeLock.lock();
- super.setupQueueConfigs(clusterResource);
+ CapacitySchedulerConfiguration schedConf = csContext.getConfiguration();
+ super.setupQueueConfigs(clusterResource, conf);
this.lastClusterResource = clusterResource;
@@ -173,8 +188,6 @@ public class LeafQueue extends AbstractCSQueue {
// absoluteMaxAvailCapacity during headroom/userlimit/allocation events)
setQueueResourceLimitsInfo(clusterResource);
- CapacitySchedulerConfiguration conf = csContext.getConfiguration();
-
setOrderingPolicy(
conf.<FiCaSchedulerApp>getAppOrderingPolicy(getQueuePath()));
@@ -183,11 +196,13 @@ public class LeafQueue extends AbstractCSQueue {
maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath());
if (maxApplications < 0) {
- int maxGlobalPerQueueApps = conf.getGlobalMaximumApplicationsPerQueue();
+ int maxGlobalPerQueueApps = schedConf
+ .getGlobalMaximumApplicationsPerQueue();
if (maxGlobalPerQueueApps > 0) {
maxApplications = maxGlobalPerQueueApps;
} else {
- int maxSystemApps = conf.getMaximumSystemApplications();
+ int maxSystemApps = schedConf.
+ getMaximumSystemApplications();
maxApplications =
(int) (maxSystemApps * queueCapacities.getAbsoluteCapacity());
}
@@ -218,9 +233,11 @@ public class LeafQueue extends AbstractCSQueue {
.join(getAccessibleNodeLabels().iterator(), ',')));
}
- nodeLocalityDelay = conf.getNodeLocalityDelay();
- rackLocalityAdditionalDelay = conf.getRackLocalityAdditionalDelay();
- rackLocalityFullReset = conf.getRackLocalityFullReset();
+ nodeLocalityDelay = schedConf.getNodeLocalityDelay();
+ rackLocalityAdditionalDelay = schedConf
+ .getRackLocalityAdditionalDelay();
+ rackLocalityFullReset = schedConf
+ .getRackLocalityFullReset();
// re-init this since max allocation could have changed
this.minimumAllocationFactor = Resources.ratio(resourceCalculator,
@@ -507,10 +524,11 @@ public class LeafQueue extends AbstractCSQueue {
}
}
- @Override
- public void reinitialize(
- CSQueue newlyParsedQueue, Resource clusterResource)
- throws IOException {
+ protected void reinitialize(
+ CSQueue newlyParsedQueue, Resource clusterResource,
+ CapacitySchedulerConfiguration configuration) throws
+ IOException {
+
try {
writeLock.lock();
// Sanity check
@@ -535,7 +553,7 @@ public class LeafQueue extends AbstractCSQueue {
+ newMax);
}
- setupQueueConfigs(clusterResource);
+ setupQueueConfigs(clusterResource, configuration);
// queue metrics are updated, more resource may be available
// activate the pending applications if possible
@@ -547,6 +565,14 @@ public class LeafQueue extends AbstractCSQueue {
}
@Override
+ public void reinitialize(
+ CSQueue newlyParsedQueue, Resource clusterResource)
+ throws IOException {
+ reinitialize(newlyParsedQueue, clusterResource,
+ csContext.getConfiguration());
+ }
+
+ @Override
public void submitApplicationAttempt(FiCaSchedulerApp application,
String userName) {
// Careful! Locking order is important!
@@ -731,7 +757,7 @@ public class LeafQueue extends AbstractCSQueue {
}
}
- private void activateApplications() {
+ protected void activateApplications() {
try {
writeLock.lock();
// limit of allowed resource usage for application masters
@@ -1991,10 +2017,18 @@ public class LeafQueue extends AbstractCSQueue {
queueCapacities.setCapacity(capacity);
}
+ public void setCapacity(String nodeLabel, float capacity) {
+ queueCapacities.setCapacity(nodeLabel, capacity);
+ }
+
public void setAbsoluteCapacity(float absoluteCapacity) {
queueCapacities.setAbsoluteCapacity(absoluteCapacity);
}
+ public void setAbsoluteCapacity(String nodeLabel, float absoluteCapacity) {
+ queueCapacities.setAbsoluteCapacity(nodeLabel, absoluteCapacity);
+ }
+
public void setMaxApplicationsPerUser(int maxApplicationsPerUser) {
this.maxApplicationsPerUser = maxApplicationsPerUser;
}
@@ -2002,7 +2036,12 @@ public class LeafQueue extends AbstractCSQueue {
public void setMaxApplications(int maxApplications) {
this.maxApplications = maxApplications;
}
-
+
+ public void setMaxAMResourcePerQueuePercent(
+ float maxAMResourcePerQueuePercent) {
+ this.maxAMResourcePerQueuePercent = maxAMResourcePerQueuePercent;
+ }
+
public OrderingPolicy<FiCaSchedulerApp>
getOrderingPolicy() {
return orderingPolicy;
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org