You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text version.
Posted to common-commits@hadoop.apache.org by sn...@apache.org on 2021/12/13 15:32:27 UTC
[hadoop] branch trunk updated: YARN-11024. Create an AbstractLeafQueue to store the common LeafQueue + AutoCreatedLeafQueue functionality. Contributed by Benjamin Teke
This is an automated email from the ASF dual-hosted git repository.
snemeth pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new 898055e YARN-11024. Create an AbstractLeafQueue to store the common LeafQueue + AutoCreatedLeafQueue functionality. Contributed by Benjamin Teke
898055e is described below
commit 898055e204acfe54ab5a8cffb8f9769aab6fe7a7
Author: Szilard Nemeth <sn...@apache.org>
AuthorDate: Mon Dec 13 16:12:55 2021 +0100
YARN-11024. Create an AbstractLeafQueue to store the common LeafQueue + AutoCreatedLeafQueue functionality. Contributed by Benjamin Teke
---
.../monitor/capacity/FifoCandidatesSelector.java | 4 +-
.../capacity/FifoIntraQueuePreemptionPlugin.java | 4 +-
.../capacity/IntraQueueCandidatesSelector.java | 13 +-
.../monitor/capacity/TempQueuePerPartition.java | 10 +-
.../placement/CSMappingPlacementRule.java | 4 +-
.../placement/QueuePlacementRuleUtils.java | 6 +-
.../MappingRuleValidationContextImpl.java | 6 +-
.../capacity/AbstractAutoCreatedLeafQueue.java | 2 +-
.../{LeafQueue.java => AbstractLeafQueue.java} | 722 +++---
.../scheduler/capacity/AutoCreatedLeafQueue.java | 6 +-
.../capacity/CSMaxRunningAppsEnforcer.java | 8 +-
.../capacity/CapacityHeadroomProvider.java | 8 +-
.../scheduler/capacity/CapacityScheduler.java | 44 +-
.../capacity/CapacitySchedulerConfigValidator.java | 4 +-
.../capacity/CapacitySchedulerQueueManager.java | 14 +-
.../scheduler/capacity/LeafQueue.java | 2436 +-------------------
.../scheduler/capacity/ManagedParentQueue.java | 6 +-
.../scheduler/capacity/ParentQueue.java | 8 +-
.../scheduler/capacity/ReservationQueue.java | 5 +
.../scheduler/capacity/UsersManager.java | 4 +-
.../GuaranteedOrZeroCapacityOverTimePolicy.java | 50 +-
.../scheduler/common/fica/FiCaSchedulerApp.java | 10 +-
.../webapp/dao/CapacitySchedulerInfo.java | 8 +-
.../webapp/dao/CapacitySchedulerLeafQueueInfo.java | 4 +-
.../dao/helper/CapacitySchedulerInfoHelper.java | 4 +-
.../TestWorkPreservingRMRestart.java | 5 +-
.../capacity/TestCSMaxRunningAppsEnforcer.java | 2 +-
.../TestCapacitySchedulerNewQueueAutoCreation.java | 15 +-
28 files changed, 436 insertions(+), 2976 deletions(-)
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
index d9e9091..440066c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoCandidatesSelector.java
@@ -24,7 +24,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
@@ -86,7 +86,7 @@ public class FifoCandidatesSelector
}
// compute resToObtainByPartition considered inter-queue preemption
- LeafQueue leafQueue = preemptionContext.getQueueByPartition(queueName,
+ AbstractLeafQueue leafQueue = preemptionContext.getQueueByPartition(queueName,
RMNodeLabelsManager.NO_LABEL).leafQueue;
Map<String, Resource> resToObtainByPartition =
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
index ea17fed..188e619 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/FifoIntraQueuePreemptionPlugin.java
@@ -40,7 +40,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.IntraQueue
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.IntraQueuePreemptionOrderPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FairOrderingPolicy;
@@ -577,7 +577,7 @@ public class FifoIntraQueuePreemptionPlugin
}
private Resource calculateUsedAMResourcesPerQueue(String partition,
- LeafQueue leafQueue, Map<String, Resource> perUserAMUsed) {
+ AbstractLeafQueue leafQueue, Map<String, Resource> perUserAMUsed) {
Collection<FiCaSchedulerApp> runningApps = leafQueue.getApplications();
Resource amUsed = Resources.createResource(0, 0);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
index cea1bca..f0bd03b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/IntraQueueCandidatesSelector.java
@@ -26,7 +26,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy.IntraQueuePreemptionOrderPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.AbstractComparatorOrderingPolicy;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@@ -146,7 +146,7 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
// 4. Iterate from most under-served queue in order.
for (String queueName : queueNames) {
- LeafQueue leafQueue = preemptionContext.getQueueByPartition(queueName,
+ AbstractLeafQueue leafQueue = preemptionContext.getQueueByPartition(queueName,
RMNodeLabelsManager.NO_LABEL).leafQueue;
// skip if not a leafqueue
@@ -181,7 +181,7 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
leafQueue.getReadLock().lock();
try {
for (FiCaSchedulerApp app : apps) {
- preemptFromLeastStarvedApp(leafQueue, app, selectedCandidates,
+ preemptFromLeastStarvedApp(app, selectedCandidates,
curCandidates, clusterResource, totalPreemptedResourceAllowed,
resToObtainByPartition, rollingResourceUsagePerUser);
}
@@ -195,7 +195,7 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
}
private void initializeUsageAndUserLimitForCompute(Resource clusterResource,
- String partition, LeafQueue leafQueue,
+ String partition, AbstractLeafQueue leafQueue,
Map<String, Resource> rollingResourceUsagePerUser) {
for (String user : leafQueue.getAllUsers()) {
// Initialize used resource of a given user for rolling computation.
@@ -206,8 +206,7 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
}
}
- private void preemptFromLeastStarvedApp(LeafQueue leafQueue,
- FiCaSchedulerApp app,
+ private void preemptFromLeastStarvedApp(FiCaSchedulerApp app,
Map<ApplicationAttemptId, Set<RMContainer>> selectedCandidates,
Map<ApplicationAttemptId, Set<RMContainer>> curCandidates,
Resource clusterResource, Resource totalPreemptedResourceAllowed,
@@ -293,7 +292,7 @@ public class IntraQueueCandidatesSelector extends PreemptionCandidatesSelector {
for (String queueName : queueNames) {
TempQueuePerPartition tq = context.getQueueByPartition(queueName,
partition);
- LeafQueue leafQueue = tq.leafQueue;
+ AbstractLeafQueue leafQueue = tq.leafQueue;
// skip if its parent queue
if (null == leafQueue) {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
index 57dc639..958c08e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/capacity/TempQueuePerPartition.java
@@ -26,7 +26,7 @@ import java.util.Map;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
@@ -56,7 +56,7 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
final ArrayList<TempQueuePerPartition> children;
private Collection<TempAppPerPartition> apps;
- LeafQueue leafQueue;
+ AbstractLeafQueue leafQueue;
ParentQueue parentQueue;
boolean preemptionDisabled;
@@ -81,8 +81,8 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
super(queueName, current, Resource.newInstance(0, 0), reserved,
Resource.newInstance(0, 0));
- if (queue instanceof LeafQueue) {
- LeafQueue l = (LeafQueue) queue;
+ if (queue instanceof AbstractLeafQueue) {
+ AbstractLeafQueue l = (AbstractLeafQueue) queue;
pending = l.getTotalPendingResourcesConsideringUserLimit(
totalPartitionResource, partition, false);
pendingDeductReserved = l.getTotalPendingResourcesConsideringUserLimit(
@@ -113,7 +113,7 @@ public class TempQueuePerPartition extends AbstractPreemptionEntity {
this.effMaxRes = effMaxRes;
}
- public void setLeafQueue(LeafQueue l) {
+ public void setLeafQueue(AbstractLeafQueue l) {
assert children.size() == 0;
this.leafQueue = l;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/CSMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/CSMappingPlacementRule.java
index d9c7e6f..cefed1d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/CSMappingPlacementRule.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/CSMappingPlacementRule.java
@@ -32,7 +32,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Capacity
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -277,7 +277,7 @@ public class CSMappingPlacementRule extends PlacementRule {
}
CSQueue queue = queueManager.getQueueByFullName(normalizedName);
- if (queue != null && !(queue instanceof LeafQueue)) {
+ if (queue != null && !(queue instanceof AbstractLeafQueue)) {
throw new YarnException("Mapping rule returned a non-leaf queue '" +
normalizedName + "', cannot place application in it.");
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/QueuePlacementRuleUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/QueuePlacementRuleUtils.java
index 76e3e27..f6381bd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/QueuePlacementRuleUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/QueuePlacementRuleUtils.java
@@ -21,13 +21,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.placement;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ManagedParentQueue;
import java.io.IOException;
-import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
-
/**
* Utility class for Capacity Scheduler queue PlacementRules.
*/
@@ -83,7 +81,7 @@ public final class QueuePlacementRuleUtils {
public static QueueMapping validateAndGetQueueMapping(
CapacitySchedulerQueueManager queueManager, CSQueue queue,
QueueMapping mapping) throws IOException {
- if (!(queue instanceof LeafQueue)) {
+ if (!(queue instanceof AbstractLeafQueue)) {
throw new IOException(
"mapping contains invalid or non-leaf queue : " +
mapping.getFullPath());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/csmappingrule/MappingRuleValidationContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/csmappingrule/MappingRuleValidationContextImpl.java
index 4218b6f..cceb7e6 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/csmappingrule/MappingRuleValidationContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/csmappingrule/MappingRuleValidationContextImpl.java
@@ -22,7 +22,7 @@ import org.apache.hadoop.util.Sets;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ManagedParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
@@ -91,7 +91,7 @@ public class MappingRuleValidationContextImpl
"' under it.");
case QUEUE_EXISTS:
CSQueue queue = queueManager.getQueue(normalizedPath);
- if (!(queue instanceof LeafQueue)) {
+ if (!(queue instanceof AbstractLeafQueue)) {
throw new YarnException("Target queue '" + path.getFullPath() +
"' but it's not a leaf queue.");
}
@@ -157,7 +157,7 @@ public class MappingRuleValidationContextImpl
//if the static part of our queue exists, and it's not a leaf queue,
//we cannot do any deeper validation
if (queue != null) {
- if (queue instanceof LeafQueue) {
+ if (queue instanceof AbstractLeafQueue) {
throw new YarnException("Queue path '" + path +"' is invalid " +
"because '" + normalizedStaticPart + "' is a leaf queue, " +
"which can have no other queues under it.");
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java
index b9c2ec6..36d2aef 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractAutoCreatedLeafQueue.java
@@ -34,7 +34,7 @@ import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager
* Abstract class for dynamic auto created queues managed by an implementation
* of AbstractManagedParentQueue
*/
-public class AbstractAutoCreatedLeafQueue extends LeafQueue {
+public class AbstractAutoCreatedLeafQueue extends AbstractLeafQueue {
protected AbstractManagedParentQueue parent;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java
similarity index 81%
copy from hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
copy to hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java
index 4592f2a..9991140 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java
@@ -26,7 +26,6 @@ import java.util.concurrent.ConcurrentMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
-import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.security.authorize.AccessControlList;
@@ -78,18 +77,16 @@ import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-@Private
-@Unstable
-public class LeafQueue extends AbstractCSQueue {
+public class AbstractLeafQueue extends AbstractCSQueue {
private static final Logger LOG =
- LoggerFactory.getLogger(LeafQueue.class);
+ LoggerFactory.getLogger(AbstractLeafQueue.class);
private float absoluteUsedCapacity = 0.0f;
// TODO the max applications should consider label
protected int maxApplications;
protected volatile int maxApplicationsPerUser;
-
+
private float maxAMResourcePerQueuePercent;
private volatile int nodeLocalityDelay;
@@ -105,8 +102,8 @@ public class LeafQueue extends AbstractCSQueue {
private volatile float minimumAllocationFactor;
- private final RecordFactory recordFactory =
- RecordFactoryProvider.getRecordFactory(null);
+ private final RecordFactory recordFactory =
+ RecordFactoryProvider.getRecordFactory(null);
private final UsersManager usersManager;
@@ -142,21 +139,18 @@ public class LeafQueue extends AbstractCSQueue {
private final List<FiCaSchedulerApp> nonRunnableApps = new ArrayList<>();
@SuppressWarnings({ "unchecked", "rawtypes" })
- public LeafQueue(CapacitySchedulerContext cs,
- String queueName, CSQueue parent, CSQueue old) throws IOException {
+ public AbstractLeafQueue(CapacitySchedulerContext cs, String queueName,
+ CSQueue parent, CSQueue old) {
this(cs, cs.getConfiguration(), queueName, parent, old, false);
}
- public LeafQueue(CapacitySchedulerContext cs,
- CapacitySchedulerConfiguration configuration,
- String queueName, CSQueue parent, CSQueue old) throws IOException {
+ public AbstractLeafQueue(CapacitySchedulerContext cs, CapacitySchedulerConfiguration configuration,
+ String queueName, CSQueue parent, CSQueue old) {
this(cs, configuration, queueName, parent, old, false);
}
- public LeafQueue(CapacitySchedulerContext cs,
- CapacitySchedulerConfiguration configuration,
- String queueName, CSQueue parent, CSQueue old, boolean isDynamic) throws
- IOException {
+ public AbstractLeafQueue(CapacitySchedulerContext cs, CapacitySchedulerConfiguration configuration,
+ String queueName, CSQueue parent, CSQueue old, boolean isDynamic) {
super(cs, configuration, queueName, parent, old);
setDynamicQueue(isDynamic);
@@ -165,17 +159,11 @@ public class LeafQueue extends AbstractCSQueue {
// One time initialization is enough since it is static ordering policy
this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps();
-
- LOG.debug("LeafQueue: name={}, fullname={}", queueName, getQueuePath());
-
- setupQueueConfigs(cs.getClusterResource(), configuration);
-
}
@SuppressWarnings("checkstyle:nowhitespaceafter")
- protected void setupQueueConfigs(Resource clusterResource,
- CapacitySchedulerConfiguration conf) throws
- IOException {
+ protected void setupQueueConfigs(
+ Resource clusterResource, CapacitySchedulerConfiguration conf) throws IOException {
writeLock.lock();
try {
CapacitySchedulerConfiguration schedConf = csContext.getConfiguration();
@@ -183,8 +171,7 @@ public class LeafQueue extends AbstractCSQueue {
this.lastClusterResource = clusterResource;
- this.cachedResourceLimitsForHeadroom = new ResourceLimits(
- clusterResource);
+ this.cachedResourceLimitsForHeadroom = new ResourceLimits(clusterResource);
// Initialize headroom info, also used for calculating application
// master resource limits. Since this happens during queue initialization
@@ -193,15 +180,13 @@ public class LeafQueue extends AbstractCSQueue {
// absoluteMaxAvailCapacity during headroom/userlimit/allocation events)
setQueueResourceLimitsInfo(clusterResource);
- setOrderingPolicy(
- conf.<FiCaSchedulerApp>getAppOrderingPolicy(getQueuePath()));
+ setOrderingPolicy(conf.<FiCaSchedulerApp>getAppOrderingPolicy(getQueuePath()));
usersManager.setUserLimit(conf.getUserLimit(getQueuePath()));
usersManager.setUserLimitFactor(conf.getUserLimitFactor(getQueuePath()));
maxAMResourcePerQueuePercent =
- conf.getMaximumApplicationMasterResourcePerQueuePercent(
- getQueuePath());
+ conf.getMaximumApplicationMasterResourcePerQueuePercent(getQueuePath());
maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath());
if (maxApplications < 0) {
@@ -212,33 +197,27 @@ public class LeafQueue extends AbstractCSQueue {
}
}
- priorityAcls = conf.getPriorityAcls(getQueuePath(),
- csContext.getMaxClusterLevelAppPriority());
+ priorityAcls =
+ conf.getPriorityAcls(getQueuePath(), csContext.getMaxClusterLevelAppPriority());
Set<String> accessibleNodeLabels = this.queueNodeLabelsSettings.getAccessibleNodeLabels();
if (!SchedulerUtils.checkQueueLabelExpression(accessibleNodeLabels,
this.queueNodeLabelsSettings.getDefaultLabelExpression(), null)) {
- throw new IOException(
- "Invalid default label expression of " + " queue=" + getQueuePath()
- + " doesn't have permission to access all labels "
- + "in default label expression. labelExpression of resource request="
- + getDefaultNodeLabelExpressionStr() + ". Queue labels=" + (
- getAccessibleNodeLabels() == null ?
- "" :
- StringUtils
- .join(getAccessibleNodeLabels().iterator(), ',')));
+ throw new IOException("Invalid default label expression of " + " queue=" + getQueuePath()
+ + " doesn't have permission to access all labels "
+ + "in default label expression. labelExpression of resource request="
+ + getDefaultNodeLabelExpressionStr() + ". Queue labels=" + (
+ getAccessibleNodeLabels() == null ? "" :
+ StringUtils.join(getAccessibleNodeLabels().iterator(), ',')));
}
nodeLocalityDelay = schedConf.getNodeLocalityDelay();
- rackLocalityAdditionalDelay = schedConf
- .getRackLocalityAdditionalDelay();
- rackLocalityFullReset = schedConf
- .getRackLocalityFullReset();
+ rackLocalityAdditionalDelay = schedConf.getRackLocalityAdditionalDelay();
+ rackLocalityFullReset = schedConf.getRackLocalityFullReset();
// re-init this since max allocation could have changed
this.minimumAllocationFactor = Resources.ratio(resourceCalculator,
- Resources.subtract(
- queueAllocationSettings.getMaximumAllocation(),
+ Resources.subtract(queueAllocationSettings.getMaximumAllocation(),
queueAllocationSettings.getMinimumAllocation()),
queueAllocationSettings.getMaximumAllocation());
@@ -254,8 +233,8 @@ public class LeafQueue extends AbstractCSQueue {
}
}
- defaultAppPriorityPerQueue = Priority.newInstance(
- conf.getDefaultApplicationPriorityConfPerQueue(getQueuePath()));
+ defaultAppPriorityPerQueue =
+ Priority.newInstance(conf.getDefaultApplicationPriorityConfPerQueue(getQueuePath()));
// Validate leaf queue's user's weights.
float queueUserLimit = Math.min(100.0f, conf.getUserLimit(getQueuePath()));
@@ -331,7 +310,7 @@ public class LeafQueue extends AbstractCSQueue {
public float getMinimumAllocationFactor() {
return minimumAllocationFactor;
}
-
+
/**
* Used only by tests.
*/
@@ -365,7 +344,7 @@ public class LeafQueue extends AbstractCSQueue {
public List<CSQueue> getChildQueues() {
return null;
}
-
+
/**
* Set user limit.
* @param userLimit new user limit
@@ -390,8 +369,7 @@ public class LeafQueue extends AbstractCSQueue {
public int getNumApplications() {
readLock.lock();
try {
- return getNumPendingApplications() + getNumActiveApplications() +
- getNumNonRunnableApps();
+ return getNumPendingApplications() + getNumActiveApplications() + getNumNonRunnableApps();
} finally {
readLock.unlock();
}
@@ -549,14 +527,13 @@ public class LeafQueue extends AbstractCSQueue {
}
// Sanity check
- if (!(newlyParsedQueue instanceof LeafQueue) || !newlyParsedQueue
- .getQueuePath().equals(getQueuePath())) {
- throw new IOException(
- "Trying to reinitialize " + getQueuePath() + " from "
- + newlyParsedQueue.getQueuePath());
+ if (!(newlyParsedQueue instanceof AbstractLeafQueue) || !newlyParsedQueue.getQueuePath()
+ .equals(getQueuePath())) {
+ throw new IOException("Trying to reinitialize " + getQueuePath() + " from "
+ + newlyParsedQueue.getQueuePath());
}
- LeafQueue newlyParsedLeafQueue = (LeafQueue) newlyParsedQueue;
+ AbstractLeafQueue newlyParsedLeafQueue = (AbstractLeafQueue) newlyParsedQueue;
// don't allow the maximum allocation to be decreased in size
// since we have already told running AM's the size
@@ -677,12 +654,12 @@ public class LeafQueue extends AbstractCSQueue {
try {
parent.validateSubmitApplication(applicationId, userName, queue);
} catch (AccessControlException ace) {
- LOG.info("Failed to submit application to parent-queue: " +
+ LOG.info("Failed to submit application to parent-queue: " +
parent.getQueuePath(), ace);
throw ace;
}
}
-
+
public Resource getAMResourceLimit() {
return usageTracker.getQueueUsage().getAMLimit();
}
@@ -775,8 +752,7 @@ public class LeafQueue extends AbstractCSQueue {
}
- public Resource calculateAndGetAMResourceLimitPerPartition(
- String nodePartition) {
+ public Resource calculateAndGetAMResourceLimitPerPartition(String nodePartition) {
writeLock.lock();
try {
/*
@@ -793,13 +769,12 @@ public class LeafQueue extends AbstractCSQueue {
// For non-labeled partition, we need to consider the current queue
// usage limit.
if (nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
- synchronized (queueResourceLimitsInfo){
+ synchronized (queueResourceLimitsInfo) {
queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit();
}
}
- float amResourcePercent = queueCapacities.getMaxAMResourcePercentage(
- nodePartition);
+ float amResourcePercent = queueCapacities.getMaxAMResourcePercentage(nodePartition);
// Current usable resource for this queue and partition is the max of
// queueCurrentLimit and queuePartitionResource.
@@ -807,13 +782,13 @@ public class LeafQueue extends AbstractCSQueue {
// guarantee, use the guarantee as the queuePartitionUsableResource
// because nothing less than the queue's guarantee should be used when
// calculating the AM limit.
- Resource queuePartitionUsableResource = (Resources.fitsIn(
- resourceCalculator, queuePartitionResource, queueCurrentLimit)) ?
+ Resource queuePartitionUsableResource =
+ (Resources.fitsIn(resourceCalculator, queuePartitionResource, queueCurrentLimit)) ?
queueCurrentLimit : queuePartitionResource;
- Resource amResouceLimit = Resources.multiplyAndNormalizeUp(
- resourceCalculator, queuePartitionUsableResource, amResourcePercent,
- queueAllocationSettings.getMinimumAllocation());
+ Resource amResouceLimit =
+ Resources.multiplyAndNormalizeUp(resourceCalculator, queuePartitionUsableResource,
+ amResourcePercent, queueAllocationSettings.getMinimumAllocation());
usageTracker.getMetrics().setAMResouceLimit(nodePartition, amResouceLimit);
usageTracker.getQueueUsage().setAMLimit(nodePartition, amResouceLimit);
@@ -832,8 +807,7 @@ public class LeafQueue extends AbstractCSQueue {
writeLock.lock();
try {
// limit of allowed resource usage for application masters
- Map<String, Resource> userAmPartitionLimit =
- new HashMap<String, Resource>();
+ Map<String, Resource> userAmPartitionLimit = new HashMap<String, Resource>();
// AM Resource Limit for accessible labels can be pre-calculated.
// This will help in updating AMResourceLimit for all labels when queue
@@ -842,10 +816,8 @@ public class LeafQueue extends AbstractCSQueue {
calculateAndGetAMResourceLimitPerPartition(nodePartition);
}
- for (Iterator<FiCaSchedulerApp> fsApp =
- getPendingAppsOrderingPolicy()
- .getAssignmentIterator(IteratorSelector.EMPTY_ITERATOR_SELECTOR);
- fsApp.hasNext(); ) {
+ for (Iterator<FiCaSchedulerApp> fsApp = getPendingAppsOrderingPolicy().getAssignmentIterator(
+ IteratorSelector.EMPTY_ITERATOR_SELECTOR); fsApp.hasNext(); ) {
FiCaSchedulerApp application = fsApp.next();
ApplicationId applicationId = application.getApplicationId();
@@ -859,8 +831,7 @@ public class LeafQueue extends AbstractCSQueue {
amLimit = calculateAndGetAMResourceLimitPerPartition(partitionName);
}
// Check am resource limit.
- Resource amIfStarted = Resources.add(
- application.getAMResource(partitionName),
+ Resource amIfStarted = Resources.add(application.getAMResource(partitionName),
usageTracker.getQueueUsage().getAMUsed(partitionName));
if (LOG.isDebugEnabled()) {
@@ -873,18 +844,19 @@ public class LeafQueue extends AbstractCSQueue {
}
if (!resourceCalculator.fitsIn(amIfStarted, amLimit)) {
- if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual(
- resourceCalculator, lastClusterResource,
- usageTracker.getQueueUsage().getAMUsed(partitionName), Resources.none()))) {
+ if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual(resourceCalculator,
+ lastClusterResource, usageTracker.getQueueUsage().getAMUsed(partitionName),
+ Resources.none()))) {
LOG.warn("maximum-am-resource-percent is insufficient to start a"
+ " single application in queue, it is likely set too low."
- + " skipping enforcement to allow at least one application"
- + " to start");
- } else{
- application.updateAMContainerDiagnostics(AMState.INACTIVATED,
+ + " skipping enforcement to allow at least one application" + " to start");
+ } else {
+ application.updateAMContainerDiagnostics(
+ SchedulerApplicationAttempt.AMState.INACTIVATED,
CSAMContainerLaunchDiagnosticsConstants.QUEUE_AM_RESOURCE_LIMIT_EXCEED);
- LOG.debug("Not activating application {} as amIfStarted: {}"
- + " exceeds amLimit: {}", applicationId, amIfStarted, amLimit);
+ LOG.debug(
+ "Not activating application {} as amIfStarted: {}" + " exceeds amLimit: {}",
+ applicationId, amIfStarted, amLimit);
continue;
}
}
@@ -895,25 +867,23 @@ public class LeafQueue extends AbstractCSQueue {
// Verify whether we already calculated user-am-limit for this label.
if (userAMLimit == null) {
- userAMLimit = getUserAMResourceLimitPerPartition(partitionName,
- application.getUser());
+ userAMLimit = getUserAMResourceLimitPerPartition(partitionName, application.getUser());
userAmPartitionLimit.put(partitionName, userAMLimit);
}
- Resource userAmIfStarted = Resources.add(
- application.getAMResource(partitionName),
+ Resource userAmIfStarted = Resources.add(application.getAMResource(partitionName),
user.getConsumedAMResources(partitionName));
if (!resourceCalculator.fitsIn(userAmIfStarted, userAMLimit)) {
- if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual(
- resourceCalculator, lastClusterResource,
- usageTracker.getQueueUsage().getAMUsed(partitionName), Resources.none()))) {
+ if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual(resourceCalculator,
+ lastClusterResource, usageTracker.getQueueUsage().getAMUsed(partitionName),
+ Resources.none()))) {
LOG.warn("maximum-am-resource-percent is insufficient to start a"
+ " single application in queue for user, it is likely set too"
- + " low. skipping enforcement to allow at least one application"
- + " to start");
- } else{
- application.updateAMContainerDiagnostics(AMState.INACTIVATED,
+ + " low. skipping enforcement to allow at least one application" + " to start");
+ } else {
+ application.updateAMContainerDiagnostics(
+ AMState.INACTIVATED,
CSAMContainerLaunchDiagnosticsConstants.USER_AM_RESOURCE_LIMIT_EXCEED);
LOG.debug("Not activating application {} for user: {} as"
+ " userAmIfStarted: {} exceeds userAmLimit: {}",
@@ -923,17 +893,17 @@ public class LeafQueue extends AbstractCSQueue {
}
user.activateApplication();
orderingPolicy.addSchedulableEntity(application);
- application.updateAMContainerDiagnostics(AMState.ACTIVATED, null);
+ application.updateAMContainerDiagnostics(AMState.ACTIVATED,
+ null);
- usageTracker.getQueueUsage().incAMUsed(partitionName,
- application.getAMResource(partitionName));
- user.getResourceUsage().incAMUsed(partitionName,
- application.getAMResource(partitionName));
+ usageTracker.getQueueUsage()
+ .incAMUsed(partitionName, application.getAMResource(partitionName));
+ user.getResourceUsage().incAMUsed(partitionName, application.getAMResource(partitionName));
user.getResourceUsage().setAMLimit(partitionName, userAMLimit);
usageTracker.getMetrics().incAMUsed(partitionName, application.getUser(),
application.getAMResource(partitionName));
- usageTracker.getMetrics().setAMResouceLimitForUser(partitionName,
- application.getUser(), userAMLimit);
+ usageTracker.getMetrics()
+ .setAMResouceLimitForUser(partitionName, application.getUser(), userAMLimit);
fsApp.remove();
LOG.info("Application " + applicationId + " from user: " + application
.getUser() + " activated in queue: " + getQueuePath());
@@ -943,12 +913,10 @@ public class LeafQueue extends AbstractCSQueue {
}
}
- private void addApplicationAttempt(FiCaSchedulerApp application,
- User user) {
+ private void addApplicationAttempt(FiCaSchedulerApp application, User user) {
writeLock.lock();
try {
- applicationAttemptMap.put(application.getApplicationAttemptId(),
- application);
+ applicationAttemptMap.put(application.getApplicationAttemptId(), application);
if (application.isRunnable()) {
runnableApps.add(application);
@@ -966,8 +934,8 @@ public class LeafQueue extends AbstractCSQueue {
getPendingAppsOrderingPolicy().addSchedulableEntity(application);
// Activate applications
- if (Resources.greaterThan(resourceCalculator, lastClusterResource,
- lastClusterResource, Resources.none())) {
+ if (Resources.greaterThan(resourceCalculator, lastClusterResource, lastClusterResource,
+ Resources.none())) {
activateApplications();
} else {
application.updateAMContainerDiagnostics(AMState.INACTIVATED,
@@ -1010,8 +978,7 @@ public class LeafQueue extends AbstractCSQueue {
parent.finishApplicationAttempt(application, queue);
}
- private void removeApplicationAttempt(
- FiCaSchedulerApp application, String userName) {
+ private void removeApplicationAttempt(FiCaSchedulerApp application, String userName) {
writeLock.lock();
try {
@@ -1032,11 +999,10 @@ public class LeafQueue extends AbstractCSQueue {
boolean wasActive = orderingPolicy.removeSchedulableEntity(application);
if (!wasActive) {
pendingOrderingPolicy.removeSchedulableEntity(application);
- } else{
- usageTracker.getQueueUsage().decAMUsed(partitionName,
- application.getAMResource(partitionName));
- user.getResourceUsage().decAMUsed(partitionName,
- application.getAMResource(partitionName));
+ } else {
+ usageTracker.getQueueUsage()
+ .decAMUsed(partitionName, application.getAMResource(partitionName));
+ user.getResourceUsage().decAMUsed(partitionName, application.getAMResource(partitionName));
usageTracker.getMetrics().decAMUsed(partitionName, application.getUser(),
application.getAMResource(partitionName));
}
@@ -1062,8 +1028,7 @@ public class LeafQueue extends AbstractCSQueue {
}
}
- private FiCaSchedulerApp getApplication(
- ApplicationAttemptId applicationAttemptId) {
+ private FiCaSchedulerApp getApplication(ApplicationAttemptId applicationAttemptId) {
return applicationAttemptMap.get(applicationAttemptId);
}
@@ -1084,8 +1049,8 @@ public class LeafQueue extends AbstractCSQueue {
}
private CSAssignment allocateFromReservedContainer(Resource clusterResource,
- CandidateNodeSet<FiCaSchedulerNode> candidates,
- ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
+ CandidateNodeSet<FiCaSchedulerNode> candidates, ResourceLimits currentResourceLimits,
+ SchedulingMode schedulingMode) {
// Irrespective of Single / Multi Node Placement, the allocate from
// Reserved Container has to happen only for the single node which
@@ -1097,15 +1062,14 @@ public class LeafQueue extends AbstractCSQueue {
if (node != null) {
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {
- FiCaSchedulerApp application = getApplication(
- reservedContainer.getApplicationAttemptId());
+ FiCaSchedulerApp application = getApplication(reservedContainer.getApplicationAttemptId());
if (null != application) {
- ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
- node, SystemClock.getInstance().getTime(), application);
- CSAssignment assignment = application.assignContainers(
- clusterResource, candidates, currentResourceLimits,
- schedulingMode, reservedContainer);
+ ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, node,
+ SystemClock.getInstance().getTime(), application);
+ CSAssignment assignment =
+ application.assignContainers(clusterResource, candidates, currentResourceLimits,
+ schedulingMode, reservedContainer);
return assignment;
}
}
@@ -1114,8 +1078,7 @@ public class LeafQueue extends AbstractCSQueue {
return null;
}
- private ConcurrentMap<String, CachedUserLimit> getUserLimitCache(
- String partition,
+ private ConcurrentMap<String, CachedUserLimit> getUserLimitCache(String partition,
SchedulingMode schedulingMode) {
synchronized (userLimitsCache) {
long latestVersion = usersManager.getLatestVersionOfUsersState();
@@ -1125,20 +1088,19 @@ public class LeafQueue extends AbstractCSQueue {
this.currentUserLimitCacheVersion = latestVersion;
userLimitsCache.clear();
- Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>>
- uLCByPartition = new HashMap<>();
+ Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>> uLCByPartition =
+ new HashMap<>();
userLimitsCache.put(partition, uLCByPartition);
- ConcurrentMap<String, CachedUserLimit> uLCBySchedulingMode =
- new ConcurrentHashMap<>();
+ ConcurrentMap<String, CachedUserLimit> uLCBySchedulingMode = new ConcurrentHashMap<>();
uLCByPartition.put(schedulingMode, uLCBySchedulingMode);
return uLCBySchedulingMode;
}
// User limits cache does not need invalidating
- Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>>
- uLCByPartition = userLimitsCache.get(partition);
+ Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>> uLCByPartition =
+ userLimitsCache.get(partition);
if (uLCByPartition == null) {
uLCByPartition = new HashMap<>();
userLimitsCache.put(partition, uLCByPartition);
@@ -1157,8 +1119,8 @@ public class LeafQueue extends AbstractCSQueue {
@Override
public CSAssignment assignContainers(Resource clusterResource,
- CandidateNodeSet<FiCaSchedulerNode> candidates,
- ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
+ CandidateNodeSet<FiCaSchedulerNode> candidates, ResourceLimits currentResourceLimits,
+ SchedulingMode schedulingMode) {
updateCurrentResourceLimits(currentResourceLimits, clusterResource);
FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
@@ -1170,8 +1132,9 @@ public class LeafQueue extends AbstractCSQueue {
setPreemptionAllowed(currentResourceLimits, candidates.getPartition());
// Check for reserved resources, try to allocate reserved container first.
- CSAssignment assignment = allocateFromReservedContainer(clusterResource,
- candidates, currentResourceLimits, schedulingMode);
+ CSAssignment assignment =
+ allocateFromReservedContainer(clusterResource, candidates, currentResourceLimits,
+ schedulingMode);
if (null != assignment) {
return assignment;
}
@@ -1206,33 +1169,29 @@ public class LeafQueue extends AbstractCSQueue {
boolean needAssignToQueueCheck = true;
IteratorSelector sel = new IteratorSelector();
sel.setPartition(candidates.getPartition());
- for (Iterator<FiCaSchedulerApp> assignmentIterator =
- orderingPolicy.getAssignmentIterator(sel);
+ for (Iterator<FiCaSchedulerApp> assignmentIterator = orderingPolicy.getAssignmentIterator(sel);
assignmentIterator.hasNext(); ) {
FiCaSchedulerApp application = assignmentIterator.next();
- ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
- node, SystemClock.getInstance().getTime(), application);
+ ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager, node,
+ SystemClock.getInstance().getTime(), application);
// Check queue max-capacity limit
Resource appReserved = application.getCurrentReservation();
if (needAssignToQueueCheck) {
- if (!super.canAssignToThisQueue(clusterResource,
- candidates.getPartition(), currentResourceLimits, appReserved,
- schedulingMode)) {
- ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
- activitiesManager, node, application, application.getPriority(),
+ if (!super.canAssignToThisQueue(clusterResource, candidates.getPartition(),
+ currentResourceLimits, appReserved, schedulingMode)) {
+ ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(activitiesManager, node,
+ application, application.getPriority(),
ActivityDiagnosticConstant.QUEUE_HIT_MAX_CAPACITY_LIMIT);
- ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
- parent.getQueuePath(), getQueuePath(),
- ActivityState.REJECTED,
+ ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, parent.getQueuePath(),
+ getQueuePath(), ActivityState.REJECTED,
ActivityDiagnosticConstant.QUEUE_HIT_MAX_CAPACITY_LIMIT);
return CSAssignment.NULL_ASSIGNMENT;
}
// If there was no reservation and canAssignToThisQueue returned
// true, there is no reason to check further.
- if (!this.reservationsContinueLooking
- || appReserved.equals(Resources.none())) {
+ if (!this.reservationsContinueLooking || appReserved.equals(Resources.none())) {
needAssignToQueueCheck = false;
}
}
@@ -1242,13 +1201,12 @@ public class LeafQueue extends AbstractCSQueue {
if (cul != null) {
cachedUserLimit = cul.userLimit;
}
- Resource userLimit = computeUserLimitAndSetHeadroom(application,
- clusterResource, candidates.getPartition(), schedulingMode,
- cachedUserLimit);
+ Resource userLimit =
+ computeUserLimitAndSetHeadroom(application, clusterResource, candidates.getPartition(),
+ schedulingMode, cachedUserLimit);
if (cul == null) {
cul = new CachedUserLimit(userLimit);
- CachedUserLimit retVal =
- userLimits.putIfAbsent(application.getUser(), cul);
+ CachedUserLimit retVal = userLimits.putIfAbsent(application.getUser(), cul);
if (retVal != null) {
// another thread updated the user limit cache before us
cul = retVal;
@@ -1260,9 +1218,9 @@ public class LeafQueue extends AbstractCSQueue {
if (!cul.canAssign && Resources.fitsIn(appReserved, cul.reservation)) {
userAssignable = false;
} else {
- userAssignable = canAssignToUser(clusterResource, application.getUser(),
- userLimit, application, candidates.getPartition(),
- currentResourceLimits);
+ userAssignable =
+ canAssignToUser(clusterResource, application.getUser(), userLimit, application,
+ candidates.getPartition(), currentResourceLimits);
if (!userAssignable && Resources.fitsIn(cul.reservation, appReserved)) {
cul.canAssign = false;
cul.reservation = appReserved;
@@ -1271,59 +1229,54 @@ public class LeafQueue extends AbstractCSQueue {
if (!userAssignable) {
application.updateAMContainerDiagnostics(AMState.ACTIVATED,
"User capacity has reached its maximum limit.");
- ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
- activitiesManager, node, application, application.getPriority(),
+ ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(activitiesManager, node,
+ application, application.getPriority(),
ActivityDiagnosticConstant.QUEUE_HIT_USER_MAX_CAPACITY_LIMIT);
continue;
}
// Try to schedule
- assignment = application.assignContainers(clusterResource,
- candidates, currentResourceLimits, schedulingMode, null);
+ assignment = application.assignContainers(clusterResource, candidates, currentResourceLimits,
+ schedulingMode, null);
if (LOG.isDebugEnabled()) {
- LOG.debug("post-assignContainers for application " + application
- .getApplicationId());
+ LOG.debug(
+ "post-assignContainers for application " + application.getApplicationId());
application.showRequests();
}
// Did we schedule or reserve a container?
Resource assigned = assignment.getResource();
- if (Resources.greaterThan(resourceCalculator, clusterResource, assigned,
- Resources.none())) {
- ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
- parent.getQueuePath(), getQueuePath(),
- ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
+ if (Resources.greaterThan(resourceCalculator, clusterResource, assigned, Resources.none())) {
+ ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, parent.getQueuePath(),
+ getQueuePath(), ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
return assignment;
- } else if (assignment.getSkippedType()
- == CSAssignment.SkippedType.OTHER) {
- ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
- activitiesManager, application.getApplicationId(),
- ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
+ } else if (assignment.getSkippedType() == CSAssignment.SkippedType.OTHER) {
+ ActivitiesLogger.APP.finishSkippedAppAllocationRecording(activitiesManager,
+ application.getApplicationId(), ActivityState.SKIPPED,
+ ActivityDiagnosticConstant.EMPTY);
application.updateNodeInfoForAMDiagnostics(node);
- } else if (assignment.getSkippedType()
- == CSAssignment.SkippedType.QUEUE_LIMIT) {
- ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
- parent.getQueuePath(), getQueuePath(), ActivityState.REJECTED,
- () -> ActivityDiagnosticConstant.QUEUE_DO_NOT_HAVE_ENOUGH_HEADROOM
- + " from " + application.getApplicationId());
+ } else if (assignment.getSkippedType() == CSAssignment.SkippedType.QUEUE_LIMIT) {
+ ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, parent.getQueuePath(),
+ getQueuePath(), ActivityState.REJECTED,
+ () -> ActivityDiagnosticConstant.QUEUE_DO_NOT_HAVE_ENOUGH_HEADROOM + " from "
+ + application.getApplicationId());
return assignment;
- } else{
+ } else {
// If we don't allocate anything, and it is not skipped by application,
// we will return to respect FIFO of applications
- ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
- parent.getQueuePath(), getQueuePath(), ActivityState.SKIPPED,
+ ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, parent.getQueuePath(),
+ getQueuePath(), ActivityState.SKIPPED,
ActivityDiagnosticConstant.QUEUE_SKIPPED_TO_RESPECT_FIFO);
- ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
- activitiesManager, application.getApplicationId(),
- ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
+ ActivitiesLogger.APP.finishSkippedAppAllocationRecording(activitiesManager,
+ application.getApplicationId(), ActivityState.SKIPPED,
+ ActivityDiagnosticConstant.EMPTY);
return CSAssignment.NULL_ASSIGNMENT;
}
}
- ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
- parent.getQueuePath(), getQueuePath(), ActivityState.SKIPPED,
- ActivityDiagnosticConstant.EMPTY);
+ ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node, parent.getQueuePath(),
+ getQueuePath(), ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
return CSAssignment.NULL_ASSIGNMENT;
}
@@ -1340,14 +1293,13 @@ public class LeafQueue extends AbstractCSQueue {
if (allocation.getAllocateFromReservedContainer() == null) {
readLock.lock();
try {
- FiCaSchedulerApp app =
- schedulerContainer.getSchedulerApplicationAttempt();
+ FiCaSchedulerApp app = schedulerContainer.getSchedulerApplicationAttempt();
String username = app.getUser();
String p = schedulerContainer.getNodePartition();
// check user-limit
- Resource userLimit = computeUserLimitAndSetHeadroom(app, cluster, p,
- allocation.getSchedulingMode(), null);
+ Resource userLimit =
+ computeUserLimitAndSetHeadroom(app, cluster, p, allocation.getSchedulingMode(), null);
// Deduct resources that we can release
User user = getUser(username);
@@ -1356,13 +1308,11 @@ public class LeafQueue extends AbstractCSQueue {
return false;
}
Resource usedResource = Resources.clone(user.getUsed(p));
- Resources.subtractFrom(usedResource,
- request.getTotalReleasedResource());
+ Resources.subtractFrom(usedResource, request.getTotalReleasedResource());
- if (Resources.greaterThan(resourceCalculator, cluster, usedResource,
- userLimit)) {
- LOG.debug("Used resource={} exceeded user-limit={}",
- usedResource, userLimit);
+ if (Resources.greaterThan(resourceCalculator, cluster, usedResource, userLimit)) {
+ LOG.debug("Used resource={} exceeded user-limit={}", usedResource,
+ userLimit);
return false;
}
} finally {
@@ -1377,7 +1327,7 @@ public class LeafQueue extends AbstractCSQueue {
SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer) {
RMContainer rmContainer = schedulerContainer.getRmContainer();
- LeafQueue targetLeafQueue =
+ AbstractLeafQueue targetLeafQueue =
schedulerContainer.getSchedulerApplicationAttempt().getCSLeafQueue();
if (targetLeafQueue == this) {
@@ -1385,42 +1335,34 @@ public class LeafQueue extends AbstractCSQueue {
if (rmContainer.getState() == RMContainerState.RESERVED) {
// For other reserved containers
// This is a reservation exchange, complete previous reserved container
- completedContainer(clusterResource,
- schedulerContainer.getSchedulerApplicationAttempt(),
- schedulerContainer.getSchedulerNode(), rmContainer, SchedulerUtils
- .createAbnormalContainerStatus(rmContainer.getContainerId(),
- SchedulerUtils.UNRESERVED_CONTAINER),
- RMContainerEventType.RELEASED, null, false);
+ completedContainer(clusterResource, schedulerContainer.getSchedulerApplicationAttempt(),
+ schedulerContainer.getSchedulerNode(), rmContainer,
+ SchedulerUtils.createAbnormalContainerStatus(rmContainer.getContainerId(),
+ SchedulerUtils.UNRESERVED_CONTAINER), RMContainerEventType.RELEASED, null, false);
}
- } else{
+ } else {
// When trying to preempt containers from different queue -- this
// is for lazy preemption feature (kill preemption candidate in scheduling
// cycle).
targetLeafQueue.completedContainer(clusterResource,
schedulerContainer.getSchedulerApplicationAttempt(),
- schedulerContainer.getSchedulerNode(),
- schedulerContainer.getRmContainer(), SchedulerUtils
- .createPreemptedContainerStatus(rmContainer.getContainerId(),
- SchedulerUtils.PREEMPTED_CONTAINER),
- RMContainerEventType.KILL, null, false);
+ schedulerContainer.getSchedulerNode(), schedulerContainer.getRmContainer(),
+ SchedulerUtils.createPreemptedContainerStatus(rmContainer.getContainerId(),
+ SchedulerUtils.PREEMPTED_CONTAINER), RMContainerEventType.KILL, null, false);
}
}
private void releaseContainers(Resource clusterResource,
ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
- for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> c : request
- .getContainersToRelease()) {
+ for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> c : request.getContainersToRelease()) {
internalReleaseContainer(clusterResource, c);
}
// Handle container reservation looking, or lazy preemption case:
- if (null != request.getContainersToAllocate() && !request
- .getContainersToAllocate().isEmpty()) {
- for (ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> context : request
- .getContainersToAllocate()) {
+ if (null != request.getContainersToAllocate() && !request.getContainersToAllocate().isEmpty()) {
+ for (ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> context : request.getContainersToAllocate()) {
if (null != context.getToRelease()) {
- for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> c : context
- .getToRelease()) {
+ for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> c : context.getToRelease()) {
internalReleaseContainer(clusterResource, c);
}
}
@@ -1438,10 +1380,10 @@ public class LeafQueue extends AbstractCSQueue {
writeLock.lock();
try {
if (request.anythingAllocatedOrReserved()) {
- ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode>
- allocation = request.getFirstAllocatedOrReservedContainer();
- SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
- schedulerContainer = allocation.getAllocatedOrReservedContainer();
+ ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> allocation =
+ request.getFirstAllocatedOrReservedContainer();
+ SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer =
+ allocation.getAllocatedOrReservedContainer();
// Do not modify queue when allocation from reserved container
if (allocation.getAllocateFromReservedContainer() == null) {
@@ -1450,19 +1392,16 @@ public class LeafQueue extends AbstractCSQueue {
applyToParentQueue = true;
// Book-keeping
// Note: Update headroom to account for current allocation too...
- allocateResource(cluster,
- schedulerContainer.getSchedulerApplicationAttempt(),
- allocation.getAllocatedOrReservedResource(),
- schedulerContainer.getNodePartition(),
+ allocateResource(cluster, schedulerContainer.getSchedulerApplicationAttempt(),
+ allocation.getAllocatedOrReservedResource(), schedulerContainer.getNodePartition(),
schedulerContainer.getRmContainer());
- orderingPolicy.containerAllocated(
- schedulerContainer.getSchedulerApplicationAttempt(),
+ orderingPolicy.containerAllocated(schedulerContainer.getSchedulerApplicationAttempt(),
schedulerContainer.getRmContainer());
}
// Update reserved resource
- if (Resources.greaterThan(resourceCalculator, cluster,
- request.getTotalReservedResource(), Resources.none())) {
+ if (Resources.greaterThan(resourceCalculator, cluster, request.getTotalReservedResource(),
+ Resources.none())) {
incReservedResource(schedulerContainer.getNodePartition(),
request.getTotalReservedResource());
}
@@ -1476,7 +1415,6 @@ public class LeafQueue extends AbstractCSQueue {
}
}
-
protected Resource getHeadroom(User user, Resource queueCurrentLimit,
Resource clusterResource, FiCaSchedulerApp application) {
return getHeadroom(user, queueCurrentLimit, clusterResource, application,
@@ -1487,29 +1425,28 @@ public class LeafQueue extends AbstractCSQueue {
Resource clusterResource, FiCaSchedulerApp application,
String partition) {
return getHeadroom(user, queueCurrentLimit, clusterResource,
- getResourceLimitForActiveUsers(application.getUser(), clusterResource,
- partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
- partition);
+ getResourceLimitForActiveUsers(application.getUser(), clusterResource, partition,
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), partition);
}
private Resource getHeadroom(User user,
Resource currentPartitionResourceLimit, Resource clusterResource,
Resource userLimitResource, String partition) {
- /**
+ /**
* Headroom is:
* min(
* min(userLimit, queueMaxCap) - userConsumed,
* queueMaxCap - queueUsedResources
* )
- *
- * ( which can be expressed as,
- * min (userLimit - userConsumed, queuMaxCap - userConsumed,
+ *
+ * ( which can be expressed as,
+   *  min (userLimit - userConsumed, queueMaxCap - userConsumed,
* queueMaxCap - queueUsedResources)
* )
*
* given that queueUsedResources >= userConsumed, this simplifies to
*
- * >> min (userlimit - userConsumed, queueMaxCap - queueUsedResources) <<
+   * >> min (userLimit - userConsumed, queueMaxCap - queueUsedResources) <<
*
* sum of queue max capacities of multiple queue's will be greater than the
* actual capacity of a given partition, hence we need to ensure that the
@@ -1518,45 +1455,39 @@ public class LeafQueue extends AbstractCSQueue {
* headroom = min (unused resourcelimit of a label, calculated headroom )
*/
currentPartitionResourceLimit =
- partition.equals(RMNodeLabelsManager.NO_LABEL)
- ? currentPartitionResourceLimit
- : getQueueMaxResource(partition);
+ partition.equals(RMNodeLabelsManager.NO_LABEL) ? currentPartitionResourceLimit :
+ getQueueMaxResource(partition);
Resource headroom = Resources.componentwiseMin(
- Resources.subtractNonNegative(userLimitResource,
- user.getUsed(partition)),
+ Resources.subtractNonNegative(userLimitResource, user.getUsed(partition)),
Resources.subtractNonNegative(currentPartitionResourceLimit,
usageTracker.getQueueUsage().getUsed(partition)));
// Normalize it before return
- headroom =
- Resources.roundDown(resourceCalculator, headroom,
- queueAllocationSettings.getMinimumAllocation());
+ headroom = Resources.roundDown(resourceCalculator, headroom,
+ queueAllocationSettings.getMinimumAllocation());
//headroom = min (unused resourcelimit of a label, calculated headroom )
- Resource clusterPartitionResource =
- labelManager.getResourceByLabel(partition, clusterResource);
- Resource clusterFreePartitionResource =
- Resources.subtract(clusterPartitionResource,
- csContext.getClusterResourceUsage().getUsed(partition));
- headroom = Resources.min(resourceCalculator, clusterPartitionResource,
- clusterFreePartitionResource, headroom);
+ Resource clusterPartitionResource = labelManager.getResourceByLabel(partition, clusterResource);
+ Resource clusterFreePartitionResource = Resources.subtract(clusterPartitionResource,
+ csContext.getClusterResourceUsage().getUsed(partition));
+ headroom =
+ Resources.min(resourceCalculator, clusterPartitionResource, clusterFreePartitionResource,
+ headroom);
return headroom;
}
-
- private void setQueueResourceLimitsInfo(
- Resource clusterResource) {
+
+ private void setQueueResourceLimitsInfo(Resource clusterResource) {
synchronized (queueResourceLimitsInfo) {
- queueResourceLimitsInfo.setQueueCurrentLimit(cachedResourceLimitsForHeadroom
- .getLimit());
+ queueResourceLimitsInfo.setQueueCurrentLimit(cachedResourceLimitsForHeadroom.getLimit());
queueResourceLimitsInfo.setClusterResource(clusterResource);
}
}
// It doesn't necessarily to hold application's lock here.
- @Lock({LeafQueue.class})
+ @Lock({AbstractLeafQueue.class})
Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
- Resource clusterResource, String nodePartition,
- SchedulingMode schedulingMode, Resource userLimit) {
+ Resource clusterResource, String nodePartition, SchedulingMode schedulingMode,
+ Resource userLimit) {
String user = application.getUser();
User queueUser = getUser(user);
if (queueUser == null) {
@@ -1567,34 +1498,33 @@ public class LeafQueue extends AbstractCSQueue {
// Compute user limit respect requested labels,
// TODO, need consider headroom respect labels also
if (userLimit == null) {
- userLimit = getResourceLimitForActiveUsers(application.getUser(),
- clusterResource, nodePartition, schedulingMode);
+ userLimit =
+ getResourceLimitForActiveUsers(application.getUser(), clusterResource, nodePartition,
+ schedulingMode);
}
setQueueResourceLimitsInfo(clusterResource);
- Resource headroom =
- usageTracker.getMetrics().getUserMetrics(user) == null ? Resources.none() :
- getHeadroom(queueUser, cachedResourceLimitsForHeadroom.getLimit(),
- clusterResource, userLimit, nodePartition);
-
+ Resource headroom = usageTracker.getMetrics().getUserMetrics(user) == null ? Resources.none() :
+ getHeadroom(queueUser, cachedResourceLimitsForHeadroom.getLimit(), clusterResource,
+ userLimit, nodePartition);
+
if (LOG.isDebugEnabled()) {
- LOG.debug("Headroom calculation for user " + user + ": " + " userLimit="
- + userLimit + " queueMaxAvailRes="
- + cachedResourceLimitsForHeadroom.getLimit() + " consumed="
- + queueUser.getUsed() + " partition="
- + nodePartition);
- }
-
- CapacityHeadroomProvider headroomProvider = new CapacityHeadroomProvider(
- queueUser, this, application, queueResourceLimitsInfo);
-
+ LOG.debug(
+ "Headroom calculation for user " + user + ": " + " userLimit=" + userLimit
+ + " queueMaxAvailRes=" + cachedResourceLimitsForHeadroom.getLimit() + " consumed="
+ + queueUser.getUsed() + " partition=" + nodePartition);
+ }
+
+ CapacityHeadroomProvider headroomProvider =
+ new CapacityHeadroomProvider(queueUser, this, application, queueResourceLimitsInfo);
+
application.setHeadroomProvider(headroomProvider);
usageTracker.getMetrics().setAvailableResourcesToUser(nodePartition, user, headroom);
-
+
return userLimit;
}
-
+
@Lock(NoLock.class)
public int getNodeLocalityDelay() {
return nodeLocalityDelay;
@@ -1623,11 +1553,10 @@ public class LeafQueue extends AbstractCSQueue {
* RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
* @return Computed User Limit
*/
- public Resource getResourceLimitForActiveUsers(String userName,
- Resource clusterResource, String nodePartition,
- SchedulingMode schedulingMode) {
- return usersManager.getComputedResourceLimitForActiveUsers(userName,
- clusterResource, nodePartition, schedulingMode);
+ public Resource getResourceLimitForActiveUsers(String userName, Resource clusterResource,
+ String nodePartition, SchedulingMode schedulingMode) {
+ return usersManager.getComputedResourceLimitForActiveUsers(userName, clusterResource,
+ nodePartition, schedulingMode);
}
/**
@@ -1643,11 +1572,10 @@ public class LeafQueue extends AbstractCSQueue {
* RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
* @return Computed User Limit
*/
- public Resource getResourceLimitForAllUsers(String userName,
- Resource clusterResource, String nodePartition,
- SchedulingMode schedulingMode) {
- return usersManager.getComputedResourceLimitForAllUsers(userName,
- clusterResource, nodePartition, schedulingMode);
+ public Resource getResourceLimitForAllUsers(String userName, Resource clusterResource,
+ String nodePartition, SchedulingMode schedulingMode) {
+ return usersManager.getComputedResourceLimitForAllUsers(userName, clusterResource,
+ nodePartition, schedulingMode);
}
@Private
@@ -1667,27 +1595,24 @@ public class LeafQueue extends AbstractCSQueue {
// Note: We aren't considering the current request since there is a fixed
// overhead of the AM, but it's a > check, not a >= check, so...
- if (Resources.greaterThan(resourceCalculator, clusterResource,
- user.getUsed(nodePartition), limit)) {
+ if (Resources.greaterThan(resourceCalculator, clusterResource, user.getUsed(nodePartition),
+ limit)) {
// if enabled, check to see if could we potentially use this node instead
// of a reserved node if the application has reserved containers
if (this.reservationsContinueLooking) {
if (Resources.lessThanOrEqual(resourceCalculator, clusterResource,
- Resources.subtract(user.getUsed(),
- application.getCurrentReservation()), limit)) {
+ Resources.subtract(user.getUsed(), application.getCurrentReservation()), limit)) {
if (LOG.isDebugEnabled()) {
LOG.debug("User " + userName + " in queue " + getQueuePath()
- + " will exceed limit based on reservations - "
- + " consumed: " + user.getUsed() + " reserved: " + application
- .getCurrentReservation() + " limit: " + limit);
+ + " will exceed limit based on reservations - " + " consumed: " + user.getUsed()
+ + " reserved: " + application.getCurrentReservation() + " limit: " + limit);
}
- Resource amountNeededToUnreserve = Resources.subtract(
- user.getUsed(nodePartition), limit);
+ Resource amountNeededToUnreserve =
+ Resources.subtract(user.getUsed(nodePartition), limit);
// we can only acquire a new container if we unreserve first to
// respect user-limit
- currentResourceLimits.setAmountNeededUnreserve(
- amountNeededToUnreserve);
+ currentResourceLimits.setAmountNeededUnreserve(amountNeededToUnreserve);
return true;
}
}
@@ -1705,8 +1630,7 @@ public class LeafQueue extends AbstractCSQueue {
}
@Override
- protected void setDynamicQueueProperties(
- CapacitySchedulerConfiguration configuration) {
+ protected void setDynamicQueueProperties(CapacitySchedulerConfiguration configuration) {
// set to -1, to disable it
configuration.setUserLimitFactor(getQueuePath(), -1);
// Set Max AM percentage to a higher value
@@ -1715,8 +1639,8 @@ public class LeafQueue extends AbstractCSQueue {
super.setDynamicQueueProperties(configuration);
}
- private void updateSchedulerHealthForCompletedContainer(
- RMContainer rmContainer, ContainerStatus containerStatus) {
+ private void updateSchedulerHealthForCompletedContainer(RMContainer rmContainer,
+ ContainerStatus containerStatus) {
// Update SchedulerHealth for released / preempted container
SchedulerHealth schedulerHealth = csContext.getSchedulerHealth();
if (null == schedulerHealth) {
@@ -1730,8 +1654,7 @@ public class LeafQueue extends AbstractCSQueue {
schedulerHealth.updateSchedulerPreemptionCounts(1);
} else {
schedulerHealth.updateRelease(csContext.getLastNodeUpdateTime(),
- rmContainer.getAllocatedNode(), rmContainer.getContainerId(),
- getQueuePath());
+ rmContainer.getAllocatedNode(), rmContainer.getContainerId(), getQueuePath());
}
}
@@ -1743,15 +1666,13 @@ public class LeafQueue extends AbstractCSQueue {
* @param nodePartition
* Partition
*/
- public void recalculateQueueUsageRatio(Resource clusterResource,
- String nodePartition) {
+ public void recalculateQueueUsageRatio(Resource clusterResource, String nodePartition) {
writeLock.lock();
try {
ResourceUsage queueResourceUsage = getQueueResourceUsage();
if (nodePartition == null) {
- for (String partition : Sets.union(
- getQueueCapacities().getNodePartitionsSet(),
+ for (String partition : Sets.union(getQueueCapacities().getNodePartitionsSet(),
queueResourceUsage.getNodePartitionsSet())) {
usersManager.updateUsageRatio(partition, clusterResource);
}
@@ -1764,10 +1685,9 @@ public class LeafQueue extends AbstractCSQueue {
}
@Override
- public void completedContainer(Resource clusterResource,
- FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer,
- ContainerStatus containerStatus, RMContainerEventType event, CSQueue childQueue,
- boolean sortQueues) {
+ public void completedContainer(Resource clusterResource, FiCaSchedulerApp application,
+ FiCaSchedulerNode node, RMContainer rmContainer, ContainerStatus containerStatus,
+ RMContainerEventType event, CSQueue childQueue, boolean sortQueues) {
// Update SchedulerHealth for released / preempted container
updateSchedulerHealthForCompletedContainer(rmContainer, containerStatus);
@@ -1784,11 +1704,10 @@ public class LeafQueue extends AbstractCSQueue {
// happen under scheduler's lock...
// So, this is, in effect, a transaction across application & node
if (rmContainer.getState() == RMContainerState.RESERVED) {
- removed = application.unreserve(rmContainer.getReservedSchedulerKey(),
- node, rmContainer);
- } else{
- removed = application.containerCompleted(rmContainer, containerStatus,
- event, node.getPartition());
+ removed = application.unreserve(rmContainer.getReservedSchedulerKey(), node, rmContainer);
+ } else {
+ removed = application.containerCompleted(rmContainer, containerStatus, event,
+ node.getPartition());
node.releaseContainer(rmContainer.getContainerId(), false);
}
@@ -1806,20 +1725,16 @@ public class LeafQueue extends AbstractCSQueue {
writeLock.unlock();
}
-
if (removed) {
// Inform the parent queue _outside_ of the leaf-queue lock
- parent.completedContainer(clusterResource, application, node,
- rmContainer, null, event, this, sortQueues);
+ parent.completedContainer(clusterResource, application, node, rmContainer, null, event,
+ this, sortQueues);
}
}
// Notify PreemptionManager
csContext.getPreemptionManager().removeKillableContainer(
- new KillableContainer(
- rmContainer,
- node.getPartition(),
- getQueuePath()));
+ new KillableContainer(rmContainer, node.getPartition(), getQueuePath()));
// Update preemption metrics if exit status is PREEMPTED
if (containerStatus != null
@@ -1828,23 +1743,20 @@ public class LeafQueue extends AbstractCSQueue {
}
}
- void allocateResource(Resource clusterResource,
- SchedulerApplicationAttempt application, Resource resource,
- String nodePartition, RMContainer rmContainer) {
+ void allocateResource(Resource clusterResource, SchedulerApplicationAttempt application,
+ Resource resource, String nodePartition, RMContainer rmContainer) {
writeLock.lock();
try {
super.allocateResource(clusterResource, resource, nodePartition);
// handle ignore exclusivity container
- if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
- RMNodeLabelsManager.NO_LABEL) && !nodePartition.equals(
+ if (null != rmContainer && rmContainer.getNodeLabelExpression()
+ .equals(RMNodeLabelsManager.NO_LABEL) && !nodePartition.equals(
RMNodeLabelsManager.NO_LABEL)) {
TreeSet<RMContainer> rmContainers = null;
- if (null == (rmContainers = ignorePartitionExclusivityRMContainers.get(
- nodePartition))) {
+ if (null == (rmContainers = ignorePartitionExclusivityRMContainers.get(nodePartition))) {
rmContainers = new TreeSet<>();
- ignorePartitionExclusivityRMContainers.put(nodePartition,
- rmContainers);
+ ignorePartitionExclusivityRMContainers.put(nodePartition, rmContainers);
}
rmContainers.add(rmContainer);
}
@@ -1858,14 +1770,13 @@ public class LeafQueue extends AbstractCSQueue {
Resource partitionHeadroom = Resources.createResource(0, 0);
if (usageTracker.getMetrics().getUserMetrics(userName) != null) {
- partitionHeadroom = getHeadroom(user,
- cachedResourceLimitsForHeadroom.getLimit(), clusterResource,
- getResourceLimitForActiveUsers(userName, clusterResource,
- nodePartition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
- nodePartition);
+ partitionHeadroom =
+ getHeadroom(user, cachedResourceLimitsForHeadroom.getLimit(), clusterResource,
+ getResourceLimitForActiveUsers(userName, clusterResource, nodePartition,
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodePartition);
}
- usageTracker.getMetrics().setAvailableResourcesToUser(nodePartition, userName,
- partitionHeadroom);
+ usageTracker.getMetrics()
+ .setAvailableResourcesToUser(nodePartition, userName, partitionHeadroom);
if (LOG.isDebugEnabled()) {
LOG.debug(getQueuePath() + " user=" + userName + " used="
@@ -1878,20 +1789,18 @@ public class LeafQueue extends AbstractCSQueue {
}
}
- void releaseResource(Resource clusterResource,
- FiCaSchedulerApp application, Resource resource, String nodePartition,
- RMContainer rmContainer) {
+ void releaseResource(Resource clusterResource, FiCaSchedulerApp application, Resource resource,
+ String nodePartition, RMContainer rmContainer) {
writeLock.lock();
try {
super.releaseResource(clusterResource, resource, nodePartition);
// handle ignore exclusivity container
- if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
- RMNodeLabelsManager.NO_LABEL) && !nodePartition.equals(
+ if (null != rmContainer && rmContainer.getNodeLabelExpression()
+ .equals(RMNodeLabelsManager.NO_LABEL) && !nodePartition.equals(
RMNodeLabelsManager.NO_LABEL)) {
if (ignorePartitionExclusivityRMContainers.containsKey(nodePartition)) {
- Set<RMContainer> rmContainers =
- ignorePartitionExclusivityRMContainers.get(nodePartition);
+ Set<RMContainer> rmContainers = ignorePartitionExclusivityRMContainers.get(nodePartition);
rmContainers.remove(rmContainer);
if (rmContainers.isEmpty()) {
ignorePartitionExclusivityRMContainers.remove(nodePartition);
@@ -1906,40 +1815,38 @@ public class LeafQueue extends AbstractCSQueue {
Resource partitionHeadroom = Resources.createResource(0, 0);
if (usageTracker.getMetrics().getUserMetrics(userName) != null) {
- partitionHeadroom = getHeadroom(user,
- cachedResourceLimitsForHeadroom.getLimit(), clusterResource,
- getResourceLimitForActiveUsers(userName, clusterResource,
- nodePartition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
- nodePartition);
+ partitionHeadroom =
+ getHeadroom(user, cachedResourceLimitsForHeadroom.getLimit(), clusterResource,
+ getResourceLimitForActiveUsers(userName, clusterResource, nodePartition,
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodePartition);
}
- usageTracker.getMetrics().setAvailableResourcesToUser(nodePartition, userName,
- partitionHeadroom);
+ usageTracker.getMetrics()
+ .setAvailableResourcesToUser(nodePartition, userName, partitionHeadroom);
if (LOG.isDebugEnabled()) {
LOG.debug(
getQueuePath() + " used=" + usageTracker.getQueueUsage().getUsed() + " numContainers="
- + usageTracker.getNumContainers() + " user=" + userName + " user-resources="
- + user.getUsed());
+ + usageTracker.getNumContainers() + " user=" + userName + " user-resources="
+ + user.getUsed());
}
} finally {
writeLock.unlock();
}
}
-
- private void updateCurrentResourceLimits(
- ResourceLimits currentResourceLimits, Resource clusterResource) {
+
+ private void updateCurrentResourceLimits(ResourceLimits currentResourceLimits,
+ Resource clusterResource) {
// TODO: need consider non-empty node labels when resource limits supports
// node labels
// Even if ParentQueue will set limits respect child's max queue capacity,
// but when allocating reserved container, CapacityScheduler doesn't do
// this. So need cap limits by queue's max capacity here.
- this.cachedResourceLimitsForHeadroom =
- new ResourceLimits(currentResourceLimits.getLimit());
- Resource queueMaxResource = getEffectiveMaxCapacityDown(
- RMNodeLabelsManager.NO_LABEL, queueAllocationSettings.getMinimumAllocation());
- this.cachedResourceLimitsForHeadroom.setLimit(Resources.min(
- resourceCalculator, clusterResource, queueMaxResource,
- currentResourceLimits.getLimit()));
+ this.cachedResourceLimitsForHeadroom = new ResourceLimits(currentResourceLimits.getLimit());
+ Resource queueMaxResource = getEffectiveMaxCapacityDown(RMNodeLabelsManager.NO_LABEL,
+ queueAllocationSettings.getMinimumAllocation());
+ this.cachedResourceLimitsForHeadroom.setLimit(
+ Resources.min(resourceCalculator, clusterResource, queueMaxResource,
+ currentResourceLimits.getLimit()));
}
@Override
@@ -1967,12 +1874,12 @@ public class LeafQueue extends AbstractCSQueue {
recalculateQueueUsageRatio(clusterResource, null);
// Update metrics
- CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
- this, labelManager, null);
+ CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, this, labelManager,
+ null);
// Update configured capacity/max-capacity for default partition only
CSQueueUtils.updateConfiguredCapacityMetrics(resourceCalculator,
- labelManager.getResourceByLabel(null, clusterResource),
- RMNodeLabelsManager.NO_LABEL, this);
+ labelManager.getResourceByLabel(null, clusterResource), RMNodeLabelsManager.NO_LABEL,
+ this);
// queue metrics are updated, more resource may be available
// activate the pending applications if possible
@@ -1983,10 +1890,8 @@ public class LeafQueue extends AbstractCSQueue {
usersManager.userLimitNeedsRecompute();
// Update application properties
- for (FiCaSchedulerApp application : orderingPolicy
- .getSchedulableEntities()) {
- computeUserLimitAndSetHeadroom(application, clusterResource,
- RMNodeLabelsManager.NO_LABEL,
+ for (FiCaSchedulerApp application : orderingPolicy.getSchedulableEntities()) {
+ computeUserLimitAndSetHeadroom(application, clusterResource, RMNodeLabelsManager.NO_LABEL,
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null);
}
} finally {
@@ -1997,16 +1902,14 @@ public class LeafQueue extends AbstractCSQueue {
@Override
public void incUsedResource(String nodeLabel, Resource resourceToInc,
SchedulerApplicationAttempt application) {
- usersManager.updateUserResourceUsage(application.getUser(), resourceToInc,
- nodeLabel, true);
+ usersManager.updateUserResourceUsage(application.getUser(), resourceToInc, nodeLabel, true);
super.incUsedResource(nodeLabel, resourceToInc, application);
}
@Override
public void decUsedResource(String nodeLabel, Resource resourceToDec,
SchedulerApplicationAttempt application) {
- usersManager.updateUserResourceUsage(application.getUser(), resourceToDec,
- nodeLabel, false);
+ usersManager.updateUserResourceUsage(application.getUser(), resourceToDec, nodeLabel, false);
super.decUsedResource(nodeLabel, resourceToDec, application);
}
@@ -2017,8 +1920,7 @@ public class LeafQueue extends AbstractCSQueue {
return;
}
- user.getResourceUsage().incAMUsed(nodeLabel,
- resourceToInc);
+ user.getResourceUsage().incAMUsed(nodeLabel, resourceToInc);
// ResourceUsage has its own lock, no addition lock needs here.
usageTracker.getQueueUsage().incAMUsed(nodeLabel, resourceToInc);
}
@@ -2030,8 +1932,7 @@ public class LeafQueue extends AbstractCSQueue {
return;
}
- user.getResourceUsage().decAMUsed(nodeLabel,
- resourceToDec);
+ user.getResourceUsage().decAMUsed(nodeLabel, resourceToDec);
// ResourceUsage has its own lock, no addition lock needs here.
usageTracker.getQueueUsage().decAMUsed(nodeLabel, resourceToDec);
}
@@ -2048,11 +1949,9 @@ public class LeafQueue extends AbstractCSQueue {
// Careful! Locking order is important!
writeLock.lock();
try {
- FiCaSchedulerNode node = csContext.getNode(
- rmContainer.getContainer().getNodeId());
- allocateResource(clusterResource, attempt,
- rmContainer.getContainer().getResource(), node.getPartition(),
- rmContainer);
+ FiCaSchedulerNode node = csContext.getNode(rmContainer.getContainer().getNodeId());
+ allocateResource(clusterResource, attempt, rmContainer.getContainer().getResource(),
+ node.getPartition(), rmContainer);
} finally {
writeLock.unlock();
}
@@ -2064,24 +1963,22 @@ public class LeafQueue extends AbstractCSQueue {
* Obtain (read-only) collection of pending applications.
*/
public Collection<FiCaSchedulerApp> getPendingApplications() {
- return Collections.unmodifiableCollection(pendingOrderingPolicy
- .getSchedulableEntities());
+ return Collections.unmodifiableCollection(pendingOrderingPolicy.getSchedulableEntities());
}
/**
* Obtain (read-only) collection of active applications.
*/
public Collection<FiCaSchedulerApp> getApplications() {
- return Collections.unmodifiableCollection(orderingPolicy
- .getSchedulableEntities());
+ return Collections.unmodifiableCollection(orderingPolicy.getSchedulableEntities());
}
/**
* Obtain (read-only) collection of all applications.
*/
public Collection<FiCaSchedulerApp> getAllApplications() {
- Collection<FiCaSchedulerApp> apps = new HashSet<FiCaSchedulerApp>(
- pendingOrderingPolicy.getSchedulableEntities());
+ Collection<FiCaSchedulerApp> apps =
+ new HashSet<FiCaSchedulerApp>(pendingOrderingPolicy.getSchedulableEntities());
apps.addAll(orderingPolicy.getSchedulableEntities());
return Collections.unmodifiableCollection(apps);
@@ -2095,7 +1992,6 @@ public class LeafQueue extends AbstractCSQueue {
* Total pending for the queue =
* sum(for each user(min((user's headroom), sum(user's pending requests))))
* NOTE:
-
* @param clusterResources clusterResource
* @param partition node partition
* @param deductReservedFromPending When a container is reserved in CS,
@@ -2208,7 +2104,7 @@ public class LeafQueue extends AbstractCSQueue {
parent.detachContainer(clusterResource, application, rmContainer);
}
}
-
+
/**
* @return all ignored partition exclusivity RMContainers in the LeafQueue,
* this will be used by preemption policy.
@@ -2264,7 +2160,7 @@ public class LeafQueue extends AbstractCSQueue {
getOrderingPolicy() {
return orderingPolicy;
}
-
+
void setOrderingPolicy(
OrderingPolicy<FiCaSchedulerApp> orderingPolicy) {
writeLock.lock();
@@ -2318,19 +2214,19 @@ public class LeafQueue extends AbstractCSQueue {
static class QueueResourceLimitsInfo {
private Resource queueCurrentLimit;
private Resource clusterResource;
-
+
public void setQueueCurrentLimit(Resource currentLimit) {
this.queueCurrentLimit = currentLimit;
}
-
+
public Resource getQueueCurrentLimit() {
return queueCurrentLimit;
}
-
+
public void setClusterResource(Resource clusterResource) {
this.clusterResource = clusterResource;
}
-
+
public Resource getClusterResource() {
return clusterResource;
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java
index 7311be7..57050b1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java
@@ -36,15 +36,19 @@ import java.util.Set;
* ManagedParentQueue for auto created dynamic queues
*/
public class AutoCreatedLeafQueue extends AbstractAutoCreatedLeafQueue {
-
private static final Logger LOG = LoggerFactory
.getLogger(AutoCreatedLeafQueue.class);
public AutoCreatedLeafQueue(CapacitySchedulerContext cs, String queueName,
ManagedParentQueue parent) throws IOException {
+ // TODO once YARN-10907 is merged the duplicated collection of
+ // leafQueueConfigs won't be necessary
super(cs, parent.getLeafQueueConfigs(queueName),
queueName,
parent, null);
+ super.setupQueueConfigs(cs.getClusterResource(), parent.getLeafQueueConfigs(queueName));
+
+ LOG.debug("Initialized AutoCreatedLeafQueue: name={}, fullname={}", queueName, getQueuePath());
updateCapacitiesToZero();
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSMaxRunningAppsEnforcer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSMaxRunningAppsEnforcer.java
index 93d0017..fedde05 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSMaxRunningAppsEnforcer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSMaxRunningAppsEnforcer.java
@@ -182,7 +182,7 @@ public class CSMaxRunningAppsEnforcer {
// the queue was already at its max before the removal.
// Thus we find the ancestor queue highest in the tree for which the app
// that was at its maxRunningApps before the removal.
- LeafQueue queue = app.getCSLeafQueue();
+ AbstractLeafQueue queue = app.getCSLeafQueue();
AbstractCSQueue highestQueueWithAppsNowRunnable =
(queue.getNumRunnableApps() == queue.getMaxParallelApps() - 1)
? queue : null;
@@ -243,7 +243,7 @@ public class CSMaxRunningAppsEnforcer {
}
if (checkRunnabilityWithUpdate(next)) {
- LeafQueue nextQueue = next.getCSLeafQueue();
+ AbstractLeafQueue nextQueue = next.getCSLeafQueue();
LOG.info("{} is now runnable in {}",
next.getApplicationAttemptId(), nextQueue);
trackRunnableApp(next);
@@ -322,9 +322,9 @@ public class CSMaxRunningAppsEnforcer {
private void gatherPossiblyRunnableAppLists(AbstractCSQueue queue,
List<List<FiCaSchedulerApp>> appLists) {
if (queue.getNumRunnableApps() < queue.getMaxParallelApps()) {
- if (queue instanceof LeafQueue) {
+ if (queue instanceof AbstractLeafQueue) {
appLists.add(
- ((LeafQueue)queue).getCopyOfNonRunnableAppSchedulables());
+ ((AbstractLeafQueue)queue).getCopyOfNonRunnableAppSchedulables());
} else {
for (CSQueue child : queue.getChildQueues()) {
gatherPossiblyRunnableAppLists((AbstractCSQueue) child, appLists);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
index 140a2ac..de31cd13 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
@@ -27,13 +27,13 @@ import org.apache.hadoop.yarn.util.resource.Resources;
public class CapacityHeadroomProvider {
UsersManager.User user;
- LeafQueue queue;
+ AbstractLeafQueue queue;
FiCaSchedulerApp application;
- LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo;
+ AbstractLeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo;
- public CapacityHeadroomProvider(UsersManager.User user, LeafQueue queue,
+ public CapacityHeadroomProvider(UsersManager.User user, AbstractLeafQueue queue,
FiCaSchedulerApp application,
- LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo) {
+ AbstractLeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo) {
this.user = user;
this.queue = queue;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index bd1089b..befb82a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -917,7 +917,7 @@ public class CapacityScheduler extends
throw new QueueInvalidException(queueErrorMsg);
}
}
- if (!(queue instanceof LeafQueue)) {
+ if (!(queue instanceof AbstractLeafQueue)) {
// During RM restart, this means leaf queue was converted to a parent
// queue, which is not supported for running apps.
if (!appShouldFailFast) {
@@ -942,7 +942,7 @@ public class CapacityScheduler extends
// that means its previous state was DRAINING. So we auto transit
// the state to DRAINING for recovery.
if (queue.getState() == QueueState.STOPPED) {
- ((LeafQueue) queue).recoverDrainingState();
+ ((AbstractLeafQueue) queue).recoverDrainingState();
}
// Submit to the queue
try {
@@ -1090,7 +1090,7 @@ public class CapacityScheduler extends
return;
}
- if (!(queue instanceof LeafQueue)) {
+ if (!(queue instanceof AbstractLeafQueue)) {
String message =
"Application " + applicationId + " submitted by user : " + user
+ " to non-leaf queue : " + queueName;
@@ -1242,7 +1242,7 @@ public class CapacityScheduler extends
return;
}
CSQueue queue = (CSQueue) application.getQueue();
- if (!(queue instanceof LeafQueue)) {
+ if (!(queue instanceof AbstractLeafQueue)) {
LOG.error("Cannot finish application " + "from non-leaf queue: " + queue
.getQueuePath());
} else{
@@ -1301,7 +1301,7 @@ public class CapacityScheduler extends
// Inform the queue
Queue queue = attempt.getQueue();
CSQueue csQueue = (CSQueue) queue;
- if (!(csQueue instanceof LeafQueue)) {
+ if (!(csQueue instanceof AbstractLeafQueue)) {
LOG.error(
"Cannot finish application " + "from non-leaf queue: "
+ csQueue.getQueuePath());
@@ -1367,7 +1367,7 @@ public class CapacityScheduler extends
// Release containers
releaseContainers(release, application);
- LeafQueue updateDemandForQueue = null;
+ AbstractLeafQueue updateDemandForQueue = null;
// Sanity check for new allocation requests
normalizeResourceRequests(ask);
@@ -1398,7 +1398,7 @@ public class CapacityScheduler extends
// Update application requests
if (application.updateResourceRequests(ask) || application
.updateSchedulingRequests(schedulingRequests)) {
- updateDemandForQueue = (LeafQueue) application.getQueue();
+ updateDemandForQueue = (AbstractLeafQueue) application.getQueue();
}
if (LOG.isDebugEnabled()) {
@@ -1783,7 +1783,7 @@ public class CapacityScheduler extends
LOG.debug("Trying to fulfill reservation for application {} on node: {}",
reservedApplication.getApplicationId(), node.getNodeID());
- LeafQueue queue = ((LeafQueue) reservedApplication.getQueue());
+ AbstractLeafQueue queue = ((AbstractLeafQueue) reservedApplication.getQueue());
CSAssignment assignment = queue.assignContainers(getClusterResource(),
new SimpleCandidateNodeSet<>(node),
// TODO, now we only consider limits for parent for non-labeled
@@ -2386,7 +2386,7 @@ public class CapacityScheduler extends
}
// Inform the queue
- LeafQueue queue = (LeafQueue) application.getQueue();
+ AbstractLeafQueue queue = (AbstractLeafQueue) application.getQueue();
queue.completedContainer(getClusterResource(), application, node,
rmContainer, containerStatus, event, null, true);
}
@@ -2673,7 +2673,7 @@ public class CapacityScheduler extends
throws YarnException {
writeLock.lock();
try {
- LeafQueue queue = this.queueManager.getAndCheckLeafQueue(inQueue);
+ AbstractLeafQueue queue = this.queueManager.getAndCheckLeafQueue(inQueue);
AbstractManagedParentQueue parent =
(AbstractManagedParentQueue) queue.getParent();
@@ -2716,10 +2716,10 @@ public class CapacityScheduler extends
throw new YarnException("App to be moved " + appId + " not found.");
}
String sourceQueueName = application.getQueue().getQueueName();
- LeafQueue source =
+ AbstractLeafQueue source =
this.queueManager.getAndCheckLeafQueue(sourceQueueName);
String destQueueName = handleMoveToPlanQueue(targetQueueName);
- LeafQueue dest = this.queueManager.getAndCheckLeafQueue(destQueueName);
+ AbstractLeafQueue dest = this.queueManager.getAndCheckLeafQueue(destQueueName);
String user = application.getUser();
try {
@@ -2777,7 +2777,7 @@ public class CapacityScheduler extends
((CSQueue) queue).getQueuePath() : queue.getQueueName();
this.queueManager.getAndCheckLeafQueue(sourceQueueName);
String destQueueName = handleMoveToPlanQueue(newQueue);
- LeafQueue dest = this.queueManager.getAndCheckLeafQueue(destQueueName);
+ AbstractLeafQueue dest = this.queueManager.getAndCheckLeafQueue(destQueueName);
// Validation check - ACLs, submission limits for user & queue
String user = application.getUser();
// Check active partition only when attempt is available
@@ -2804,7 +2804,7 @@ public class CapacityScheduler extends
* @param dest
* @throws YarnException
*/
- private void checkQueuePartition(FiCaSchedulerApp app, LeafQueue dest)
+ private void checkQueuePartition(FiCaSchedulerApp app, AbstractLeafQueue dest)
throws YarnException {
if (!YarnConfiguration.areNodeLabelsEnabled(conf)) {
return;
@@ -2854,7 +2854,7 @@ public class CapacityScheduler extends
}
return getMaximumResourceCapability();
}
- if (!(queue instanceof LeafQueue)) {
+ if (!(queue instanceof AbstractLeafQueue)) {
LOG.error("queue " + queueName + " is not an leaf queue");
return getMaximumResourceCapability();
}
@@ -2863,7 +2863,7 @@ public class CapacityScheduler extends
// getMaximumResourceCapability() returns maximum allocation considers
// per-node maximum resources. So return (component-wise) min of the two.
- Resource queueMaxAllocation = ((LeafQueue)queue).getMaximumAllocation();
+ Resource queueMaxAllocation = queue.getMaximumAllocation();
Resource clusterMaxAllocationConsiderNodeMax =
getMaximumResourceCapability();
@@ -2989,7 +2989,7 @@ public class CapacityScheduler extends
// As we use iterator over a TreeSet for OrderingPolicy, once we change
// priority then reinsert back to make order correct.
- LeafQueue queue = (LeafQueue) getQueue(rmApp.getQueue());
+ AbstractLeafQueue queue = (AbstractLeafQueue) getQueue(rmApp.getQueue());
queue.updateApplicationPriority(application, appPriority);
LOG.info("Priority '" + appPriority + "' is updated in queue :"
@@ -3404,14 +3404,14 @@ public class CapacityScheduler extends
readLock.lock();
try {
CSQueue queue = getQueue(queueName);
- if (queue == null || !(queue instanceof LeafQueue)) {
+ if (!(queue instanceof AbstractLeafQueue)) {
return lifetimeRequestedByApp;
}
long defaultApplicationLifetime =
- ((LeafQueue) queue).getDefaultApplicationLifetime();
+ queue.getDefaultApplicationLifetime();
long maximumApplicationLifetime =
- ((LeafQueue) queue).getMaximumApplicationLifetime();
+ queue.getMaximumApplicationLifetime();
// check only for maximum, that's enough because default can't
// exceed maximum
@@ -3434,7 +3434,7 @@ public class CapacityScheduler extends
@Override
public long getMaximumApplicationLifetime(String queueName) {
CSQueue queue = getQueue(queueName);
- if (queue == null || !(queue instanceof LeafQueue)) {
+ if (!(queue instanceof AbstractLeafQueue)) {
if (isAmbiguous(queueName)) {
LOG.error("Ambiguous queue reference: " + queueName
+ " please use full queue path instead.");
@@ -3444,7 +3444,7 @@ public class CapacityScheduler extends
return -1;
}
// In seconds
- return ((LeafQueue) queue).getMaximumApplicationLifetime();
+ return queue.getMaximumApplicationLifetime();
}
@Override
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java
index fd601ac..147f392 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java
@@ -186,7 +186,7 @@ public final class CapacitySchedulerConfigValidator {
+ " is set to true");
}
- if (newQueue instanceof LeafQueue) {
+ if (newQueue instanceof AbstractLeafQueue) {
LOG.info("Converting the parent queue: {} to leaf queue.", oldQueue.getQueuePath());
}
}
@@ -194,7 +194,7 @@ public final class CapacitySchedulerConfigValidator {
private static void validateLeafQueueConversion(CSQueue oldQueue,
CSQueue newQueue) throws IOException {
- if (oldQueue instanceof LeafQueue && newQueue instanceof ParentQueue) {
+ if (oldQueue instanceof AbstractLeafQueue && newQueue instanceof ParentQueue) {
if (isEitherQueueStopped(oldQueue.getState(), newQueue.getState())) {
LOG.info("Converting the leaf queue: {} to parent queue.", oldQueue.getQueuePath());
} else {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
index f0c8a27..407383d 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java
@@ -375,8 +375,8 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
permissions.add(
new Permission(csQueue.getPrivilegedEntity(), csQueue.getACLs()));
- if (queue instanceof LeafQueue) {
- LeafQueue lQueue = (LeafQueue) queue;
+ if (queue instanceof AbstractLeafQueue) {
+ AbstractLeafQueue lQueue = (AbstractLeafQueue) queue;
// Clear Priority ACLs first since reinitialize also call same.
appPriorityACLManager.clearPriorityACLs(lQueue.getQueuePath());
@@ -397,17 +397,17 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
* @throws YarnException if the queue does not exist or the queue
* is not the type of LeafQueue.
*/
- public LeafQueue getAndCheckLeafQueue(String queue) throws YarnException {
+ public AbstractLeafQueue getAndCheckLeafQueue(String queue) throws YarnException {
CSQueue ret = this.getQueue(queue);
if (ret == null) {
throw new YarnException("The specified Queue: " + queue
+ " doesn't exist");
}
- if (!(ret instanceof LeafQueue)) {
+ if (!(ret instanceof AbstractLeafQueue)) {
throw new YarnException("The specified Queue: " + queue
+ " is not a Leaf Queue.");
}
- return (LeafQueue) ret;
+ return (AbstractLeafQueue) ret;
}
/**
@@ -527,7 +527,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
* @throws YarnException if the given path is not eligible to be auto created
* @throws IOException if the given path can not be added to the parent
*/
- public LeafQueue createQueue(QueuePath queue)
+ public AbstractLeafQueue createQueue(QueuePath queue)
throws YarnException, IOException {
String leafQueueName = queue.getLeafName();
String parentQueueName = queue.getParent();
@@ -668,7 +668,7 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
return leafQueue;
}
- private LeafQueue createLegacyAutoQueue(QueuePath queue)
+ private AbstractLeafQueue createLegacyAutoQueue(QueuePath queue)
throws IOException, SchedulerDynamicEditException {
CSQueue parentQueue = getQueue(queue.getParent());
// Case 1: Handle ManagedParentQueue
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 4592f2a..b9fa932 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -19,128 +19,18 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.lang3.time.DateUtils;
import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
-import org.apache.hadoop.security.AccessControlException;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.authorize.AccessControlList;
-import org.apache.hadoop.util.Sets;
-import org.apache.hadoop.util.Time;
-import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
-import org.apache.hadoop.yarn.api.records.ApplicationId;
-import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
-import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.ExecutionType;
-import org.apache.hadoop.yarn.api.records.Priority;
-import org.apache.hadoop.yarn.api.records.QueueACL;
-import org.apache.hadoop.yarn.api.records.QueueInfo;
-import org.apache.hadoop.yarn.api.records.QueueState;
-import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
-import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.factories.RecordFactory;
-import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
-import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
-import org.apache.hadoop.yarn.security.AccessType;
-import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
-import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.*;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityDiagnosticConstant;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesLogger;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivityState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt.AMState;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UsersManager.User;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.KillableContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicyForPendingApps;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.IteratorSelector;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.OrderingPolicy;
-import org.apache.hadoop.yarn.server.utils.Lock;
-import org.apache.hadoop.yarn.server.utils.Lock.NoLock;
-import org.apache.hadoop.yarn.util.SystemClock;
-import org.apache.hadoop.yarn.util.resource.Resources;
-
-import org.apache.hadoop.classification.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@Private
@Unstable
-public class LeafQueue extends AbstractCSQueue {
+public class LeafQueue extends AbstractLeafQueue {
private static final Logger LOG =
LoggerFactory.getLogger(LeafQueue.class);
- private float absoluteUsedCapacity = 0.0f;
-
- // TODO the max applications should consider label
- protected int maxApplications;
- protected volatile int maxApplicationsPerUser;
-
- private float maxAMResourcePerQueuePercent;
-
- private volatile int nodeLocalityDelay;
- private volatile int rackLocalityAdditionalDelay;
- private volatile boolean rackLocalityFullReset;
-
- Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap =
- new ConcurrentHashMap<>();
-
- private Priority defaultAppPriorityPerQueue;
-
- private final OrderingPolicy<FiCaSchedulerApp> pendingOrderingPolicy;
-
- private volatile float minimumAllocationFactor;
-
- private final RecordFactory recordFactory =
- RecordFactoryProvider.getRecordFactory(null);
-
- private final UsersManager usersManager;
-
- // cache last cluster resource to compute actual capacity
- private Resource lastClusterResource = Resources.none();
-
- private final QueueResourceLimitsInfo queueResourceLimitsInfo =
- new QueueResourceLimitsInfo();
-
- private volatile ResourceLimits cachedResourceLimitsForHeadroom = null;
-
- private volatile OrderingPolicy<FiCaSchedulerApp> orderingPolicy = null;
-
- // Map<Partition, Map<SchedulingMode, Map<User, CachedUserLimit>>>
- // Not thread safe: only the last level is a ConcurrentMap
- @VisibleForTesting
- Map<String, Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>>>
- userLimitsCache = new HashMap<>();
-
- // Not thread safe
- @VisibleForTesting
- long currentUserLimitCacheVersion = 0;
-
- // record all ignore partition exclusivityRMContainer, this will be used to do
- // preemption, key is the partition of the RMContainer allocated on
- private Map<String, TreeSet<RMContainer>> ignorePartitionExclusivityRMContainers =
- new ConcurrentHashMap<>();
-
- List<AppPriorityACLGroup> priorityAcls =
- new ArrayList<AppPriorityACLGroup>();
-
- private final List<FiCaSchedulerApp> runnableApps = new ArrayList<>();
- private final List<FiCaSchedulerApp> nonRunnableApps = new ArrayList<>();
-
@SuppressWarnings({ "unchecked", "rawtypes" })
public LeafQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
@@ -157,2330 +47,10 @@ public class LeafQueue extends AbstractCSQueue {
CapacitySchedulerConfiguration configuration,
String queueName, CSQueue parent, CSQueue old, boolean isDynamic) throws
IOException {
- super(cs, configuration, queueName, parent, old);
- setDynamicQueue(isDynamic);
-
- this.usersManager = new UsersManager(usageTracker.getMetrics(), this, labelManager, csContext,
- resourceCalculator);
-
- // One time initialization is enough since it is static ordering policy
- this.pendingOrderingPolicy = new FifoOrderingPolicyForPendingApps();
-
- LOG.debug("LeafQueue: name={}, fullname={}", queueName, getQueuePath());
+ super(cs, configuration, queueName, parent, old, isDynamic);
setupQueueConfigs(cs.getClusterResource(), configuration);
- }
-
- @SuppressWarnings("checkstyle:nowhitespaceafter")
- protected void setupQueueConfigs(Resource clusterResource,
- CapacitySchedulerConfiguration conf) throws
- IOException {
- writeLock.lock();
- try {
- CapacitySchedulerConfiguration schedConf = csContext.getConfiguration();
- super.setupQueueConfigs(clusterResource, conf);
-
- this.lastClusterResource = clusterResource;
-
- this.cachedResourceLimitsForHeadroom = new ResourceLimits(
- clusterResource);
-
- // Initialize headroom info, also used for calculating application
- // master resource limits. Since this happens during queue initialization
- // and all queues may not be realized yet, we'll use (optimistic)
- // absoluteMaxCapacity (it will be replaced with the more accurate
- // absoluteMaxAvailCapacity during headroom/userlimit/allocation events)
- setQueueResourceLimitsInfo(clusterResource);
-
- setOrderingPolicy(
- conf.<FiCaSchedulerApp>getAppOrderingPolicy(getQueuePath()));
-
- usersManager.setUserLimit(conf.getUserLimit(getQueuePath()));
- usersManager.setUserLimitFactor(conf.getUserLimitFactor(getQueuePath()));
-
- maxAMResourcePerQueuePercent =
- conf.getMaximumApplicationMasterResourcePerQueuePercent(
- getQueuePath());
-
- maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath());
- if (maxApplications < 0) {
- int maxGlobalPerQueueApps =
- csContext.getConfiguration().getGlobalMaximumApplicationsPerQueue();
- if (maxGlobalPerQueueApps > 0) {
- maxApplications = maxGlobalPerQueueApps;
- }
- }
-
- priorityAcls = conf.getPriorityAcls(getQueuePath(),
- csContext.getMaxClusterLevelAppPriority());
-
- Set<String> accessibleNodeLabels = this.queueNodeLabelsSettings.getAccessibleNodeLabels();
- if (!SchedulerUtils.checkQueueLabelExpression(accessibleNodeLabels,
- this.queueNodeLabelsSettings.getDefaultLabelExpression(), null)) {
- throw new IOException(
- "Invalid default label expression of " + " queue=" + getQueuePath()
- + " doesn't have permission to access all labels "
- + "in default label expression. labelExpression of resource request="
- + getDefaultNodeLabelExpressionStr() + ". Queue labels=" + (
- getAccessibleNodeLabels() == null ?
- "" :
- StringUtils
- .join(getAccessibleNodeLabels().iterator(), ',')));
- }
-
- nodeLocalityDelay = schedConf.getNodeLocalityDelay();
- rackLocalityAdditionalDelay = schedConf
- .getRackLocalityAdditionalDelay();
- rackLocalityFullReset = schedConf
- .getRackLocalityFullReset();
-
- // re-init this since max allocation could have changed
- this.minimumAllocationFactor = Resources.ratio(resourceCalculator,
- Resources.subtract(
- queueAllocationSettings.getMaximumAllocation(),
- queueAllocationSettings.getMinimumAllocation()),
- queueAllocationSettings.getMaximumAllocation());
-
- StringBuilder aclsString = new StringBuilder();
- for (Map.Entry<AccessType, AccessControlList> e : acls.entrySet()) {
- aclsString.append(e.getKey() + ":" + e.getValue().getAclString());
- }
-
- StringBuilder labelStrBuilder = new StringBuilder();
- if (accessibleNodeLabels != null) {
- for (String nodeLabel : accessibleNodeLabels) {
- labelStrBuilder.append(nodeLabel).append(",");
- }
- }
-
- defaultAppPriorityPerQueue = Priority.newInstance(
- conf.getDefaultApplicationPriorityConfPerQueue(getQueuePath()));
-
- // Validate leaf queue's user's weights.
- float queueUserLimit = Math.min(100.0f, conf.getUserLimit(getQueuePath()));
- getUserWeights().validateForLeafQueue(queueUserLimit, getQueuePath());
- usersManager.updateUserWeights();
-
- LOG.info(
- "Initializing " + getQueuePath() + "\n" +
- getExtendedCapacityOrWeightString() + "\n"
- + "absoluteCapacity = " + queueCapacities.getAbsoluteCapacity()
- + " [= parentAbsoluteCapacity * capacity ]" + "\n"
- + "maxCapacity = " + queueCapacities.getMaximumCapacity()
- + " [= configuredMaxCapacity ]" + "\n" + "absoluteMaxCapacity = "
- + queueCapacities.getAbsoluteMaximumCapacity()
- + " [= 1.0 maximumCapacity undefined, "
- + "(parentAbsoluteMaxCapacity * maximumCapacity) / 100 otherwise ]"
- + "\n" + "effectiveMinResource=" +
- getEffectiveCapacity(CommonNodeLabelsManager.NO_LABEL) + "\n"
- + " , effectiveMaxResource=" +
- getEffectiveMaxCapacity(CommonNodeLabelsManager.NO_LABEL)
- + "\n" + "userLimit = " + usersManager.getUserLimit()
- + " [= configuredUserLimit ]" + "\n" + "userLimitFactor = "
- + usersManager.getUserLimitFactor()
- + " [= configuredUserLimitFactor ]" + "\n" + "maxApplications = "
- + maxApplications
- + " [= configuredMaximumSystemApplicationsPerQueue or"
- + " (int)(configuredMaximumSystemApplications * absoluteCapacity)]"
- + "\n" + "maxApplicationsPerUser = " + maxApplicationsPerUser
- + " [= (int)(maxApplications * (userLimit / 100.0f) * "
- + "userLimitFactor) ]" + "\n"
- + "maxParallelApps = " + getMaxParallelApps() + "\n"
- + "usedCapacity = " +
- + queueCapacities.getUsedCapacity() + " [= usedResourcesMemory / "
- + "(clusterResourceMemory * absoluteCapacity)]" + "\n"
- + "absoluteUsedCapacity = " + absoluteUsedCapacity
- + " [= usedResourcesMemory / clusterResourceMemory]" + "\n"
- + "maxAMResourcePerQueuePercent = " + maxAMResourcePerQueuePercent
- + " [= configuredMaximumAMResourcePercent ]" + "\n"
- + "minimumAllocationFactor = " + minimumAllocationFactor
- + " [= (float)(maximumAllocationMemory - minimumAllocationMemory) / "
- + "maximumAllocationMemory ]" + "\n" + "maximumAllocation = "
- + queueAllocationSettings.getMaximumAllocation() +
- " [= configuredMaxAllocation ]" + "\n"
- + "numContainers = " + usageTracker.getNumContainers()
- + " [= currentNumContainers ]" + "\n" + "state = " + getState()
- + " [= configuredState ]" + "\n" + "acls = " + aclsString
- + " [= configuredAcls ]" + "\n"
- + "nodeLocalityDelay = " + nodeLocalityDelay + "\n"
- + "rackLocalityAdditionalDelay = "
- + rackLocalityAdditionalDelay + "\n"
- + "labels=" + labelStrBuilder.toString() + "\n"
- + "reservationsContinueLooking = "
- + reservationsContinueLooking + "\n" + "preemptionDisabled = "
- + getPreemptionDisabled() + "\n" + "defaultAppPriorityPerQueue = "
- + defaultAppPriorityPerQueue + "\npriority = " + priority
- + "\nmaxLifetime = " + getMaximumApplicationLifetime()
- + " seconds" + "\ndefaultLifetime = "
- + getDefaultApplicationLifetime() + " seconds");
- } finally {
- writeLock.unlock();
- }
- }
-
- private String getDefaultNodeLabelExpressionStr() {
- String defaultLabelExpression = queueNodeLabelsSettings.getDefaultLabelExpression();
- return defaultLabelExpression == null ? "" : defaultLabelExpression;
- }
-
- /**
- * Used only by tests.
- */
- @Private
- public float getMinimumAllocationFactor() {
- return minimumAllocationFactor;
- }
-
- /**
- * Used only by tests.
- */
- @Private
- public float getMaxAMResourcePerQueuePercent() {
- return maxAMResourcePerQueuePercent;
- }
-
- public int getMaxApplications() {
- return maxApplications;
- }
-
- public int getMaxApplicationsPerUser() {
- return maxApplicationsPerUser;
- }
-
- /**
- *
- * @return UsersManager instance.
- */
- public UsersManager getUsersManager() {
- return usersManager;
- }
-
- @Override
- public AbstractUsersManager getAbstractUsersManager() {
- return usersManager;
- }
-
- @Override
- public List<CSQueue> getChildQueues() {
- return null;
- }
-
- /**
- * Set user limit.
- * @param userLimit new user limit
- */
- @VisibleForTesting
- void setUserLimit(float userLimit) {
- usersManager.setUserLimit(userLimit);
- usersManager.userLimitNeedsRecompute();
- }
-
- /**
- * Set user limit factor.
- * @param userLimitFactor new user limit factor
- */
- @VisibleForTesting
- void setUserLimitFactor(float userLimitFactor) {
- usersManager.setUserLimitFactor(userLimitFactor);
- usersManager.userLimitNeedsRecompute();
- }
-
- @Override
- public int getNumApplications() {
- readLock.lock();
- try {
- return getNumPendingApplications() + getNumActiveApplications() +
- getNumNonRunnableApps();
- } finally {
- readLock.unlock();
- }
- }
-
- public int getNumPendingApplications() {
- readLock.lock();
- try {
- return pendingOrderingPolicy.getNumSchedulableEntities();
- } finally {
- readLock.unlock();
- }
- }
-
- public int getNumActiveApplications() {
- readLock.lock();
- try {
- return orderingPolicy.getNumSchedulableEntities();
- } finally {
- readLock.unlock();
- }
- }
-
- @Private
- public int getNumPendingApplications(String user) {
- readLock.lock();
- try {
- User u = getUser(user);
- if (null == u) {
- return 0;
- }
- return u.getPendingApplications();
- } finally {
- readLock.unlock();
- }
- }
-
- @Private
- public int getNumActiveApplications(String user) {
- readLock.lock();
- try {
- User u = getUser(user);
- if (null == u) {
- return 0;
- }
- return u.getActiveApplications();
- } finally {
- readLock.unlock();
- }
- }
-
- @Private
- public float getUserLimit() {
- return usersManager.getUserLimit();
- }
-
- @Private
- public float getUserLimitFactor() {
- return usersManager.getUserLimitFactor();
- }
-
- @Override
- public QueueInfo getQueueInfo(
- boolean includeChildQueues, boolean recursive) {
- QueueInfo queueInfo = getQueueInfo();
- return queueInfo;
- }
-
- @Override
- public List<QueueUserACLInfo>
- getQueueUserAclInfo(UserGroupInformation user) {
- readLock.lock();
- try {
- QueueUserACLInfo userAclInfo = recordFactory.newRecordInstance(
- QueueUserACLInfo.class);
- List<QueueACL> operations = new ArrayList<>();
- for (QueueACL operation : QueueACL.values()) {
- if (hasAccess(operation, user)) {
- operations.add(operation);
- }
- }
-
- userAclInfo.setQueueName(getQueuePath());
- userAclInfo.setUserAcls(operations);
- return Collections.singletonList(userAclInfo);
- } finally {
- readLock.unlock();
- }
-
- }
-
- public String toString() {
- readLock.lock();
- try {
- return getQueuePath() + ": " + getCapacityOrWeightString()
- + ", " + "absoluteCapacity=" + queueCapacities.getAbsoluteCapacity()
- + ", " + "usedResources=" + usageTracker.getQueueUsage().getUsed() + ", "
- + "usedCapacity=" + getUsedCapacity() + ", " + "absoluteUsedCapacity="
- + getAbsoluteUsedCapacity() + ", " + "numApps=" + getNumApplications()
- + ", " + "numContainers=" + getNumContainers() + ", "
- + "effectiveMinResource=" +
- getEffectiveCapacity(CommonNodeLabelsManager.NO_LABEL) +
- " , effectiveMaxResource=" +
- getEffectiveMaxCapacity(CommonNodeLabelsManager.NO_LABEL);
- } finally {
- readLock.unlock();
- }
- }
-
- protected String getExtendedCapacityOrWeightString() {
- if (queueCapacities.getWeight() != -1) {
- return "weight = " + queueCapacities.getWeight()
- + " [= (float) configuredCapacity (with w suffix)] " + "\n"
- + "normalizedWeight = " + queueCapacities.getNormalizedWeight()
- + " [= (float) configuredCapacity / sum(configuredCapacity of " +
- "all queues under the parent)]";
- } else {
- return "capacity = " + queueCapacities.getCapacity()
- + " [= (float) configuredCapacity / 100 ]";
- }
- }
-
- @VisibleForTesting
- public User getUser(String userName) {
- return usersManager.getUser(userName);
- }
-
- @VisibleForTesting
- public User getOrCreateUser(String userName) {
- return usersManager.getUserAndAddIfAbsent(userName);
- }
-
- @Private
- public List<AppPriorityACLGroup> getPriorityACLs() {
- readLock.lock();
- try {
- return new ArrayList<>(priorityAcls);
- } finally {
- readLock.unlock();
- }
- }
-
- protected void reinitialize(
- CSQueue newlyParsedQueue, Resource clusterResource,
- CapacitySchedulerConfiguration configuration) throws
- IOException {
-
- writeLock.lock();
- try {
- // We skip reinitialize for dynamic queues, when this is called, and
- // new queue is different from this queue, we will make this queue to be
- // static queue.
- if (newlyParsedQueue != this) {
- this.setDynamicQueue(false);
- }
-
- // Sanity check
- if (!(newlyParsedQueue instanceof LeafQueue) || !newlyParsedQueue
- .getQueuePath().equals(getQueuePath())) {
- throw new IOException(
- "Trying to reinitialize " + getQueuePath() + " from "
- + newlyParsedQueue.getQueuePath());
- }
-
- LeafQueue newlyParsedLeafQueue = (LeafQueue) newlyParsedQueue;
-
- // don't allow the maximum allocation to be decreased in size
- // since we have already told running AM's the size
- Resource oldMax = getMaximumAllocation();
- Resource newMax = newlyParsedLeafQueue.getMaximumAllocation();
-
- if (!Resources.fitsIn(oldMax, newMax)) {
- throw new IOException("Trying to reinitialize " + getQueuePath()
- + " the maximum allocation size can not be decreased!"
- + " Current setting: " + oldMax + ", trying to set it to: "
- + newMax);
- }
-
- setupQueueConfigs(clusterResource, configuration);
- } finally {
- writeLock.unlock();
- }
- }
-
- @Override
- public void reinitialize(
- CSQueue newlyParsedQueue, Resource clusterResource)
- throws IOException {
- reinitialize(newlyParsedQueue, clusterResource,
- csContext.getConfiguration());
- }
-
- @Override
- public void submitApplicationAttempt(FiCaSchedulerApp application,
- String userName) {
- submitApplicationAttempt(application, userName, false);
- }
-
- @Override
- public void submitApplicationAttempt(FiCaSchedulerApp application,
- String userName, boolean isMoveApp) {
- // Careful! Locking order is important!
- writeLock.lock();
- try {
- // TODO, should use getUser, use this method just to avoid UT failure
- // which is caused by wrong invoking order, will fix UT separately
- User user = usersManager.getUserAndAddIfAbsent(userName);
-
- // Add the attempt to our data-structures
- addApplicationAttempt(application, user);
- } finally {
- writeLock.unlock();
- }
-
- // We don't want to update metrics for move app
- if (!isMoveApp) {
- boolean unmanagedAM = application.getAppSchedulingInfo() != null &&
- application.getAppSchedulingInfo().isUnmanagedAM();
- usageTracker.getMetrics().submitAppAttempt(userName, unmanagedAM);
- }
-
- parent.submitApplicationAttempt(application, userName);
- }
-
- @Override
- public void submitApplication(ApplicationId applicationId, String userName,
- String queue) throws AccessControlException {
- // Careful! Locking order is important!
- validateSubmitApplication(applicationId, userName, queue);
-
- // Signal for expired auto deletion.
- updateLastSubmittedTimeStamp();
-
- // Inform the parent queue
- try {
- parent.submitApplication(applicationId, userName, queue);
- } catch (AccessControlException ace) {
- LOG.info("Failed to submit application to parent-queue: " +
- parent.getQueuePath(), ace);
- throw ace;
- }
-
- }
-
- public void validateSubmitApplication(ApplicationId applicationId,
- String userName, String queue) throws AccessControlException {
- writeLock.lock();
- try {
- // Check if the queue is accepting jobs
- if (getState() != QueueState.RUNNING) {
- String msg = "Queue " + getQueuePath()
- + " is STOPPED. Cannot accept submission of application: "
- + applicationId;
- LOG.info(msg);
- throw new AccessControlException(msg);
- }
-
- // Check submission limits for queues
- //TODO recalculate max applications because they can depend on capacity
- if (getNumApplications() >= getMaxApplications() && !(this instanceof AutoCreatedLeafQueue)) {
- String msg =
- "Queue " + getQueuePath() + " already has " + getNumApplications()
- + " applications,"
- + " cannot accept submission of application: " + applicationId;
- LOG.info(msg);
- throw new AccessControlException(msg);
- }
-
- // Check submission limits for the user on this queue
- User user = usersManager.getUserAndAddIfAbsent(userName);
- //TODO recalculate max applications because they can depend on capacity
- if (user.getTotalApplications() >= getMaxApplicationsPerUser() && !(this instanceof AutoCreatedLeafQueue)) {
- String msg = "Queue " + getQueuePath() + " already has " + user
- .getTotalApplications() + " applications from user " + userName
- + " cannot accept submission of application: " + applicationId;
- LOG.info(msg);
- throw new AccessControlException(msg);
- }
- } finally {
- writeLock.unlock();
- }
-
- try {
- parent.validateSubmitApplication(applicationId, userName, queue);
- } catch (AccessControlException ace) {
- LOG.info("Failed to submit application to parent-queue: " +
- parent.getQueuePath(), ace);
- throw ace;
- }
- }
-
- public Resource getAMResourceLimit() {
- return usageTracker.getQueueUsage().getAMLimit();
- }
-
- public Resource getAMResourceLimitPerPartition(String nodePartition) {
- return usageTracker.getQueueUsage().getAMLimit(nodePartition);
- }
-
- @VisibleForTesting
- public Resource calculateAndGetAMResourceLimit() {
- return calculateAndGetAMResourceLimitPerPartition(
- RMNodeLabelsManager.NO_LABEL);
- }
-
- @VisibleForTesting
- public Resource getUserAMResourceLimit() {
- return getUserAMResourceLimitPerPartition(RMNodeLabelsManager.NO_LABEL,
- null);
- }
-
- /**
- * Computes the AM resource limit of a single user for a partition.
- *
- * <p>The limit is derived from the queue's effective capacity in the
- * partition, the partition's maximum AM resource percentage, the effective
- * user limit (scaled by the user's weight) and the user limit factor. It is
- * then capped by the queue's AM limit for the partition. As a side effect
- * the limit computed WITHOUT the user weight is stored in the queue usage
- * via {@code setUserAMLimit}.
- *
- * @param nodePartition partition to compute the limit for
- * @param userName user to compute the limit for; when null or not known to
- * this queue, a weight of 1.0 is used
- * @return the weighted user AM resource limit
- */
- public Resource getUserAMResourceLimitPerPartition(
- String nodePartition, String userName) {
- float userWeight = 1.0f;
- if (userName != null && getUser(userName) != null) {
- userWeight = getUser(userName).getWeight();
- }
-
- readLock.lock();
- try {
- /*
- * The user am resource limit is based on the same approach as the user
- * limit (as it should represent a subset of that). This means that it uses
- * the absolute queue capacity (per partition) instead of the max and is
- * modified by the userlimit and the userlimit factor as is the userlimit
- */
- // Configured user-limit percentage, but never less than an equal share
- // among the active users (counting at least one user).
- float effectiveUserLimit = Math.max(usersManager.getUserLimit() / 100.0f,
- 1.0f / Math.max(getAbstractUsersManager().getNumActiveUsers(), 1));
- float preWeightedUserLimit = effectiveUserLimit;
- // Apply the user's weight, capped at the whole queue (1.0).
- effectiveUserLimit = Math.min(effectiveUserLimit * userWeight, 1.0f);
-
- Resource queuePartitionResource = getEffectiveCapacity(nodePartition);
-
- Resource minimumAllocation = queueAllocationSettings.getMinimumAllocation();
-
- Resource userAMLimit = Resources.multiplyAndNormalizeUp(
- resourceCalculator, queuePartitionResource,
- queueCapacities.getMaxAMResourcePercentage(nodePartition)
- * effectiveUserLimit * usersManager.getUserLimitFactor(),
- minimumAllocation);
-
- // With a user limit factor of -1 the user-limit and factor terms are not
- // applied: only the partition's max AM percentage is used.
- if (getUserLimitFactor() == -1) {
- userAMLimit = Resources.multiplyAndNormalizeUp(
- resourceCalculator, queuePartitionResource,
- queueCapacities.getMaxAMResourcePercentage(nodePartition),
- minimumAllocation);
- }
-
- // Never exceed the queue-level AM limit of the partition.
- userAMLimit =
- Resources.min(resourceCalculator, lastClusterResource,
- userAMLimit,
- Resources.clone(getAMResourceLimitPerPartition(nodePartition)));
-
- // Same computation again, without the user weight; only this value is
- // stored in the queue usage below.
- Resource preWeighteduserAMLimit =
- Resources.multiplyAndNormalizeUp(
- resourceCalculator, queuePartitionResource,
- queueCapacities.getMaxAMResourcePercentage(nodePartition)
- * preWeightedUserLimit * usersManager.getUserLimitFactor(),
- minimumAllocation);
-
- if (getUserLimitFactor() == -1) {
- preWeighteduserAMLimit = Resources.multiplyAndNormalizeUp(
- resourceCalculator, queuePartitionResource,
- queueCapacities.getMaxAMResourcePercentage(nodePartition),
- minimumAllocation);
- }
-
- preWeighteduserAMLimit =
- Resources.min(resourceCalculator, lastClusterResource,
- preWeighteduserAMLimit,
- Resources.clone(getAMResourceLimitPerPartition(nodePartition)));
- // NOTE(review): this writes to the queue usage while only the read lock
- // is held; presumably the usage object synchronizes itself — confirm.
- usageTracker.getQueueUsage().setUserAMLimit(nodePartition, preWeighteduserAMLimit);
-
- LOG.debug("Effective user AM limit for \"{}\":{}. Effective weighted"
- + " user AM limit: {}. User weight: {}", userName,
- preWeighteduserAMLimit, userAMLimit, userWeight);
- return userAMLimit;
- } finally {
- readLock.unlock();
- }
-
- }
-
- /**
- * Recalculates the AM resource limit of this queue for a partition. The
- * result is stored in the queue metrics and the queue usage, then returned.
- *
- * @param nodePartition partition to calculate the limit for
- * @return the newly calculated AM resource limit
- */
- public Resource calculateAndGetAMResourceLimitPerPartition(
- String nodePartition) {
- writeLock.lock();
- try {
- /*
- * For non-labeled partition, get the max value from resources currently
- * available to the queue and the absolute resources guaranteed for the
- * partition in the queue. For labeled partition, consider only the absolute
- * resources guaranteed. Multiply this value (based on labeled/
- * non-labeled) with per-partition am-resource-percent to get the max am
- * resource limit for this queue and partition.
- */
- Resource queuePartitionResource = getEffectiveCapacity(nodePartition);
-
- Resource queueCurrentLimit = Resources.none();
- // For non-labeled partition, we need to consider the current queue
- // usage limit.
- if (nodePartition.equals(RMNodeLabelsManager.NO_LABEL)) {
- // queueResourceLimitsInfo is read under its own monitor, the same
- // monitor used when it is updated (setQueueResourceLimitsInfo).
- synchronized (queueResourceLimitsInfo){
- queueCurrentLimit = queueResourceLimitsInfo.getQueueCurrentLimit();
- }
- }
-
- float amResourcePercent = queueCapacities.getMaxAMResourcePercentage(
- nodePartition);
-
- // Current usable resource for this queue and partition is the max of
- // queueCurrentLimit and queuePartitionResource.
- // If any of the resources available to this queue are less than queue's
- // guarantee, use the guarantee as the queuePartitionUsableResource
- // because nothing less than the queue's guarantee should be used when
- // calculating the AM limit.
- Resource queuePartitionUsableResource = (Resources.fitsIn(
- resourceCalculator, queuePartitionResource, queueCurrentLimit)) ?
- queueCurrentLimit : queuePartitionResource;
-
- Resource amResouceLimit = Resources.multiplyAndNormalizeUp(
- resourceCalculator, queuePartitionUsableResource, amResourcePercent,
- queueAllocationSettings.getMinimumAllocation());
-
- usageTracker.getMetrics().setAMResouceLimit(nodePartition, amResouceLimit);
- usageTracker.getQueueUsage().setAMLimit(nodePartition, amResouceLimit);
- LOG.debug("Queue: {}, node label : {}, queue partition resource : {},"
- + " queue current limit : {}, queue partition usable resource : {},"
- + " amResourceLimit : {}", getQueuePath(), nodePartition,
- queuePartitionResource, queueCurrentLimit,
- queuePartitionUsableResource, amResouceLimit);
- return amResouceLimit;
- } finally {
- writeLock.unlock();
- }
- }
-
- /**
- * Activates pending applications of this queue while the queue-level and
- * user-level AM resource limits allow it.
- *
- * <p>First, the AM limit of every partition accessible to the queue is
- * recalculated. Pending applications are then visited in the order of the
- * pending ordering policy. An application is activated only if starting its
- * AM keeps both the queue's AM usage and its user's AM usage within the
- * limits of the application's AM partition. When the queue has no active
- * applications, or no AM usage in that partition, an exceeded limit is
- * ignored (with a warning) so that at least one application can start.
- *
- * <p>User AM limits are cached per partition AND per user for the duration
- * of this call. The limit is weighted by the user's weight, so a value
- * computed for one user must not be reused for a different user.
- */
- protected void activateApplications() {
- writeLock.lock();
- try {
- // limit of allowed resource usage for application masters, keyed by
- // partition and then by user name
- Map<String, Map<String, Resource>> userAmPartitionLimit =
- new HashMap<String, Map<String, Resource>>();
-
- // AM Resource Limit for accessible labels can be pre-calculated.
- // This will help in updating AMResourceLimit for all labels when queue
- // is initialized for the first time (when no applications are present).
- for (String nodePartition : getNodeLabelsForQueue()) {
- calculateAndGetAMResourceLimitPerPartition(nodePartition);
- }
-
- for (Iterator<FiCaSchedulerApp> fsApp =
- getPendingAppsOrderingPolicy()
- .getAssignmentIterator(IteratorSelector.EMPTY_ITERATOR_SELECTOR);
- fsApp.hasNext(); ) {
- FiCaSchedulerApp application = fsApp.next();
- ApplicationId applicationId = application.getApplicationId();
-
- // Get the am-node-partition associated with each application
- // and calculate max-am resource limit for this partition.
- String partitionName = application.getAppAMNodePartitionName();
-
- Resource amLimit = getAMResourceLimitPerPartition(partitionName);
- // Verify whether we already calculated am-limit for this label.
- if (amLimit == null) {
- amLimit = calculateAndGetAMResourceLimitPerPartition(partitionName);
- }
- // Check am resource limit.
- Resource amIfStarted = Resources.add(
- application.getAMResource(partitionName),
- usageTracker.getQueueUsage().getAMUsed(partitionName));
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("application " + application.getId() + " AMResource "
- + application.getAMResource(partitionName)
- + " maxAMResourcePerQueuePercent " + maxAMResourcePerQueuePercent
- + " amLimit " + amLimit + " lastClusterResource "
- + lastClusterResource + " amIfStarted " + amIfStarted
- + " AM node-partition name " + partitionName);
- }
-
- if (!resourceCalculator.fitsIn(amIfStarted, amLimit)) {
- // Allow the very first AM to start even when the limit is too low.
- if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual(
- resourceCalculator, lastClusterResource,
- usageTracker.getQueueUsage().getAMUsed(partitionName), Resources.none()))) {
- LOG.warn("maximum-am-resource-percent is insufficient to start a"
- + " single application in queue, it is likely set too low."
- + " skipping enforcement to allow at least one application"
- + " to start");
- } else{
- application.updateAMContainerDiagnostics(AMState.INACTIVATED,
- CSAMContainerLaunchDiagnosticsConstants.QUEUE_AM_RESOURCE_LIMIT_EXCEED);
- LOG.debug("Not activating application {} as amIfStarted: {}"
- + " exceeds amLimit: {}", applicationId, amIfStarted, amLimit);
- continue;
- }
- }
-
- // Check user am resource limit
- User user = usersManager.getUserAndAddIfAbsent(application.getUser());
- Map<String, Resource> userAmLimitByUser =
- userAmPartitionLimit.get(partitionName);
- if (userAmLimitByUser == null) {
- userAmLimitByUser = new HashMap<String, Resource>();
- userAmPartitionLimit.put(partitionName, userAmLimitByUser);
- }
- Resource userAMLimit = userAmLimitByUser.get(application.getUser());
-
- // Verify whether we already calculated user-am-limit for this label
- // and this user (the limit is weighted by the user's weight).
- if (userAMLimit == null) {
- userAMLimit = getUserAMResourceLimitPerPartition(partitionName,
- application.getUser());
- userAmLimitByUser.put(application.getUser(), userAMLimit);
- }
-
- Resource userAmIfStarted = Resources.add(
- application.getAMResource(partitionName),
- user.getConsumedAMResources(partitionName));
-
- if (!resourceCalculator.fitsIn(userAmIfStarted, userAMLimit)) {
- // Same escape hatch as above, for the user-level limit.
- if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual(
- resourceCalculator, lastClusterResource,
- usageTracker.getQueueUsage().getAMUsed(partitionName), Resources.none()))) {
- LOG.warn("maximum-am-resource-percent is insufficient to start a"
- + " single application in queue for user, it is likely set too"
- + " low. skipping enforcement to allow at least one application"
- + " to start");
- } else{
- application.updateAMContainerDiagnostics(AMState.INACTIVATED,
- CSAMContainerLaunchDiagnosticsConstants.USER_AM_RESOURCE_LIMIT_EXCEED);
- LOG.debug("Not activating application {} for user: {} as"
- + " userAmIfStarted: {} exceeds userAmLimit: {}",
- applicationId, user, userAmIfStarted, userAMLimit);
- continue;
- }
- }
- user.activateApplication();
- orderingPolicy.addSchedulableEntity(application);
- application.updateAMContainerDiagnostics(AMState.ACTIVATED, null);
-
- // Charge the AM resource to queue, user and metrics, and record the
- // user AM limit that was applied.
- usageTracker.getQueueUsage().incAMUsed(partitionName,
- application.getAMResource(partitionName));
- user.getResourceUsage().incAMUsed(partitionName,
- application.getAMResource(partitionName));
- user.getResourceUsage().setAMLimit(partitionName, userAMLimit);
- usageTracker.getMetrics().incAMUsed(partitionName, application.getUser(),
- application.getAMResource(partitionName));
- usageTracker.getMetrics().setAMResouceLimitForUser(partitionName,
- application.getUser(), userAMLimit);
- // Remove from the pending ordering policy via the iterator.
- fsApp.remove();
- LOG.info("Application " + applicationId + " from user: " + application
- .getUser() + " activated in queue: " + getQueuePath());
- }
- } finally {
- writeLock.unlock();
- }
- }
-
- /**
- * Registers a new application attempt with this queue.
- *
- * <p>The attempt is always recorded in {@code applicationAttemptMap}. A
- * non-runnable attempt is only added to {@code nonRunnableApps}. A runnable
- * attempt is added to {@code runnableApps}, submitted for its user, and put
- * into the pending ordering policy. An activation pass then runs, but only
- * when the last known cluster resource is non-empty.
- *
- * @param application the attempt to add
- * @param user the queue user owning the attempt
- */
- private void addApplicationAttempt(FiCaSchedulerApp application,
- User user) {
- writeLock.lock();
- try {
- applicationAttemptMap.put(application.getApplicationAttemptId(),
- application);
-
- if (application.isRunnable()) {
- runnableApps.add(application);
- LOG.debug("Adding runnable application: {}",
- application.getApplicationAttemptId());
- } else {
- nonRunnableApps.add(application);
- LOG.info("Application attempt {} is not runnable,"
- + " parallel limit reached", application.getApplicationAttemptId());
- // Non-runnable attempts are neither submitted nor made pending here.
- return;
- }
-
- // Accept
- user.submitApplication();
- getPendingAppsOrderingPolicy().addSchedulableEntity(application);
-
- // Activate applications
- if (Resources.greaterThan(resourceCalculator, lastClusterResource,
- lastClusterResource, Resources.none())) {
- activateApplications();
- } else {
- application.updateAMContainerDiagnostics(AMState.INACTIVATED,
- CSAMContainerLaunchDiagnosticsConstants.CLUSTER_RESOURCE_EMPTY);
- LOG.info("Skipping activateApplications for "
- + application.getApplicationAttemptId()
- + " since cluster resource is " + Resources.none());
- }
-
- LOG.info(
- "Application added -" + " appId: " + application.getApplicationId()
- + " user: " + application.getUser() + "," + " leaf-queue: "
- + getQueuePath() + " #user-pending-applications: " + user
- .getPendingApplications() + " #user-active-applications: " + user
- .getActiveApplications() + " #queue-pending-applications: "
- + getNumPendingApplications() + " #queue-active-applications: "
- + getNumActiveApplications()
- + " #queue-nonrunnable-applications: "
- + getNumNonRunnableApps());
- } finally {
- writeLock.unlock();
- }
- }
-
- /**
- * Marks an application as finished in this leaf queue and then forwards
- * the notification to the parent queue.
- */
- @Override
- public void finishApplication(ApplicationId application, String user) {
- // Tell the users manager the application is no longer active.
- usersManager.deactivateApplication(user, application);
- appFinished();
- // Parent queue is informed last.
- parent.finishApplication(application, user);
- }
-
- @Override
- public void finishApplicationAttempt(FiCaSchedulerApp application, String queue) {
- // Careful! Locking order is important!
- removeApplicationAttempt(application, application.getUser());
- parent.finishApplicationAttempt(application, queue);
- }
-
- /**
- * Removes an application attempt from this queue's book-keeping.
- *
- * <p>The attempt is removed from the runnable or non-runnable set, and from
- * the active or pending ordering policy. If it was active, its AM resource
- * is subtracted from the queue, user and metrics AM usage. Once the user
- * has no applications left, the user is removed from the users manager.
- * Finally, another activation pass is run.
- *
- * @param application the attempt to remove
- * @param userName name of the user that submitted the attempt
- */
- private void removeApplicationAttempt(
- FiCaSchedulerApp application, String userName) {
-
- writeLock.lock();
- try {
- // TODO, should use getUser, use this method just to avoid UT failure
- // which is caused by wrong invoking order, will fix UT separately
- User user = usersManager.getUserAndAddIfAbsent(userName);
-
- boolean runnable = runnableApps.remove(application);
- if (!runnable) {
- // removeNonRunnableApp acquires the write lock again, which is fine
- if (!removeNonRunnableApp(application)) {
- LOG.error("Given app to remove " + application +
- " does not exist in queue " + getQueuePath());
- }
- }
-
- String partitionName = application.getAppAMNodePartitionName();
- // Active apps live in orderingPolicy, pending ones in
- // pendingOrderingPolicy; only active apps have AM usage to release.
- boolean wasActive = orderingPolicy.removeSchedulableEntity(application);
- if (!wasActive) {
- pendingOrderingPolicy.removeSchedulableEntity(application);
- } else{
- usageTracker.getQueueUsage().decAMUsed(partitionName,
- application.getAMResource(partitionName));
- user.getResourceUsage().decAMUsed(partitionName,
- application.getAMResource(partitionName));
- usageTracker.getMetrics().decAMUsed(partitionName, application.getUser(),
- application.getAMResource(partitionName));
- }
- applicationAttemptMap.remove(application.getApplicationAttemptId());
-
- user.finishApplication(wasActive);
- if (user.getTotalApplications() == 0) {
- usersManager.removeUser(application.getUser());
- }
-
- // Check if we can activate more applications
- activateApplications();
-
- LOG.info(
- "Application removed -" + " appId: " + application.getApplicationId()
- + " user: " + application.getUser() + " queue: " + getQueuePath()
- + " #user-pending-applications: " + user.getPendingApplications()
- + " #user-active-applications: " + user.getActiveApplications()
- + " #queue-pending-applications: " + getNumPendingApplications()
- + " #queue-active-applications: " + getNumActiveApplications());
- } finally {
- writeLock.unlock();
- }
- }
-
- /**
- * Looks up an attempt tracked by this queue.
- *
- * @return the attempt, or null when this queue does not track it
- */
- private FiCaSchedulerApp getApplication(
- ApplicationAttemptId applicationAttemptId) {
- FiCaSchedulerApp attempt = applicationAttemptMap.get(applicationAttemptId);
- return attempt;
- }
-
- /**
- * Sets whether this leaf queue may preempt resources from other queues.
- * Only an under-utilized queue is allowed to do so. When the queue has a
- * non-zero effective minimum resource, used resources are compared against
- * it; otherwise absolute used capacity is compared with absolute capacity.
- */
- private void setPreemptionAllowed(ResourceLimits limits, String nodePartition) {
- Resource effectiveMin = usageTracker.getQueueResourceQuotas()
- .getEffectiveMinResource(nodePartition);
- boolean allowPreemption;
- if (!effectiveMin.equals(Resources.none())) {
- allowPreemption = Resources.lessThan(resourceCalculator,
- csContext.getClusterResource(),
- usageTracker.getQueueUsage().getUsed(nodePartition), effectiveMin);
- } else {
- allowPreemption = queueCapacities.getAbsoluteUsedCapacity(nodePartition)
- < queueCapacities.getAbsoluteCapacity(nodePartition);
- }
- limits.setIsAllowPreemption(allowPreemption);
- }
-
- /**
- * Tries to satisfy the reserved container on the single candidate node.
- *
- * <p>Irrespective of single or multi node placement, allocation from a
- * reserved container only happens for the single node that
- * CapacityScheduler#allocateFromReservedContainer invokes with. Otherwise,
- * in multi node placement, no allocation or new reservation could happen
- * while a full node holds a RESERVED container.
- *
- * @return the application's assignment, or null when there is no single
- * node, no reserved container on it, or the reserving attempt is not
- * tracked by this queue
- */
- private CSAssignment allocateFromReservedContainer(Resource clusterResource,
- CandidateNodeSet<FiCaSchedulerNode> candidates,
- ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
- FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
- if (node == null) {
- return null;
- }
- RMContainer reservedContainer = node.getReservedContainer();
- if (reservedContainer == null) {
- return null;
- }
- FiCaSchedulerApp application =
- getApplication(reservedContainer.getApplicationAttemptId());
- if (application == null) {
- return null;
- }
- ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
- node, SystemClock.getInstance().getTime(), application);
- return application.assignContainers(clusterResource, candidates,
- currentResourceLimits, schedulingMode, reservedContainer);
- }
-
- /**
- * Returns the per-user limit cache for a partition and scheduling mode,
- * creating any missing level on demand.
- *
- * <p>The whole cache is cleared whenever the users manager reports a users
- * state version that differs from the one the cache was built for. The
- * outer maps are only accessed while synchronized on
- * {@code userLimitsCache}. The innermost maps are ConcurrentHashMaps, so
- * callers can update them (e.g. via putIfAbsent) outside this lock.
- *
- * @param partition node partition
- * @param schedulingMode scheduling mode
- * @return map from user name to cached user limit
- */
- private ConcurrentMap<String, CachedUserLimit> getUserLimitCache(
- String partition,
- SchedulingMode schedulingMode) {
- synchronized (userLimitsCache) {
- long latestVersion = usersManager.getLatestVersionOfUsersState();
-
- if (latestVersion != this.currentUserLimitCacheVersion) {
- // User limits cache needs invalidating
- this.currentUserLimitCacheVersion = latestVersion;
- userLimitsCache.clear();
-
- // Rebuild only the requested partition/mode entry.
- Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>>
- uLCByPartition = new HashMap<>();
- userLimitsCache.put(partition, uLCByPartition);
-
- ConcurrentMap<String, CachedUserLimit> uLCBySchedulingMode =
- new ConcurrentHashMap<>();
- uLCByPartition.put(schedulingMode, uLCBySchedulingMode);
-
- return uLCBySchedulingMode;
- }
-
- // User limits cache does not need invalidating
- Map<SchedulingMode, ConcurrentMap<String, CachedUserLimit>>
- uLCByPartition = userLimitsCache.get(partition);
- if (uLCByPartition == null) {
- uLCByPartition = new HashMap<>();
- userLimitsCache.put(partition, uLCByPartition);
- }
-
- ConcurrentMap<String, CachedUserLimit> uLCBySchedulingMode =
- uLCByPartition.get(schedulingMode);
- if (uLCBySchedulingMode == null) {
- uLCBySchedulingMode = new ConcurrentHashMap<>();
- uLCByPartition.put(schedulingMode, uLCBySchedulingMode);
- }
-
- return uLCBySchedulingMode;
- }
- }
-
- /**
- * Tries to allocate or reserve a container for one of this queue's active
- * applications on the given candidate nodes.
- *
- * <p>A reserved container on the single candidate node is served first.
- * Otherwise the queue is skipped when it cannot access the partition (in
- * RESPECT_PARTITION_EXCLUSIVITY mode) or has no pending requests. If not
- * skipped, applications are visited in the ordering policy's order. For
- * each one, the queue max-capacity limit and the user limit are checked
- * before the application is asked to assign containers.
- *
- * @return the resulting assignment; {@link CSAssignment#NULL_ASSIGNMENT}
- * when the queue was rejected or skipped, or nothing could be assigned
- */
- @Override
- public CSAssignment assignContainers(Resource clusterResource,
- CandidateNodeSet<FiCaSchedulerNode> candidates,
- ResourceLimits currentResourceLimits, SchedulingMode schedulingMode) {
- updateCurrentResourceLimits(currentResourceLimits, clusterResource);
- FiCaSchedulerNode node = CandidateNodeSetUtils.getSingleNode(candidates);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("assignContainers: partition=" + candidates.getPartition()
- + " #applications=" + orderingPolicy.getNumSchedulableEntities());
- }
-
- setPreemptionAllowed(currentResourceLimits, candidates.getPartition());
-
- // Check for reserved resources, try to allocate reserved container first.
- CSAssignment assignment = allocateFromReservedContainer(clusterResource,
- candidates, currentResourceLimits, schedulingMode);
- if (null != assignment) {
- return assignment;
- }
-
- // if our queue cannot access this node, just return
- if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
- && !queueNodeLabelsSettings.isAccessibleToPartition(candidates.getPartition())) {
- ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
- parent.getQueuePath(), getQueuePath(), ActivityState.REJECTED,
- ActivityDiagnosticConstant.QUEUE_NOT_ABLE_TO_ACCESS_PARTITION);
- return CSAssignment.NULL_ASSIGNMENT;
- }
-
- // Check if this queue need more resource, simply skip allocation if this
- // queue doesn't need more resources.
- if (!hasPendingResourceRequest(candidates.getPartition(), clusterResource,
- schedulingMode)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Skip this queue=" + getQueuePath()
- + ", because it doesn't need more resource, schedulingMode="
- + schedulingMode.name() + " node-partition=" + candidates
- .getPartition());
- }
- ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
- parent.getQueuePath(), getQueuePath(), ActivityState.SKIPPED,
- ActivityDiagnosticConstant.QUEUE_DO_NOT_NEED_MORE_RESOURCE);
- return CSAssignment.NULL_ASSIGNMENT;
- }
-
- // Per-user limits cached for this partition and scheduling mode.
- ConcurrentMap<String, CachedUserLimit> userLimits =
- this.getUserLimitCache(candidates.getPartition(), schedulingMode);
- boolean needAssignToQueueCheck = true;
- IteratorSelector sel = new IteratorSelector();
- sel.setPartition(candidates.getPartition());
- for (Iterator<FiCaSchedulerApp> assignmentIterator =
- orderingPolicy.getAssignmentIterator(sel);
- assignmentIterator.hasNext(); ) {
- FiCaSchedulerApp application = assignmentIterator.next();
-
- ActivitiesLogger.APP.startAppAllocationRecording(activitiesManager,
- node, SystemClock.getInstance().getTime(), application);
-
- // Check queue max-capacity limit
- Resource appReserved = application.getCurrentReservation();
- if (needAssignToQueueCheck) {
- if (!super.canAssignToThisQueue(clusterResource,
- candidates.getPartition(), currentResourceLimits, appReserved,
- schedulingMode)) {
- ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
- activitiesManager, node, application, application.getPriority(),
- ActivityDiagnosticConstant.QUEUE_HIT_MAX_CAPACITY_LIMIT);
- ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
- parent.getQueuePath(), getQueuePath(),
- ActivityState.REJECTED,
- ActivityDiagnosticConstant.QUEUE_HIT_MAX_CAPACITY_LIMIT);
- return CSAssignment.NULL_ASSIGNMENT;
- }
- // If there was no reservation and canAssignToThisQueue returned
- // true, there is no reason to check further.
- if (!this.reservationsContinueLooking
- || appReserved.equals(Resources.none())) {
- needAssignToQueueCheck = false;
- }
- }
-
- // Reuse a cached user limit when present; otherwise compute it and try
- // to publish it to the cache.
- CachedUserLimit cul = userLimits.get(application.getUser());
- Resource cachedUserLimit = null;
- if (cul != null) {
- cachedUserLimit = cul.userLimit;
- }
- Resource userLimit = computeUserLimitAndSetHeadroom(application,
- clusterResource, candidates.getPartition(), schedulingMode,
- cachedUserLimit);
- if (cul == null) {
- cul = new CachedUserLimit(userLimit);
- CachedUserLimit retVal =
- userLimits.putIfAbsent(application.getUser(), cul);
- if (retVal != null) {
- // another thread updated the user limit cache before us
- cul = retVal;
- userLimit = cul.userLimit;
- }
- }
- // Check user limit
- // A cached negative result is reused when this app's reservation fits
- // in the reservation recorded with it; otherwise canAssignToUser runs
- // and a new negative result may be recorded.
- boolean userAssignable = true;
- if (!cul.canAssign && Resources.fitsIn(appReserved, cul.reservation)) {
- userAssignable = false;
- } else {
- userAssignable = canAssignToUser(clusterResource, application.getUser(),
- userLimit, application, candidates.getPartition(),
- currentResourceLimits);
- if (!userAssignable && Resources.fitsIn(cul.reservation, appReserved)) {
- cul.canAssign = false;
- cul.reservation = appReserved;
- }
- }
- if (!userAssignable) {
- application.updateAMContainerDiagnostics(AMState.ACTIVATED,
- "User capacity has reached its maximum limit.");
- ActivitiesLogger.APP.recordRejectedAppActivityFromLeafQueue(
- activitiesManager, node, application, application.getPriority(),
- ActivityDiagnosticConstant.QUEUE_HIT_USER_MAX_CAPACITY_LIMIT);
- continue;
- }
-
- // Try to schedule
- assignment = application.assignContainers(clusterResource,
- candidates, currentResourceLimits, schedulingMode, null);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("post-assignContainers for application " + application
- .getApplicationId());
- application.showRequests();
- }
-
- // Did we schedule or reserve a container?
- Resource assigned = assignment.getResource();
-
- if (Resources.greaterThan(resourceCalculator, clusterResource, assigned,
- Resources.none())) {
- ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
- parent.getQueuePath(), getQueuePath(),
- ActivityState.ACCEPTED, ActivityDiagnosticConstant.EMPTY);
- return assignment;
- } else if (assignment.getSkippedType()
- == CSAssignment.SkippedType.OTHER) {
- // Skipped by the application itself: try the next application.
- ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
- activitiesManager, application.getApplicationId(),
- ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
- application.updateNodeInfoForAMDiagnostics(node);
- } else if (assignment.getSkippedType()
- == CSAssignment.SkippedType.QUEUE_LIMIT) {
- ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
- parent.getQueuePath(), getQueuePath(), ActivityState.REJECTED,
- () -> ActivityDiagnosticConstant.QUEUE_DO_NOT_HAVE_ENOUGH_HEADROOM
- + " from " + application.getApplicationId());
- return assignment;
- } else{
- // If we don't allocate anything, and it is not skipped by application,
- // we will return to respect FIFO of applications
- ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
- parent.getQueuePath(), getQueuePath(), ActivityState.SKIPPED,
- ActivityDiagnosticConstant.QUEUE_SKIPPED_TO_RESPECT_FIFO);
- ActivitiesLogger.APP.finishSkippedAppAllocationRecording(
- activitiesManager, application.getApplicationId(),
- ActivityState.SKIPPED, ActivityDiagnosticConstant.EMPTY);
- return CSAssignment.NULL_ASSIGNMENT;
- }
- }
- ActivitiesLogger.QUEUE.recordQueueActivity(activitiesManager, node,
- parent.getQueuePath(), getQueuePath(), ActivityState.SKIPPED,
- ActivityDiagnosticConstant.EMPTY);
-
- return CSAssignment.NULL_ASSIGNMENT;
- }
-
- /**
- * Validates a commit request against this queue's user limit, then
- * delegates to the superclass check.
- *
- * <p>The user-limit check is skipped for allocations from a reserved
- * container. Otherwise the request is rejected when the user is no longer
- * known to this queue, or when the user's used resource in the partition
- * (minus the resources released by the request) exceeds the user limit.
- */
- @Override
- public boolean accept(Resource cluster,
- ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
- ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> allocation =
- request.getFirstAllocatedOrReservedContainer();
- SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer =
- allocation.getAllocatedOrReservedContainer();
-
- // Do not check limits when allocation from a reserved container
- if (allocation.getAllocateFromReservedContainer() == null) {
- readLock.lock();
- try {
- FiCaSchedulerApp app =
- schedulerContainer.getSchedulerApplicationAttempt();
- String username = app.getUser();
- String p = schedulerContainer.getNodePartition();
-
- // check user-limit
- Resource userLimit = computeUserLimitAndSetHeadroom(app, cluster, p,
- allocation.getSchedulingMode(), null);
-
- // Deduct resources that we can release
- User user = getUser(username);
- if (user == null) {
- LOG.debug("User {} has been removed!", username);
- return false;
- }
- // Work on a copy so the user's own usage object is not modified.
- Resource usedResource = Resources.clone(user.getUsed(p));
- Resources.subtractFrom(usedResource,
- request.getTotalReleasedResource());
-
- if (Resources.greaterThan(resourceCalculator, cluster, usedResource,
- userLimit)) {
- LOG.debug("Used resource={} exceeded user-limit={}",
- usedResource, userLimit);
- return false;
- }
- } finally {
- readLock.unlock();
- }
- }
-
- return super.accept(cluster, request);
- }
-
- /**
- * Releases a single container while a commit request is applied.
- *
- * <p>If the container's application belongs to this queue, only a RESERVED
- * container is completed here, as part of a reservation exchange. If it
- * belongs to another leaf queue, that queue completes it as preempted
- * (lazy preemption).
- */
- private void internalReleaseContainer(Resource clusterResource,
- SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> schedulerContainer) {
- RMContainer rmContainer = schedulerContainer.getRmContainer();
-
- LeafQueue targetLeafQueue =
- schedulerContainer.getSchedulerApplicationAttempt().getCSLeafQueue();
-
- // Reference comparison: is the container's queue this very queue?
- if (targetLeafQueue == this) {
- // When trying to preempt containers from the same queue
- if (rmContainer.getState() == RMContainerState.RESERVED) {
- // For other reserved containers
- // This is a reservation exchange, complete previous reserved container
- completedContainer(clusterResource,
- schedulerContainer.getSchedulerApplicationAttempt(),
- schedulerContainer.getSchedulerNode(), rmContainer, SchedulerUtils
- .createAbnormalContainerStatus(rmContainer.getContainerId(),
- SchedulerUtils.UNRESERVED_CONTAINER),
- RMContainerEventType.RELEASED, null, false);
- }
- } else{
- // When trying to preempt containers from different queue -- this
- // is for lazy preemption feature (kill preemption candidate in scheduling
- // cycle).
- targetLeafQueue.completedContainer(clusterResource,
- schedulerContainer.getSchedulerApplicationAttempt(),
- schedulerContainer.getSchedulerNode(),
- schedulerContainer.getRmContainer(), SchedulerUtils
- .createPreemptedContainerStatus(rmContainer.getContainerId(),
- SchedulerUtils.PREEMPTED_CONTAINER),
- RMContainerEventType.KILL, null, false);
- }
- }
-
- /**
- * Releases every container named in a commit request. This covers the
- * request's own release list plus the per-proposal release lists used for
- * reservation looking and lazy preemption.
- */
- private void releaseContainers(Resource clusterResource,
- ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
- for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> toRelease :
- request.getContainersToRelease()) {
- internalReleaseContainer(clusterResource, toRelease);
- }
-
- // Handle container reservation looking, or lazy preemption case:
- if (request.getContainersToAllocate() == null) {
- return;
- }
- for (ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode> proposal :
- request.getContainersToAllocate()) {
- if (proposal.getToRelease() == null) {
- continue;
- }
- for (SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode> extra :
- proposal.getToRelease()) {
- internalReleaseContainer(clusterResource, extra);
- }
- }
- }
-
- /**
- * Applies an accepted commit request to this queue.
- *
- * <p>Containers to release are released before this queue's write lock is
- * taken here. While holding the write lock, this method charges a new
- * allocation or reservation (not one made from an existing reserved
- * container) to the queue and the ordering policy. It also adds any
- * reserved resource. The parent queue's apply() is only called for such
- * new allocations/reservations, after the write lock has been released.
- */
- public void apply(Resource cluster,
- ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode> request) {
- // Do we need to call parent queue's apply?
- boolean applyToParentQueue = false;
-
- releaseContainers(cluster, request);
-
- writeLock.lock();
- try {
- if (request.anythingAllocatedOrReserved()) {
- ContainerAllocationProposal<FiCaSchedulerApp, FiCaSchedulerNode>
- allocation = request.getFirstAllocatedOrReservedContainer();
- SchedulerContainer<FiCaSchedulerApp, FiCaSchedulerNode>
- schedulerContainer = allocation.getAllocatedOrReservedContainer();
-
- // Do not modify queue when allocation from reserved container
- if (allocation.getAllocateFromReservedContainer() == null) {
- // Only invoke apply() of ParentQueue when new allocation /
- // reservation happen.
- applyToParentQueue = true;
- // Book-keeping
- // Note: Update headroom to account for current allocation too...
- allocateResource(cluster,
- schedulerContainer.getSchedulerApplicationAttempt(),
- allocation.getAllocatedOrReservedResource(),
- schedulerContainer.getNodePartition(),
- schedulerContainer.getRmContainer());
- orderingPolicy.containerAllocated(
- schedulerContainer.getSchedulerApplicationAttempt(),
- schedulerContainer.getRmContainer());
- }
-
- // Update reserved resource
- if (Resources.greaterThan(resourceCalculator, cluster,
- request.getTotalReservedResource(), Resources.none())) {
- incReservedResource(schedulerContainer.getNodePartition(),
- request.getTotalReservedResource());
- }
- }
- } finally {
- writeLock.unlock();
- }
-
- // Parent is called outside this queue's write lock.
- if (parent != null && applyToParentQueue) {
- parent.apply(cluster, request);
- }
- }
-
-
- /**
- * Computes the headroom of {@code user} for the
- * {@link RMNodeLabelsManager#NO_LABEL} partition.
- */
- protected Resource getHeadroom(User user, Resource queueCurrentLimit,
- Resource clusterResource, FiCaSchedulerApp application) {
- String defaultPartition = RMNodeLabelsManager.NO_LABEL;
- return getHeadroom(user, queueCurrentLimit, clusterResource, application,
- defaultPartition);
- }
-
- /**
- * Computes the headroom of {@code user} in a partition. The user limit is
- * derived from the application's user in RESPECT_PARTITION_EXCLUSIVITY
- * mode.
- */
- protected Resource getHeadroom(User user, Resource queueCurrentLimit,
- Resource clusterResource, FiCaSchedulerApp application,
- String partition) {
- Resource activeUserLimit = getResourceLimitForActiveUsers(
- application.getUser(), clusterResource, partition,
- SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
- return getHeadroom(user, queueCurrentLimit, clusterResource,
- activeUserLimit, partition);
- }
-
- /**
- * Core headroom computation for a user in a partition.
- *
- * @param user queue user whose headroom is computed
- * @param currentPartitionResourceLimit queue limit to use for the
- * NO_LABEL partition; for other partitions the queue's max resource for
- * that partition is used instead
- * @param clusterResource total cluster resource
- * @param userLimitResource user limit for the partition
- * @param partition node partition
- * @return headroom rounded down to the minimum allocation and capped by the
- * free resource of the partition
- */
- private Resource getHeadroom(User user,
- Resource currentPartitionResourceLimit, Resource clusterResource,
- Resource userLimitResource, String partition) {
- /*
- * Headroom is:
- * min(
- * min(userLimit, queueMaxCap) - userConsumed,
- * queueMaxCap - queueUsedResources
- * )
- *
- * ( which can be expressed as,
- * min (userLimit - userConsumed, queueMaxCap - userConsumed,
- * queueMaxCap - queueUsedResources)
- * )
- *
- * given that queueUsedResources >= userConsumed, this simplifies to
- *
- * >> min (userlimit - userConsumed, queueMaxCap - queueUsedResources) <<
- *
- * sum of queue max capacities of multiple queue's will be greater than the
- * actual capacity of a given partition, hence we need to ensure that the
- * headroom is not greater than the available resource for a given partition
- *
- * headroom = min (unused resourcelimit of a label, calculated headroom )
- */
- currentPartitionResourceLimit =
- partition.equals(RMNodeLabelsManager.NO_LABEL)
- ? currentPartitionResourceLimit
- : getQueueMaxResource(partition);
-
- Resource headroom = Resources.componentwiseMin(
- Resources.subtractNonNegative(userLimitResource,
- user.getUsed(partition)),
- Resources.subtractNonNegative(currentPartitionResourceLimit,
- usageTracker.getQueueUsage().getUsed(partition)));
- // Normalize it before return
- headroom =
- Resources.roundDown(resourceCalculator, headroom,
- queueAllocationSettings.getMinimumAllocation());
-
- //headroom = min (unused resourcelimit of a label, calculated headroom )
- Resource clusterPartitionResource =
- labelManager.getResourceByLabel(partition, clusterResource);
- Resource clusterFreePartitionResource =
- Resources.subtract(clusterPartitionResource,
- csContext.getClusterResourceUsage().getUsed(partition));
- headroom = Resources.min(resourceCalculator, clusterPartitionResource,
- clusterFreePartitionResource, headroom);
- return headroom;
- }
-
- /**
- * Publishes the cached headroom limit and the given cluster resource into
- * {@code queueResourceLimitsInfo} while holding that object's monitor.
- */
- private void setQueueResourceLimitsInfo(
- Resource clusterResource) {
- synchronized (queueResourceLimitsInfo) {
- Resource headroomLimit = cachedResourceLimitsForHeadroom.getLimit();
- queueResourceLimitsInfo.setQueueCurrentLimit(headroomLimit);
- queueResourceLimitsInfo.setClusterResource(clusterResource);
- }
- }
-
- // It isn't necessary to hold the application's lock here.
- /**
- * Computes the user limit for an application's user and refreshes that
- * user's headroom.
- *
- * <p>Side effects: updates {@code queueResourceLimitsInfo}, installs a new
- * {@link CapacityHeadroomProvider} on the application, and publishes the
- * headroom to the user's available-resource metric for the partition. The
- * headroom is {@code Resources.none()} when no user metrics exist.
- *
- * @param application application whose user is evaluated
- * @param clusterResource total cluster resource
- * @param nodePartition node partition
- * @param schedulingMode scheduling mode used when computing the limit
- * @param userLimit a previously computed (e.g. cached) user limit, or null
- * to compute it here
- * @return the user limit used, or {@code Resources.none()} when the user
- * is no longer known to this queue
- */
- @Lock({LeafQueue.class})
- Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
- Resource clusterResource, String nodePartition,
- SchedulingMode schedulingMode, Resource userLimit) {
- String user = application.getUser();
- User queueUser = getUser(user);
- if (queueUser == null) {
- LOG.debug("User {} has been removed!", user);
- return Resources.none();
- }
-
- // Compute user limit respect requested labels,
- // TODO, need consider headroom respect labels also
- if (userLimit == null) {
- userLimit = getResourceLimitForActiveUsers(application.getUser(),
- clusterResource, nodePartition, schedulingMode);
- }
- setQueueResourceLimitsInfo(clusterResource);
-
- Resource headroom =
- usageTracker.getMetrics().getUserMetrics(user) == null ? Resources.none() :
- getHeadroom(queueUser, cachedResourceLimitsForHeadroom.getLimit(),
- clusterResource, userLimit, nodePartition);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("Headroom calculation for user " + user + ": " + " userLimit="
- + userLimit + " queueMaxAvailRes="
- + cachedResourceLimitsForHeadroom.getLimit() + " consumed="
- + queueUser.getUsed() + " partition="
- + nodePartition);
- }
-
- CapacityHeadroomProvider headroomProvider = new CapacityHeadroomProvider(
- queueUser, this, application, queueResourceLimitsInfo);
-
- application.setHeadroomProvider(headroomProvider);
-
- usageTracker.getMetrics().setAvailableResourcesToUser(nodePartition, user, headroom);
-
- return userLimit;
- }
-
- @Lock(NoLock.class)
- public int getNodeLocalityDelay() {
- return nodeLocalityDelay;
- }
-
- @Lock(NoLock.class)
- public int getRackLocalityAdditionalDelay() {
- return rackLocalityAdditionalDelay;
- }
-
- @Lock(NoLock.class)
- public boolean getRackLocalityFullReset() {
- return rackLocalityFullReset;
- }
-
- /**
- *
- * @param userName
- * Name of user who has submitted one/more app to given queue.
- * @param clusterResource
- * total cluster resource
- * @param nodePartition
- * partition name
- * @param schedulingMode
- * scheduling mode
- * RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
- * @return Computed User Limit
- */
- public Resource getResourceLimitForActiveUsers(String userName,
- Resource clusterResource, String nodePartition,
- SchedulingMode schedulingMode) {
- return usersManager.getComputedResourceLimitForActiveUsers(userName,
- clusterResource, nodePartition, schedulingMode);
- }
-
- /**
- *
- * @param userName
- * Name of user who has submitted one/more app to given queue.
- * @param clusterResource
- * total cluster resource
- * @param nodePartition
- * partition name
- * @param schedulingMode
- * scheduling mode
- * RESPECT_PARTITION_EXCLUSIVITY/IGNORE_PARTITION_EXCLUSIVITY
- * @return Computed User Limit
- */
- public Resource getResourceLimitForAllUsers(String userName,
- Resource clusterResource, String nodePartition,
- SchedulingMode schedulingMode) {
- return usersManager.getComputedResourceLimitForAllUsers(userName,
- clusterResource, nodePartition, schedulingMode);
- }
-
- @Private
- protected boolean canAssignToUser(Resource clusterResource,
- String userName, Resource limit, FiCaSchedulerApp application,
- String nodePartition, ResourceLimits currentResourceLimits) {
-
- readLock.lock();
- try {
- User user = getUser(userName);
- if (user == null) {
- LOG.debug("User {} has been removed!", userName);
- return false;
- }
-
- currentResourceLimits.setAmountNeededUnreserve(Resources.none());
-
- // Note: We aren't considering the current request since there is a fixed
- // overhead of the AM, but it's a > check, not a >= check, so...
- if (Resources.greaterThan(resourceCalculator, clusterResource,
- user.getUsed(nodePartition), limit)) {
- // if enabled, check to see if could we potentially use this node instead
- // of a reserved node if the application has reserved containers
- if (this.reservationsContinueLooking) {
- if (Resources.lessThanOrEqual(resourceCalculator, clusterResource,
- Resources.subtract(user.getUsed(),
- application.getCurrentReservation()), limit)) {
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("User " + userName + " in queue " + getQueuePath()
- + " will exceed limit based on reservations - "
- + " consumed: " + user.getUsed() + " reserved: " + application
- .getCurrentReservation() + " limit: " + limit);
- }
- Resource amountNeededToUnreserve = Resources.subtract(
- user.getUsed(nodePartition), limit);
- // we can only acquire a new container if we unreserve first to
- // respect user-limit
- currentResourceLimits.setAmountNeededUnreserve(
- amountNeededToUnreserve);
- return true;
- }
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("User " + userName + " in queue " + getQueuePath()
- + " will exceed limit - " + " consumed: " + user
- .getUsed(nodePartition) + " limit: " + limit);
- }
- return false;
- }
- return true;
- } finally {
- readLock.unlock();
- }
- }
-
- @Override
- protected void setDynamicQueueProperties(
- CapacitySchedulerConfiguration configuration) {
- // set to -1, to disable it
- configuration.setUserLimitFactor(getQueuePath(), -1);
- // Set Max AM percentage to a higher value
- configuration.setMaximumApplicationMasterResourcePerQueuePercent(
- getQueuePath(), 1f);
- super.setDynamicQueueProperties(configuration);
- }
-
- private void updateSchedulerHealthForCompletedContainer(
- RMContainer rmContainer, ContainerStatus containerStatus) {
- // Update SchedulerHealth for released / preempted container
- SchedulerHealth schedulerHealth = csContext.getSchedulerHealth();
- if (null == schedulerHealth) {
- // Only do update if we have schedulerHealth
- return;
- }
-
- if (containerStatus.getExitStatus() == ContainerExitStatus.PREEMPTED) {
- schedulerHealth.updatePreemption(Time.now(), rmContainer.getAllocatedNode(),
- rmContainer.getContainerId(), getQueuePath());
- schedulerHealth.updateSchedulerPreemptionCounts(1);
- } else {
- schedulerHealth.updateRelease(csContext.getLastNodeUpdateTime(),
- rmContainer.getAllocatedNode(), rmContainer.getContainerId(),
- getQueuePath());
- }
- }
-
- /**
- * Recalculate QueueUsage Ratio.
- *
- * @param clusterResource
- * Total Cluster Resource
- * @param nodePartition
- * Partition
- */
- public void recalculateQueueUsageRatio(Resource clusterResource,
- String nodePartition) {
- writeLock.lock();
- try {
- ResourceUsage queueResourceUsage = getQueueResourceUsage();
-
- if (nodePartition == null) {
- for (String partition : Sets.union(
- getQueueCapacities().getNodePartitionsSet(),
- queueResourceUsage.getNodePartitionsSet())) {
- usersManager.updateUsageRatio(partition, clusterResource);
- }
- } else {
- usersManager.updateUsageRatio(nodePartition, clusterResource);
- }
- } finally {
- writeLock.unlock();
- }
- }
-
- @Override
- public void completedContainer(Resource clusterResource,
- FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer,
- ContainerStatus containerStatus, RMContainerEventType event, CSQueue childQueue,
- boolean sortQueues) {
- // Update SchedulerHealth for released / preempted container
- updateSchedulerHealthForCompletedContainer(rmContainer, containerStatus);
-
- if (application != null) {
- boolean removed = false;
-
- // Careful! Locking order is important!
- writeLock.lock();
- try {
- Container container = rmContainer.getContainer();
-
- // Inform the application & the node
- // Note: It's safe to assume that all state changes to RMContainer
- // happen under scheduler's lock...
- // So, this is, in effect, a transaction across application & node
- if (rmContainer.getState() == RMContainerState.RESERVED) {
- removed = application.unreserve(rmContainer.getReservedSchedulerKey(),
- node, rmContainer);
- } else{
- removed = application.containerCompleted(rmContainer, containerStatus,
- event, node.getPartition());
-
- node.releaseContainer(rmContainer.getContainerId(), false);
- }
-
- // Book-keeping
- if (removed) {
-
- // Inform the ordering policy
- orderingPolicy.containerReleased(application, rmContainer);
-
- releaseResource(clusterResource, application, container.getResource(),
- node.getPartition(), rmContainer);
- }
- } finally {
- writeLock.unlock();
- }
-
-
- if (removed) {
- // Inform the parent queue _outside_ of the leaf-queue lock
- parent.completedContainer(clusterResource, application, node,
- rmContainer, null, event, this, sortQueues);
- }
- }
-
- // Notify PreemptionManager
- csContext.getPreemptionManager().removeKillableContainer(
- new KillableContainer(
- rmContainer,
- node.getPartition(),
- getQueuePath()));
-
- // Update preemption metrics if exit status is PREEMPTED
- if (containerStatus != null
- && ContainerExitStatus.PREEMPTED == containerStatus.getExitStatus()) {
- updateQueuePreemptionMetrics(rmContainer);
- }
- }
-
- void allocateResource(Resource clusterResource,
- SchedulerApplicationAttempt application, Resource resource,
- String nodePartition, RMContainer rmContainer) {
- writeLock.lock();
- try {
- super.allocateResource(clusterResource, resource, nodePartition);
-
- // handle ignore exclusivity container
- if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
- RMNodeLabelsManager.NO_LABEL) && !nodePartition.equals(
- RMNodeLabelsManager.NO_LABEL)) {
- TreeSet<RMContainer> rmContainers = null;
- if (null == (rmContainers = ignorePartitionExclusivityRMContainers.get(
- nodePartition))) {
- rmContainers = new TreeSet<>();
- ignorePartitionExclusivityRMContainers.put(nodePartition,
- rmContainers);
- }
- rmContainers.add(rmContainer);
- }
-
- // Update user metrics
- String userName = application.getUser();
-
- // Increment user's resource usage.
- User user = usersManager.updateUserResourceUsage(userName, resource,
- nodePartition, true);
-
- Resource partitionHeadroom = Resources.createResource(0, 0);
- if (usageTracker.getMetrics().getUserMetrics(userName) != null) {
- partitionHeadroom = getHeadroom(user,
- cachedResourceLimitsForHeadroom.getLimit(), clusterResource,
- getResourceLimitForActiveUsers(userName, clusterResource,
- nodePartition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
- nodePartition);
- }
- usageTracker.getMetrics().setAvailableResourcesToUser(nodePartition, userName,
- partitionHeadroom);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(getQueuePath() + " user=" + userName + " used="
- + usageTracker.getQueueUsage().getUsed(nodePartition) + " numContainers="
- + usageTracker.getNumContainers() + " headroom = " + application.getHeadroom()
- + " user-resources=" + user.getUsed());
- }
- } finally {
- writeLock.unlock();
- }
- }
-
- void releaseResource(Resource clusterResource,
- FiCaSchedulerApp application, Resource resource, String nodePartition,
- RMContainer rmContainer) {
- writeLock.lock();
- try {
- super.releaseResource(clusterResource, resource, nodePartition);
-
- // handle ignore exclusivity container
- if (null != rmContainer && rmContainer.getNodeLabelExpression().equals(
- RMNodeLabelsManager.NO_LABEL) && !nodePartition.equals(
- RMNodeLabelsManager.NO_LABEL)) {
- if (ignorePartitionExclusivityRMContainers.containsKey(nodePartition)) {
- Set<RMContainer> rmContainers =
- ignorePartitionExclusivityRMContainers.get(nodePartition);
- rmContainers.remove(rmContainer);
- if (rmContainers.isEmpty()) {
- ignorePartitionExclusivityRMContainers.remove(nodePartition);
- }
- }
- }
-
- // Update user metrics
- String userName = application.getUser();
- User user = usersManager.updateUserResourceUsage(userName, resource,
- nodePartition, false);
-
- Resource partitionHeadroom = Resources.createResource(0, 0);
- if (usageTracker.getMetrics().getUserMetrics(userName) != null) {
- partitionHeadroom = getHeadroom(user,
- cachedResourceLimitsForHeadroom.getLimit(), clusterResource,
- getResourceLimitForActiveUsers(userName, clusterResource,
- nodePartition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
- nodePartition);
- }
- usageTracker.getMetrics().setAvailableResourcesToUser(nodePartition, userName,
- partitionHeadroom);
-
- if (LOG.isDebugEnabled()) {
- LOG.debug(
- getQueuePath() + " used=" + usageTracker.getQueueUsage().getUsed() + " numContainers="
- + usageTracker.getNumContainers() + " user=" + userName + " user-resources="
- + user.getUsed());
- }
- } finally {
- writeLock.unlock();
- }
- }
-
- private void updateCurrentResourceLimits(
- ResourceLimits currentResourceLimits, Resource clusterResource) {
- // TODO: need consider non-empty node labels when resource limits supports
- // node labels
- // Even if ParentQueue will set limits respect child's max queue capacity,
- // but when allocating reserved container, CapacityScheduler doesn't do
- // this. So need cap limits by queue's max capacity here.
- this.cachedResourceLimitsForHeadroom =
- new ResourceLimits(currentResourceLimits.getLimit());
- Resource queueMaxResource = getEffectiveMaxCapacityDown(
- RMNodeLabelsManager.NO_LABEL, queueAllocationSettings.getMinimumAllocation());
- this.cachedResourceLimitsForHeadroom.setLimit(Resources.min(
- resourceCalculator, clusterResource, queueMaxResource,
- currentResourceLimits.getLimit()));
- }
-
- @Override
- public void updateClusterResource(Resource clusterResource,
- ResourceLimits currentResourceLimits) {
- writeLock.lock();
- try {
- lastClusterResource = clusterResource;
-
- updateAbsoluteCapacities();
-
- super.updateEffectiveResources(clusterResource);
-
- // Update maximum applications for the queue and for users
- updateMaximumApplications(csContext.getConfiguration());
-
- updateCurrentResourceLimits(currentResourceLimits, clusterResource);
-
- // Update headroom info based on new cluster resource value
- // absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity
- // during allocation
- setQueueResourceLimitsInfo(clusterResource);
-
- // Update user consumedRatios
- recalculateQueueUsageRatio(clusterResource, null);
-
- // Update metrics
- CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
- this, labelManager, null);
- // Update configured capacity/max-capacity for default partition only
- CSQueueUtils.updateConfiguredCapacityMetrics(resourceCalculator,
- labelManager.getResourceByLabel(null, clusterResource),
- RMNodeLabelsManager.NO_LABEL, this);
-
- // queue metrics are updated, more resource may be available
- // activate the pending applications if possible
- activateApplications();
-
- // In case of any resource change, invalidate recalculateULCount to clear
- // the computed user-limit.
- usersManager.userLimitNeedsRecompute();
-
- // Update application properties
- for (FiCaSchedulerApp application : orderingPolicy
- .getSchedulableEntities()) {
- computeUserLimitAndSetHeadroom(application, clusterResource,
- RMNodeLabelsManager.NO_LABEL,
- SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, null);
- }
- } finally {
- writeLock.unlock();
- }
- }
-
- @Override
- public void incUsedResource(String nodeLabel, Resource resourceToInc,
- SchedulerApplicationAttempt application) {
- usersManager.updateUserResourceUsage(application.getUser(), resourceToInc,
- nodeLabel, true);
- super.incUsedResource(nodeLabel, resourceToInc, application);
- }
-
- @Override
- public void decUsedResource(String nodeLabel, Resource resourceToDec,
- SchedulerApplicationAttempt application) {
- usersManager.updateUserResourceUsage(application.getUser(), resourceToDec,
- nodeLabel, false);
- super.decUsedResource(nodeLabel, resourceToDec, application);
- }
-
- public void incAMUsedResource(String nodeLabel, Resource resourceToInc,
- SchedulerApplicationAttempt application) {
- User user = getUser(application.getUser());
- if (user == null) {
- return;
- }
-
- user.getResourceUsage().incAMUsed(nodeLabel,
- resourceToInc);
- // ResourceUsage has its own lock, no addition lock needs here.
- usageTracker.getQueueUsage().incAMUsed(nodeLabel, resourceToInc);
- }
-
- public void decAMUsedResource(String nodeLabel, Resource resourceToDec,
- SchedulerApplicationAttempt application) {
- User user = getUser(application.getUser());
- if (user == null) {
- return;
- }
-
- user.getResourceUsage().decAMUsed(nodeLabel,
- resourceToDec);
- // ResourceUsage has its own lock, no addition lock needs here.
- usageTracker.getQueueUsage().decAMUsed(nodeLabel, resourceToDec);
- }
-
- @Override
- public void recoverContainer(Resource clusterResource,
- SchedulerApplicationAttempt attempt, RMContainer rmContainer) {
- if (rmContainer.getState().equals(RMContainerState.COMPLETED)) {
- return;
- }
- if (rmContainer.getExecutionType() != ExecutionType.GUARANTEED) {
- return;
- }
- // Careful! Locking order is important!
- writeLock.lock();
- try {
- FiCaSchedulerNode node = csContext.getNode(
- rmContainer.getContainer().getNodeId());
- allocateResource(clusterResource, attempt,
- rmContainer.getContainer().getResource(), node.getPartition(),
- rmContainer);
- } finally {
- writeLock.unlock();
- }
-
- parent.recoverContainer(clusterResource, attempt, rmContainer);
- }
-
- /**
- * Obtain (read-only) collection of pending applications.
- */
- public Collection<FiCaSchedulerApp> getPendingApplications() {
- return Collections.unmodifiableCollection(pendingOrderingPolicy
- .getSchedulableEntities());
- }
-
- /**
- * Obtain (read-only) collection of active applications.
- */
- public Collection<FiCaSchedulerApp> getApplications() {
- return Collections.unmodifiableCollection(orderingPolicy
- .getSchedulableEntities());
- }
-
- /**
- * Obtain (read-only) collection of all applications.
- */
- public Collection<FiCaSchedulerApp> getAllApplications() {
- Collection<FiCaSchedulerApp> apps = new HashSet<FiCaSchedulerApp>(
- pendingOrderingPolicy.getSchedulableEntities());
- apps.addAll(orderingPolicy.getSchedulableEntities());
-
- return Collections.unmodifiableCollection(apps);
- }
-
- /**
- * Get total pending resource considering user limit for the leaf queue. This
- * will be used for calculating pending resources in the preemption monitor.
- *
- * Consider the headroom for each user in the queue.
- * Total pending for the queue =
- * sum(for each user(min((user's headroom), sum(user's pending requests))))
- * NOTE:
-
- * @param clusterResources clusterResource
- * @param partition node partition
- * @param deductReservedFromPending When a container is reserved in CS,
- * pending resource will not be deducted.
- * This could lead to double accounting when
- * doing preemption:
- * In normal cases, we should deduct reserved
- * resource from pending to avoid
- * excessive preemption.
- * @return Total pending resource considering user limit
- */
- public Resource getTotalPendingResourcesConsideringUserLimit(
- Resource clusterResources, String partition,
- boolean deductReservedFromPending) {
- readLock.lock();
- try {
- Map<String, Resource> userNameToHeadroom =
- new HashMap<>();
- Resource totalPendingConsideringUserLimit = Resource.newInstance(0, 0);
- for (FiCaSchedulerApp app : getApplications()) {
- String userName = app.getUser();
- if (!userNameToHeadroom.containsKey(userName)) {
- User user = getUsersManager().getUserAndAddIfAbsent(userName);
- Resource headroom = Resources.subtract(
- getResourceLimitForActiveUsers(app.getUser(), clusterResources,
- partition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY),
- user.getUsed(partition));
- // Make sure headroom is not negative.
- headroom = Resources.componentwiseMax(headroom, Resources.none());
- userNameToHeadroom.put(userName, headroom);
- }
-
- // Check if we need to deduct reserved from pending
- Resource pending = app.getAppAttemptResourceUsage().getPending(
- partition);
- if (deductReservedFromPending) {
- pending = Resources.subtract(pending,
- app.getAppAttemptResourceUsage().getReserved(partition));
- }
- pending = Resources.componentwiseMax(pending, Resources.none());
-
- Resource minpendingConsideringUserLimit = Resources.componentwiseMin(
- userNameToHeadroom.get(userName), pending);
- Resources.addTo(totalPendingConsideringUserLimit,
- minpendingConsideringUserLimit);
- Resources.subtractFrom(userNameToHeadroom.get(userName),
- minpendingConsideringUserLimit);
- }
- return totalPendingConsideringUserLimit;
- } finally {
- readLock.unlock();
- }
-
- }
-
- @Override
- public void collectSchedulerApplications(
- Collection<ApplicationAttemptId> apps) {
- readLock.lock();
- try {
- for (FiCaSchedulerApp pendingApp : pendingOrderingPolicy
- .getSchedulableEntities()) {
- apps.add(pendingApp.getApplicationAttemptId());
- }
- for (FiCaSchedulerApp app : orderingPolicy.getSchedulableEntities()) {
- apps.add(app.getApplicationAttemptId());
- }
- } finally {
- readLock.unlock();
- }
-
- }
-
- @Override
- public void attachContainer(Resource clusterResource,
- FiCaSchedulerApp application, RMContainer rmContainer) {
- if (application != null && rmContainer != null
- && rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
- FiCaSchedulerNode node =
- csContext.getNode(rmContainer.getContainer().getNodeId());
- allocateResource(clusterResource, application, rmContainer.getContainer()
- .getResource(), node.getPartition(), rmContainer);
- LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
- + " containerState="+ rmContainer.getState()
- + " resource=" + rmContainer.getContainer().getResource()
- + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity()
- + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
- + usageTracker.getQueueUsage().getUsed() + " cluster=" + clusterResource);
- // Inform the parent queue
- parent.attachContainer(clusterResource, application, rmContainer);
- }
- }
-
- @Override
- public void detachContainer(Resource clusterResource,
- FiCaSchedulerApp application, RMContainer rmContainer) {
- if (application != null && rmContainer != null
- && rmContainer.getExecutionType() == ExecutionType.GUARANTEED) {
- FiCaSchedulerNode node =
- csContext.getNode(rmContainer.getContainer().getNodeId());
- releaseResource(clusterResource, application, rmContainer.getContainer()
- .getResource(), node.getPartition(), rmContainer);
- LOG.info("movedContainer" + " container=" + rmContainer.getContainer()
- + " containerState="+ rmContainer.getState()
- + " resource=" + rmContainer.getContainer().getResource()
- + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity()
- + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used="
- + usageTracker.getQueueUsage().getUsed() + " cluster=" + clusterResource);
- // Inform the parent queue
- parent.detachContainer(clusterResource, application, rmContainer);
- }
- }
-
- /**
- * @return all ignored partition exclusivity RMContainers in the LeafQueue,
- * this will be used by preemption policy.
- */
- public Map<String, TreeSet<RMContainer>>
- getIgnoreExclusivityRMContainers() {
- Map<String, TreeSet<RMContainer>> clonedMap = new HashMap<>();
-
- readLock.lock();
- try {
- for (Map.Entry<String, TreeSet<RMContainer>> entry : ignorePartitionExclusivityRMContainers
- .entrySet()) {
- clonedMap.put(entry.getKey(), new TreeSet<>(entry.getValue()));
- }
-
- return clonedMap;
-
- } finally {
- readLock.unlock();
- }
- }
-
- public void setCapacity(float capacity) {
- queueCapacities.setCapacity(capacity);
- }
-
- public void setCapacity(String nodeLabel, float capacity) {
- queueCapacities.setCapacity(nodeLabel, capacity);
- }
-
- public void setAbsoluteCapacity(float absoluteCapacity) {
- queueCapacities.setAbsoluteCapacity(absoluteCapacity);
- }
-
- public void setAbsoluteCapacity(String nodeLabel, float absoluteCapacity) {
- queueCapacities.setAbsoluteCapacity(nodeLabel, absoluteCapacity);
- }
-
- public void setMaxApplicationsPerUser(int maxApplicationsPerUser) {
- this.maxApplicationsPerUser = maxApplicationsPerUser;
- }
-
- public void setMaxApplications(int maxApplications) {
- this.maxApplications = maxApplications;
- }
-
- public void setMaxAMResourcePerQueuePercent(
- float maxAMResourcePerQueuePercent) {
- this.maxAMResourcePerQueuePercent = maxAMResourcePerQueuePercent;
- }
-
- public OrderingPolicy<FiCaSchedulerApp>
- getOrderingPolicy() {
- return orderingPolicy;
- }
-
- void setOrderingPolicy(
- OrderingPolicy<FiCaSchedulerApp> orderingPolicy) {
- writeLock.lock();
- try {
- if (null != this.orderingPolicy) {
- orderingPolicy.addAllSchedulableEntities(
- this.orderingPolicy.getSchedulableEntities());
- }
- this.orderingPolicy = orderingPolicy;
- } finally {
- writeLock.unlock();
- }
- }
-
- @Override
- public Priority getDefaultApplicationPriority() {
- return defaultAppPriorityPerQueue;
- }
-
- public void updateApplicationPriority(SchedulerApplication<FiCaSchedulerApp> app,
- Priority newAppPriority) {
- writeLock.lock();
- try {
- FiCaSchedulerApp attempt = app.getCurrentAppAttempt();
- boolean isActive = orderingPolicy.removeSchedulableEntity(attempt);
- if (!isActive) {
- pendingOrderingPolicy.removeSchedulableEntity(attempt);
- }
- // Update new priority in SchedulerApplication
- attempt.setPriority(newAppPriority);
-
- if (isActive) {
- orderingPolicy.addSchedulableEntity(attempt);
- } else {
- pendingOrderingPolicy.addSchedulableEntity(attempt);
- }
- } finally {
- writeLock.unlock();
- }
- }
-
- public OrderingPolicy<FiCaSchedulerApp>
- getPendingAppsOrderingPolicy() {
- return pendingOrderingPolicy;
- }
-
- /*
- * Holds shared values used by all applications in
- * the queue to calculate headroom on demand
- */
- static class QueueResourceLimitsInfo {
- private Resource queueCurrentLimit;
- private Resource clusterResource;
-
- public void setQueueCurrentLimit(Resource currentLimit) {
- this.queueCurrentLimit = currentLimit;
- }
-
- public Resource getQueueCurrentLimit() {
- return queueCurrentLimit;
- }
-
- public void setClusterResource(Resource clusterResource) {
- this.clusterResource = clusterResource;
- }
-
- public Resource getClusterResource() {
- return clusterResource;
- }
- }
-
- @Override
- public void stopQueue() {
- writeLock.lock();
- try {
- if (getNumApplications() > 0) {
- updateQueueState(QueueState.DRAINING);
- } else {
- updateQueueState(QueueState.STOPPED);
- }
- } finally {
- writeLock.unlock();
- }
- }
-
- void updateMaximumApplications(CapacitySchedulerConfiguration conf) {
- int maxAppsForQueue = conf.getMaximumApplicationsPerQueue(getQueuePath());
-
- int maxDefaultPerQueueApps = conf.getGlobalMaximumApplicationsPerQueue();
- int maxSystemApps = conf.getMaximumSystemApplications();
- int baseMaxApplications = maxDefaultPerQueueApps > 0 ?
- Math.min(maxDefaultPerQueueApps, maxSystemApps)
- : maxSystemApps;
-
- String maxLabel = RMNodeLabelsManager.NO_LABEL;
- if (maxAppsForQueue < 0) {
- if (maxDefaultPerQueueApps > 0 && this.capacityConfigType
- != CapacityConfigType.ABSOLUTE_RESOURCE) {
- maxAppsForQueue = baseMaxApplications;
- } else {
- for (String label : queueNodeLabelsSettings.getConfiguredNodeLabels()) {
- int maxApplicationsByLabel = (int) (baseMaxApplications
- * queueCapacities.getAbsoluteCapacity(label));
- if (maxApplicationsByLabel > maxAppsForQueue) {
- maxAppsForQueue = maxApplicationsByLabel;
- maxLabel = label;
- }
- }
- }
- }
-
- setMaxApplications(maxAppsForQueue);
-
- updateMaxAppsPerUser();
-
- LOG.info("LeafQueue:" + getQueuePath() +
- "update max app related, maxApplications="
- + maxAppsForQueue + ", maxApplicationsPerUser="
- + maxApplicationsPerUser + ", Abs Cap:" + queueCapacities
- .getAbsoluteCapacity(maxLabel) + ", Cap: " + queueCapacities
- .getCapacity(maxLabel) + ", MaxCap : " + queueCapacities
- .getMaximumCapacity(maxLabel));
- }
-
- private void updateMaxAppsPerUser() {
- int maxAppsPerUser = maxApplications;
- if (getUsersManager().getUserLimitFactor() != -1) {
- int maxApplicationsWithUserLimits = (int) (maxApplications
- * (getUsersManager().getUserLimit() / 100.0f)
- * getUsersManager().getUserLimitFactor());
- maxAppsPerUser = Math.min(maxApplications,
- maxApplicationsWithUserLimits);
- }
-
- setMaxApplicationsPerUser(maxAppsPerUser);
- }
-
- /**
- * Get all valid users in this queue.
- * @return user list
- */
- public Set<String> getAllUsers() {
- return this.getUsersManager().getUsers().keySet();
- }
-
- static class CachedUserLimit {
- final Resource userLimit;
- volatile boolean canAssign = true;
- volatile Resource reservation = Resources.none();
-
- CachedUserLimit(Resource userLimit) {
- this.userLimit = userLimit;
- }
- }
-
- private void updateQueuePreemptionMetrics(RMContainer rmc) {
- final long usedMillis = rmc.getFinishTime() - rmc.getCreationTime();
- final long usedSeconds = usedMillis / DateUtils.MILLIS_PER_SECOND;
- CSQueueMetrics metrics = usageTracker.getMetrics();
- Resource containerResource = rmc.getAllocatedResource();
- metrics.preemptContainer();
- long mbSeconds = (containerResource.getMemorySize() * usedMillis)
- / DateUtils.MILLIS_PER_SECOND;
- long vcSeconds = (containerResource.getVirtualCores() * usedMillis)
- / DateUtils.MILLIS_PER_SECOND;
- metrics.updatePreemptedMemoryMBSeconds(mbSeconds);
- metrics.updatePreemptedVcoreSeconds(vcSeconds);
- metrics.updatePreemptedResources(containerResource);
- metrics.updatePreemptedSecondsForCustomResources(containerResource,
- usedSeconds);
- metrics.updatePreemptedForCustomResources(containerResource);
- }
-
- @Override
- int getNumRunnableApps() {
- readLock.lock();
- try {
- return runnableApps.size();
- } finally {
- readLock.unlock();
- }
- }
-
- int getNumNonRunnableApps() {
- readLock.lock();
- try {
- return nonRunnableApps.size();
- } finally {
- readLock.unlock();
- }
- }
-
- boolean removeNonRunnableApp(FiCaSchedulerApp app) {
- writeLock.lock();
- try {
- return nonRunnableApps.remove(app);
- } finally {
- writeLock.unlock();
- }
- }
-
- List<FiCaSchedulerApp> getCopyOfNonRunnableAppSchedulables() {
- List<FiCaSchedulerApp> appsToReturn = new ArrayList<>();
- readLock.lock();
- try {
- appsToReturn.addAll(nonRunnableApps);
- } finally {
- readLock.unlock();
- }
- return appsToReturn;
- }
-
- @Override
- public boolean isEligibleForAutoDeletion() {
- return isDynamicQueue() && getNumApplications() == 0
- && csContext.getConfiguration().
- isAutoExpiredDeletionEnabled(this.getQueuePath());
+ LOG.debug("LeafQueue: name={}, fullname={}", queueName, getQueuePath());
}
}
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java
index 6e7325c..ddfb24b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java
@@ -334,7 +334,7 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
try {
List<FiCaSchedulerApp> apps = new ArrayList<>();
for (CSQueue childQueue : getChildQueues()) {
- apps.addAll(((LeafQueue) childQueue).getApplications());
+ apps.addAll(((AbstractLeafQueue) childQueue).getApplications());
}
return Collections.unmodifiableList(apps);
} finally {
@@ -347,7 +347,7 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
try {
List<FiCaSchedulerApp> apps = new ArrayList<>();
for (CSQueue childQueue : getChildQueues()) {
- apps.addAll(((LeafQueue) childQueue).getPendingApplications());
+ apps.addAll(((AbstractLeafQueue) childQueue).getPendingApplications());
}
return Collections.unmodifiableList(apps);
} finally {
@@ -360,7 +360,7 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
try {
List<FiCaSchedulerApp> apps = new ArrayList<>();
for (CSQueue childQueue : getChildQueues()) {
- apps.addAll(((LeafQueue) childQueue).getAllApplications());
+ apps.addAll(((AbstractLeafQueue) childQueue).getAllApplications());
}
return Collections.unmodifiableList(apps);
} finally {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index aec2bd8..4339189 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -675,10 +675,10 @@ public class ParentQueue extends AbstractCSQueue {
// parent Queue has been converted to child queue. The CS has already
// checked to ensure that this child-queue is in STOPPED state if
// Child queue has been converted to ParentQueue.
- if ((childQueue instanceof LeafQueue
+ if ((childQueue instanceof AbstractLeafQueue
&& newChildQueue instanceof ParentQueue)
|| (childQueue instanceof ParentQueue
- && newChildQueue instanceof LeafQueue)) {
+ && newChildQueue instanceof AbstractLeafQueue)) {
// We would convert this LeafQueue to ParentQueue, or vice versa.
// consider this as the combination of DELETE then ADD.
newChildQueue.setParent(this);
@@ -1134,7 +1134,7 @@ public class ParentQueue extends AbstractCSQueue {
assignment = childAssignment;
}
Resource blockedHeadroom = null;
- if (childQueue instanceof LeafQueue) {
+ if (childQueue instanceof AbstractLeafQueue) {
blockedHeadroom = childLimits.getHeadroom();
} else {
blockedHeadroom = childLimits.getBlockedHeadroom();
@@ -1548,7 +1548,7 @@ public class ParentQueue extends AbstractCSQueue {
FiCaSchedulerNode node = csContext.getNode(
toKillContainer.getAllocatedNode());
if (null != attempt && null != node) {
- LeafQueue lq = attempt.getCSLeafQueue();
+ AbstractLeafQueue lq = attempt.getCSLeafQueue();
lq.completedContainer(clusterResource, attempt, node, toKillContainer,
SchedulerUtils.createPreemptedContainerStatus(
toKillContainer.getContainerId(),
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
index 5a8ce9a..4208bf0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ReservationQueue.java
@@ -40,6 +40,11 @@ public class ReservationQueue extends AbstractAutoCreatedLeafQueue {
public ReservationQueue(CapacitySchedulerContext cs, String queueName,
PlanQueue parent) throws IOException {
super(cs, queueName, parent, null);
+ super.setupQueueConfigs(cs.getClusterResource(),
+ cs.getConfiguration());
+
+ LOG.debug("Initialized ReservationQueue: name={}, fullname={}",
+ queueName, getQueuePath());
// the following parameters are common to all reservation in the plan
updateQuotas(parent.getUserLimitForReservation(),
parent.getUserLimitFactor(),
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
index 94df9ab..73aad3c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
@@ -55,7 +55,7 @@ public class UsersManager implements AbstractUsersManager {
/*
* Member declaration for UsersManager class.
*/
- private final LeafQueue lQueue;
+ private final AbstractLeafQueue lQueue;
private final RMNodeLabelsManager labelManager;
private final ResourceCalculator resourceCalculator;
private final CapacitySchedulerContext scheduler;
@@ -301,7 +301,7 @@ public class UsersManager implements AbstractUsersManager {
* @param resourceCalculator
* rc
*/
- public UsersManager(QueueMetrics metrics, LeafQueue lQueue,
+ public UsersManager(QueueMetrics metrics, AbstractLeafQueue lQueue,
RMNodeLabelsManager labelManager, CapacitySchedulerContext scheduler,
ResourceCalculator resourceCalculator) {
ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java
index 76ab7ca..7458df9 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/queuemanagement/GuaranteedOrZeroCapacityOverTimePolicy.java
@@ -20,35 +20,23 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler
- .SchedulerDynamicEditException;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
- .AbstractAutoCreatedLeafQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
- .AutoCreatedLeafQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
- .AutoCreatedLeafQueueConfig;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
- .AutoCreatedQueueManagementPolicy;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractAutoCreatedLeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedLeafQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedLeafQueueConfig;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedQueueManagementPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
- .CapacitySchedulerContext;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
- .LeafQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
- .ManagedParentQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
- .ParentQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
- .QueueCapacities;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
- .QueueManagementChange;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica
- .FiCaSchedulerApp;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ManagedParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueManagementChange;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -136,7 +124,7 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
return false;
}
- public boolean createLeafQueueStateIfNotExists(LeafQueue leafQueue,
+ public boolean createLeafQueueStateIfNotExists(AbstractLeafQueue leafQueue,
String partition) {
return addLeafQueueStateIfNotExists(leafQueue.getQueuePath(), partition,
new LeafQueueStatePerPartition());
@@ -482,9 +470,9 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
Set<String> newQueues = new HashSet<>();
for (CSQueue newQueue : managedParentQueue.getChildQueues()) {
- if (newQueue instanceof LeafQueue) {
+ if (newQueue instanceof AbstractLeafQueue) {
for (String nodeLabel : leafQueueTemplateNodeLabels) {
- leafQueueState.createLeafQueueStateIfNotExists((LeafQueue) newQueue,
+ leafQueueState.createLeafQueueStateIfNotExists((AbstractLeafQueue) newQueue,
nodeLabel);
newPartitions.add(nodeLabel);
}
@@ -590,7 +578,7 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
if (leafQueue != null) {
if (isActive(leafQueue, nodeLabel) && !hasPendingApps(leafQueue)) {
QueueCapacities capacities = leafQueueEntitlements.getCapacityOfQueue(leafQueue);
- updateToZeroCapacity(capacities, nodeLabel, (LeafQueue)childQueue);
+ updateToZeroCapacity(capacities, nodeLabel, (AbstractLeafQueue) childQueue);
deactivatedQueues.put(leafQueue.getQueuePath(), leafQueueTemplateCapacities);
}
} else {
@@ -780,7 +768,7 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
}
private void updateToZeroCapacity(QueueCapacities capacities,
- String nodeLabel, LeafQueue leafQueue) {
+ String nodeLabel, AbstractLeafQueue leafQueue) {
capacities.setCapacity(nodeLabel, 0.0f);
capacities.setMaximumCapacity(nodeLabel,
leafQueueTemplateCapacities.getMaximumCapacity(nodeLabel));
@@ -801,7 +789,7 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
}
@VisibleForTesting
- LeafQueueStatePerPartition getLeafQueueState(LeafQueue queue,
+ LeafQueueStatePerPartition getLeafQueueState(AbstractLeafQueue queue,
String partition) throws SchedulerDynamicEditException {
readLock.lock();
try {
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
index 011a254..3a0fd34 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java
@@ -66,11 +66,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerCha
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAMContainerLaunchDiagnosticsConstants;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSAssignment;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityHeadroomProvider;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AbstractContainerAllocator;
@@ -958,8 +958,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
}
}
- public LeafQueue getCSLeafQueue() {
- return (LeafQueue)queue;
+ public AbstractLeafQueue getCSLeafQueue() {
+ return (AbstractLeafQueue)queue;
}
public CSAssignment assignContainers(Resource clusterResource,
@@ -996,7 +996,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
protected void getPendingAppDiagnosticMessage(
StringBuilder diagnosticMessage) {
- LeafQueue queue = getCSLeafQueue();
+ AbstractLeafQueue queue = getCSLeafQueue();
diagnosticMessage.append(" Details : AM Partition = ")
.append(appAMNodePartitionName.isEmpty()
? NodeLabel.DEFAULT_NODE_LABEL_PARTITION : appAMNodePartitionName)
@@ -1019,7 +1019,7 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt {
protected void getActivedAppDiagnosticMessage(
StringBuilder diagnosticMessage) {
- LeafQueue queue = getCSLeafQueue();
+ AbstractLeafQueue queue = getCSLeafQueue();
QueueCapacities queueCapacities = queue.getQueueCapacities();
QueueResourceQuotas queueResourceQuotas = queue.getQueueResourceQuotas();
diagnosticMessage.append(" Details : AM Partition = ")
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerInfo.java
index 74c7c20..3d410ec 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerInfo.java
@@ -25,10 +25,10 @@ import javax.xml.bind.annotation.XmlTransient;
import javax.xml.bind.annotation.XmlType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.helper.CapacitySchedulerInfoHelper;
@@ -179,7 +179,7 @@ public class CapacitySchedulerInfo extends SchedulerInfo {
List<CSQueue> childLeafQueues = new ArrayList<>();
List<CSQueue> childNonLeafQueues = new ArrayList<>();
for (CSQueue queue : parent.getChildQueues()) {
- if (queue instanceof LeafQueue) {
+ if (queue instanceof AbstractLeafQueue) {
childLeafQueues.add(queue);
} else {
childNonLeafQueues.add(queue);
@@ -190,8 +190,8 @@ public class CapacitySchedulerInfo extends SchedulerInfo {
for (CSQueue queue : childQueues) {
CapacitySchedulerQueueInfo info;
- if (queue instanceof LeafQueue) {
- info = new CapacitySchedulerLeafQueueInfo(cs, (LeafQueue) queue);
+ if (queue instanceof AbstractLeafQueue) {
+ info = new CapacitySchedulerLeafQueueInfo(cs, (AbstractLeafQueue) queue);
} else {
info = new CapacitySchedulerQueueInfo(cs, queue);
info.queues = getQueues(cs, queue);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
index 4e9ced8..5b1da19 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/CapacitySchedulerLeafQueueInfo.java
@@ -27,10 +27,10 @@ import javax.xml.bind.annotation.XmlTransient;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
.AutoCreatedLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueCapacities;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.UserInfo;
@@ -63,7 +63,7 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
CapacitySchedulerLeafQueueInfo() {
};
- CapacitySchedulerLeafQueueInfo(CapacityScheduler cs, LeafQueue q) {
+ CapacitySchedulerLeafQueueInfo(CapacityScheduler cs, AbstractLeafQueue q) {
super(cs, q);
numActiveApplications = q.getNumActiveApplications();
numPendingApplications = q.getNumPendingApplications();
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/helper/CapacitySchedulerInfoHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/helper/CapacitySchedulerInfoHelper.java
index 8b3602d..0ba9bbb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/helper/CapacitySchedulerInfoHelper.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/dao/helper/CapacitySchedulerInfoHelper.java
@@ -18,9 +18,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.helper;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ManagedParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AutoQueueTemplatePropertiesInfo;
@@ -82,7 +82,7 @@ public class CapacitySchedulerInfoHelper {
}
public static String getQueueType(CSQueue queue) {
- if (queue instanceof LeafQueue) {
+ if (queue instanceof AbstractLeafQueue) {
return LEAF_QUEUE;
} else if (queue instanceof ParentQueue) {
return PARENT_QUEUE;
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
index c5f45fd..52a34fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestWorkPreservingRMRestart.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicat
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
@@ -445,7 +446,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
checkCSLeafQueue(rm, app, clusterResource, queueResource, usedResource,
numContainers);
- LeafQueue queue = (LeafQueue) app.getQueue();
+ AbstractLeafQueue queue = (AbstractLeafQueue) app.getQueue();
Resource availableResources =
Resources.subtract(queueResource, usedResource);
// ************ check app headroom ****************
@@ -470,7 +471,7 @@ public class TestWorkPreservingRMRestart extends ParameterizedSchedulerTestBase
SchedulerApplication<SchedulerApplicationAttempt> app,
Resource clusterResource, Resource queueResource, Resource usedResource,
int numContainers) {
- LeafQueue leafQueue = (LeafQueue) app.getQueue();
+ AbstractLeafQueue leafQueue = (AbstractLeafQueue) app.getQueue();
// assert queue used resources.
assertEquals(usedResource, leafQueue.getUsedResources());
assertEquals(numContainers, leafQueue.getNumContainers());
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSMaxRunningAppsEnforcer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSMaxRunningAppsEnforcer.java
index 43347c7..b560d97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSMaxRunningAppsEnforcer.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSMaxRunningAppsEnforcer.java
@@ -139,7 +139,7 @@ public class TestCSMaxRunningAppsEnforcer {
}
private void removeApp(FiCaSchedulerApp attempt) {
- LeafQueue queue = attempt.getCSLeafQueue();
+ AbstractLeafQueue queue = attempt.getCSLeafQueue();
queue.finishApplicationAttempt(attempt, queue.getQueuePath());
maxAppsEnforcer.untrackApp(attempt);
maxAppsEnforcer.updateRunnabilityOnAppRemoval(attempt);
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java
index 3d40ccf..b5eaf3c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java
@@ -685,7 +685,7 @@ public class TestCapacitySchedulerNewQueueAutoCreation
"root.a.*") + "capacity", "6w");
cs.reinitialize(csConf, mockRM.getRMContext());
- LeafQueue a2 = createQueue("root.a.a-auto.a2");
+ AbstractLeafQueue a2 = createQueue("root.a.a-auto.a2");
Assert.assertEquals("weight is not set by template", 6f,
a2.getQueueCapacities().getWeight(), 1e-6);
Assert.assertEquals("user limit factor should be disabled with dynamic queues",
@@ -719,7 +719,7 @@ public class TestCapacitySchedulerNewQueueAutoCreation
"root.a") + CapacitySchedulerConfiguration
.AUTO_CREATE_CHILD_QUEUE_AUTO_REMOVAL_ENABLE, false);
cs.reinitialize(csConf, mockRM.getRMContext());
- LeafQueue a3 = createQueue("root.a.a3");
+ AbstractLeafQueue a3 = createQueue("root.a.a3");
Assert.assertFalse("auto queue deletion should be turned off on a3",
a3.isEligibleForAutoDeletion());
@@ -729,27 +729,26 @@ public class TestCapacitySchedulerNewQueueAutoCreation
csConf.setQueues("root", new String[]{"a", "b", "c"});
csConf.setAutoQueueCreationV2Enabled("root.c", true);
cs.reinitialize(csConf, mockRM.getRMContext());
- LeafQueue c1 = createQueue("root.c.c1");
+ AbstractLeafQueue c1 = createQueue("root.c.c1");
Assert.assertEquals("weight is not set for label TEST", 6f,
c1.getQueueCapacities().getWeight("TEST"), 1e-6);
cs.reinitialize(csConf, mockRM.getRMContext());
- c1 = (LeafQueue) cs.getQueue("root.c.c1");
+ c1 = (AbstractLeafQueue) cs.getQueue("root.c.c1");
Assert.assertEquals("weight is not set for label TEST", 6f,
c1.getQueueCapacities().getWeight("TEST"), 1e-6);
-
}
@Test
public void testAutoCreatedQueueConfigChange() throws Exception {
startScheduler();
- LeafQueue a2 = createQueue("root.a.a-auto.a2");
+ AbstractLeafQueue a2 = createQueue("root.a.a-auto.a2");
csConf.setNonLabeledQueueWeight("root.a.a-auto.a2", 4f);
cs.reinitialize(csConf, mockRM.getRMContext());
Assert.assertEquals("weight is not explicitly set", 4f,
a2.getQueueCapacities().getWeight(), 1e-6);
- a2 = (LeafQueue) cs.getQueue("root.a.a-auto.a2");
+ a2 = (AbstractLeafQueue) cs.getQueue("root.a.a-auto.a2");
csConf.setState("root.a.a-auto.a2", QueueState.STOPPED);
cs.reinitialize(csConf, mockRM.getRMContext());
Assert.assertEquals("root.a.a-auto.a2 has not been stopped",
@@ -1223,7 +1222,7 @@ public class TestCapacitySchedulerNewQueueAutoCreation
Assert.assertNull("root.e.e1-auto should have been removed", eAuto);
}
- protected LeafQueue createQueue(String queuePath) throws YarnException,
+ protected AbstractLeafQueue createQueue(String queuePath) throws YarnException,
IOException {
return autoQueueHandler.createQueue(new QueuePath(queuePath));
}
---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscribe@hadoop.apache.org
For additional commands, e-mail: common-commits-help@hadoop.apache.org