You are viewing a plain-text version of this content. The canonical link for it was a "here" hyperlink whose URL is not preserved in this plain-text rendering.
Posted to common-commits@hadoop.apache.org by ji...@apache.org on 2015/07/28 22:55:03 UTC
[24/50] [abbrv] hadoop git commit: YARN-3026. Move application-specific container allocation logic from LeafQueue to FiCaSchedulerApp. Contributed by Wangda Tan
YARN-3026. Move application-specific container allocation logic from LeafQueue to FiCaSchedulerApp. Contributed by Wangda Tan
Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/83fe34ac
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/83fe34ac
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/83fe34ac
Branch: refs/heads/YARN-1197
Commit: 83fe34ac0896cee0918bbfad7bd51231e4aec39b
Parents: fc42fa8
Author: Jian He <ji...@apache.org>
Authored: Fri Jul 24 14:00:25 2015 -0700
Committer: Jian He <ji...@apache.org>
Committed: Fri Jul 24 14:00:25 2015 -0700
----------------------------------------------------------------------
hadoop-yarn-project/CHANGES.txt | 3 +
.../server/resourcemanager/RMContextImpl.java | 3 +-
.../scheduler/ResourceLimits.java | 19 +-
.../scheduler/capacity/AbstractCSQueue.java | 27 +-
.../scheduler/capacity/CSAssignment.java | 12 +-
.../capacity/CapacityHeadroomProvider.java | 16 +-
.../scheduler/capacity/CapacityScheduler.java | 14 -
.../scheduler/capacity/LeafQueue.java | 833 +++----------------
.../scheduler/capacity/ParentQueue.java | 16 +-
.../scheduler/common/fica/FiCaSchedulerApp.java | 721 +++++++++++++++-
.../capacity/TestApplicationLimits.java | 15 +-
.../capacity/TestCapacityScheduler.java | 3 +-
.../capacity/TestContainerAllocation.java | 85 +-
.../scheduler/capacity/TestLeafQueue.java | 191 +----
.../scheduler/capacity/TestReservations.java | 111 +--
.../scheduler/capacity/TestUtils.java | 25 +-
16 files changed, 1048 insertions(+), 1046 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/hadoop/blob/83fe34ac/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index d1546b2..cf00fe5 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -345,6 +345,9 @@ Release 2.8.0 - UNRELEASED
YARN-3844. Make hadoop-yarn-project Native code -Wall-clean (Alan Burlison
via Colin P. McCabe)
+ YARN-3026. Move application-specific container allocation logic from
+ LeafQueue to FiCaSchedulerApp. (Wangda Tan via jianhe)
+
OPTIMIZATIONS
YARN-3339. TestDockerContainerExecutor should pull a single image and not
http://git-wip-us.apache.org/repos/asf/hadoop/blob/83fe34ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
index 2f9209c..8cadc3b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMContextImpl.java
@@ -292,7 +292,8 @@ public class RMContextImpl implements RMContext {
activeServiceContext.setNMTokenSecretManager(nmTokenSecretManager);
}
- void setScheduler(ResourceScheduler scheduler) {
+ @VisibleForTesting
+ public void setScheduler(ResourceScheduler scheduler) {
activeServiceContext.setScheduler(scheduler);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/83fe34ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
index 8074794..c545e9e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java
@@ -26,20 +26,25 @@ import org.apache.hadoop.yarn.util.resource.Resources;
* that, it's not "extra") resource you can get.
*/
public class ResourceLimits {
- volatile Resource limit;
+ private volatile Resource limit;
// This is special limit that goes with the RESERVE_CONT_LOOK_ALL_NODES
// config. This limit indicates how much we need to unreserve to allocate
// another container.
private volatile Resource amountNeededUnreserve;
+ // How much resource you can use for next allocation, if this isn't enough for
+ // next container allocation, you may need to consider unreserve some
+ // containers.
+ private volatile Resource headroom;
+
public ResourceLimits(Resource limit) {
- this.amountNeededUnreserve = Resources.none();
- this.limit = limit;
+ this(limit, Resources.none());
}
public ResourceLimits(Resource limit, Resource amountNeededUnreserve) {
this.amountNeededUnreserve = amountNeededUnreserve;
+ this.headroom = limit;
this.limit = limit;
}
@@ -47,6 +52,14 @@ public class ResourceLimits {
return limit;
}
+ public Resource getHeadroom() {
+ return headroom;
+ }
+
+ public void setHeadroom(Resource headroom) {
+ this.headroom = headroom;
+ }
+
public Resource getAmountNeededUnreserve() {
return amountNeededUnreserve;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/83fe34ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 7f8e164..dcc4205 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -65,7 +65,7 @@ public abstract class AbstractCSQueue implements CSQueue {
volatile int numContainers;
final Resource minimumAllocation;
- Resource maximumAllocation;
+ volatile Resource maximumAllocation;
QueueState state;
final CSQueueMetrics metrics;
protected final PrivilegedEntity queueEntity;
@@ -77,7 +77,7 @@ public abstract class AbstractCSQueue implements CSQueue {
Map<AccessType, AccessControlList> acls =
new HashMap<AccessType, AccessControlList>();
- boolean reservationsContinueLooking;
+ volatile boolean reservationsContinueLooking;
private boolean preemptionDisabled;
// Track resource usage-by-label like used-resource/pending-resource, etc.
@@ -333,7 +333,7 @@ public abstract class AbstractCSQueue implements CSQueue {
}
@Private
- public synchronized Resource getMaximumAllocation() {
+ public Resource getMaximumAllocation() {
return maximumAllocation;
}
@@ -448,13 +448,8 @@ public abstract class AbstractCSQueue implements CSQueue {
}
synchronized boolean canAssignToThisQueue(Resource clusterResource,
- String nodePartition, ResourceLimits currentResourceLimits,
- Resource nowRequired, Resource resourceCouldBeUnreserved,
+ String nodePartition, ResourceLimits currentResourceLimits, Resource resourceCouldBeUnreserved,
SchedulingMode schedulingMode) {
- // New total resource = used + required
- Resource newTotalResource =
- Resources.add(queueUsage.getUsed(nodePartition), nowRequired);
-
// Get current limited resource:
// - When doing RESPECT_PARTITION_EXCLUSIVITY allocation, we will respect
// queues' max capacity.
@@ -470,8 +465,14 @@ public abstract class AbstractCSQueue implements CSQueue {
getCurrentLimitResource(nodePartition, clusterResource,
currentResourceLimits, schedulingMode);
- if (Resources.greaterThan(resourceCalculator, clusterResource,
- newTotalResource, currentLimitResource)) {
+ Resource nowTotalUsed = queueUsage.getUsed(nodePartition);
+
+ // Set headroom for currentResourceLimits
+ currentResourceLimits.setHeadroom(Resources.subtract(currentLimitResource,
+ nowTotalUsed));
+
+ if (Resources.greaterThanOrEqual(resourceCalculator, clusterResource,
+ nowTotalUsed, currentLimitResource)) {
// if reservation continous looking enabled, check to see if could we
// potentially use this node instead of a reserved node if the application
@@ -483,7 +484,7 @@ public abstract class AbstractCSQueue implements CSQueue {
resourceCouldBeUnreserved, Resources.none())) {
// resource-without-reserved = used - reserved
Resource newTotalWithoutReservedResource =
- Resources.subtract(newTotalResource, resourceCouldBeUnreserved);
+ Resources.subtract(nowTotalUsed, resourceCouldBeUnreserved);
// when total-used-without-reserved-resource < currentLimit, we still
// have chance to allocate on this node by unreserving some containers
@@ -498,8 +499,6 @@ public abstract class AbstractCSQueue implements CSQueue {
+ newTotalWithoutReservedResource + ", maxLimitCapacity: "
+ currentLimitResource);
}
- currentResourceLimits.setAmountNeededUnreserve(Resources.subtract(newTotalResource,
- currentLimitResource));
return true;
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/83fe34ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
index 2ba2709..ceb6f7e 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
@@ -31,8 +31,8 @@ public class CSAssignment {
final private Resource resource;
private NodeType type;
- private final RMContainer excessReservation;
- private final FiCaSchedulerApp application;
+ private RMContainer excessReservation;
+ private FiCaSchedulerApp application;
private final boolean skipped;
private boolean fulfilledReservation;
private final AssignmentInformation assignmentInformation;
@@ -80,10 +80,18 @@ public class CSAssignment {
return application;
}
+ public void setApplication(FiCaSchedulerApp application) {
+ this.application = application;
+ }
+
public RMContainer getExcessReservation() {
return excessReservation;
}
+ public void setExcessReservation(RMContainer rmContainer) {
+ excessReservation = rmContainer;
+ }
+
public boolean getSkipped() {
return skipped;
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/83fe34ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
index c6524c6..a3adf9a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityHeadroomProvider.java
@@ -25,22 +25,16 @@ public class CapacityHeadroomProvider {
LeafQueue.User user;
LeafQueue queue;
FiCaSchedulerApp application;
- Resource required;
LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo;
- public CapacityHeadroomProvider(
- LeafQueue.User user,
- LeafQueue queue,
- FiCaSchedulerApp application,
- Resource required,
- LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo) {
-
+ public CapacityHeadroomProvider(LeafQueue.User user, LeafQueue queue,
+ FiCaSchedulerApp application,
+ LeafQueue.QueueResourceLimitsInfo queueResourceLimitsInfo) {
+
this.user = user;
this.queue = queue;
this.application = application;
- this.required = required;
this.queueResourceLimitsInfo = queueResourceLimitsInfo;
-
}
public Resource getHeadroom() {
@@ -52,7 +46,7 @@ public class CapacityHeadroomProvider {
clusterResource = queueResourceLimitsInfo.getClusterResource();
}
Resource headroom = queue.getHeadroom(user, queueCurrentLimit,
- clusterResource, application, required);
+ clusterResource, application);
// Corner case to deal with applications being slightly over-limit
if (headroom.getMemory() < 0) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/83fe34ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index 5a20f8b..68e608a 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -1178,16 +1178,6 @@ public class CapacityScheduler extends
updateSchedulerHealth(lastNodeUpdateTime, node, tmp);
schedulerHealth.updateSchedulerFulfilledReservationCounts(1);
}
-
- RMContainer excessReservation = assignment.getExcessReservation();
- if (excessReservation != null) {
- Container container = excessReservation.getContainer();
- queue.completedContainer(clusterResource, assignment.getApplication(),
- node, excessReservation, SchedulerUtils
- .createAbnormalContainerStatus(container.getId(),
- SchedulerUtils.UNRESERVED_CONTAINER),
- RMContainerEventType.RELEASED, null, true);
- }
}
// Try to schedule more if there are no reservations to fulfill
@@ -1241,10 +1231,6 @@ public class CapacityScheduler extends
RMNodeLabelsManager.NO_LABEL, clusterResource)),
SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY);
updateSchedulerHealth(lastNodeUpdateTime, node, assignment);
- if (Resources.greaterThan(calculator, clusterResource,
- assignment.getResource(), Resources.none())) {
- return;
- }
}
} else {
LOG.info("Skipping scheduling since node "
http://git-wip-us.apache.org/repos/asf/hadoop/blob/83fe34ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
index 5c283f4..acfbad0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java
@@ -31,7 +31,6 @@ import java.util.Set;
import java.util.TreeSet;
import org.apache.commons.lang.StringUtils;
-import org.apache.commons.lang.mutable.MutableObject;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@@ -42,30 +41,24 @@ import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
-import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
-import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.apache.hadoop.yarn.api.records.QueueInfo;
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
import org.apache.hadoop.yarn.api.records.Resource;
-import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.factories.RecordFactory;
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
@@ -93,7 +86,7 @@ public class LeafQueue extends AbstractCSQueue {
private float maxAMResourcePerQueuePercent;
- private int nodeLocalityDelay;
+ private volatile int nodeLocalityDelay;
Map<ApplicationAttemptId, FiCaSchedulerApp> applicationAttemptMap =
new HashMap<ApplicationAttemptId, FiCaSchedulerApp>();
@@ -102,7 +95,7 @@ public class LeafQueue extends AbstractCSQueue {
Set<FiCaSchedulerApp> pendingApplications;
- private float minimumAllocationFactor;
+ private volatile float minimumAllocationFactor;
private Map<String, User> users = new HashMap<String, User>();
@@ -400,11 +393,6 @@ public class LeafQueue extends AbstractCSQueue {
return Collections.singletonList(userAclInfo);
}
- @Private
- public int getNodeLocalityDelay() {
- return nodeLocalityDelay;
- }
-
public String toString() {
return queueName + ": " +
"capacity=" + queueCapacities.getCapacity() + ", " +
@@ -745,39 +733,57 @@ public class LeafQueue extends AbstractCSQueue {
return applicationAttemptMap.get(applicationAttemptId);
}
+ private void handleExcessReservedContainer(Resource clusterResource,
+ CSAssignment assignment) {
+ if (assignment.getExcessReservation() != null) {
+ RMContainer excessReservedContainer = assignment.getExcessReservation();
+
+ completedContainer(clusterResource, assignment.getApplication(),
+ scheduler.getNode(excessReservedContainer.getAllocatedNode()),
+ excessReservedContainer,
+ SchedulerUtils.createAbnormalContainerStatus(
+ excessReservedContainer.getContainerId(),
+ SchedulerUtils.UNRESERVED_CONTAINER),
+ RMContainerEventType.RELEASED, null, false);
+
+ assignment.setExcessReservation(null);
+ }
+ }
+
@Override
public synchronized CSAssignment assignContainers(Resource clusterResource,
FiCaSchedulerNode node, ResourceLimits currentResourceLimits,
SchedulingMode schedulingMode) {
updateCurrentResourceLimits(currentResourceLimits, clusterResource);
-
- if(LOG.isDebugEnabled()) {
+
+ if (LOG.isDebugEnabled()) {
LOG.debug("assignContainers: node=" + node.getNodeName()
- + " #applications=" +
- orderingPolicy.getNumSchedulableEntities());
+ + " #applications=" + orderingPolicy.getNumSchedulableEntities());
}
-
+
// Check for reserved resources
RMContainer reservedContainer = node.getReservedContainer();
if (reservedContainer != null) {
- FiCaSchedulerApp application =
+ FiCaSchedulerApp application =
getApplication(reservedContainer.getApplicationAttemptId());
synchronized (application) {
- return assignReservedContainer(application, node, reservedContainer,
+ CSAssignment assignment = application.assignReservedContainer(node, reservedContainer,
clusterResource, schedulingMode);
+ handleExcessReservedContainer(clusterResource, assignment);
+ return assignment;
}
}
-
+
// if our queue cannot access this node, just return
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY
&& !accessibleToPartition(node.getPartition())) {
return NULL_ASSIGNMENT;
}
-
+
// Check if this queue need more resource, simply skip allocation if this
// queue doesn't need more resources.
- if (!hasPendingResourceRequest(node.getPartition(),
- clusterResource, schedulingMode)) {
+ if (!hasPendingResourceRequest(node.getPartition(), clusterResource,
+ schedulingMode)) {
if (LOG.isDebugEnabled()) {
LOG.debug("Skip this queue=" + getQueuePath()
+ ", because it doesn't need more resource, schedulingMode="
@@ -785,233 +791,74 @@ public class LeafQueue extends AbstractCSQueue {
}
return NULL_ASSIGNMENT;
}
-
+
for (Iterator<FiCaSchedulerApp> assignmentIterator =
- orderingPolicy.getAssignmentIterator();
- assignmentIterator.hasNext();) {
+ orderingPolicy.getAssignmentIterator(); assignmentIterator.hasNext();) {
FiCaSchedulerApp application = assignmentIterator.next();
- if(LOG.isDebugEnabled()) {
- LOG.debug("pre-assignContainers for application "
- + application.getApplicationId());
- application.showRequests();
+
+ // Check queue max-capacity limit
+ if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
+ currentResourceLimits, application.getCurrentReservation(),
+ schedulingMode)) {
+ return NULL_ASSIGNMENT;
}
- // Check if application needs more resource, skip if it doesn't need more.
- if (!application.hasPendingResourceRequest(resourceCalculator,
- node.getPartition(), clusterResource, schedulingMode)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Skip app_attempt=" + application.getApplicationAttemptId()
- + ", because it doesn't need more resource, schedulingMode="
- + schedulingMode.name() + " node-label=" + node.getPartition());
- }
+ Resource userLimit =
+ computeUserLimitAndSetHeadroom(application, clusterResource,
+ node.getPartition(), schedulingMode);
+
+ // Check user limit
+ if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
+ application, node.getPartition(), currentResourceLimits)) {
continue;
}
- synchronized (application) {
- // Check if this resource is on the blacklist
- if (SchedulerAppUtils.isBlacklisted(application, node, LOG)) {
- continue;
- }
-
- // Schedule in priority order
- for (Priority priority : application.getPriorities()) {
- ResourceRequest anyRequest =
- application.getResourceRequest(priority, ResourceRequest.ANY);
- if (null == anyRequest) {
- continue;
- }
-
- // Required resource
- Resource required = anyRequest.getCapability();
+ // Try to schedule
+ CSAssignment assignment =
+ application.assignContainers(clusterResource, node,
+ currentResourceLimits, schedulingMode);
- // Do we need containers at this 'priority'?
- if (application.getTotalRequiredResources(priority) <= 0) {
- continue;
- }
-
- // AM container allocation doesn't support non-exclusive allocation to
- // avoid painful of preempt an AM container
- if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
- RMAppAttempt rmAppAttempt =
- csContext.getRMContext().getRMApps()
- .get(application.getApplicationId()).getCurrentAppAttempt();
- if (rmAppAttempt.getSubmissionContext().getUnmanagedAM() == false
- && null == rmAppAttempt.getMasterContainer()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Skip allocating AM container to app_attempt="
- + application.getApplicationAttemptId()
- + ", don't allow to allocate AM container in non-exclusive mode");
- }
- break;
- }
- }
-
- // Is the node-label-expression of this offswitch resource request
- // matches the node's label?
- // If not match, jump to next priority.
- if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(
- anyRequest, node.getPartition(), schedulingMode)) {
- continue;
- }
-
- if (!this.reservationsContinueLooking) {
- if (!shouldAllocOrReserveNewContainer(application, priority, required)) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("doesn't need containers based on reservation algo!");
- }
- continue;
- }
- }
-
- // Compute user-limit & set headroom
- // Note: We compute both user-limit & headroom with the highest
- // priority request as the target.
- // This works since we never assign lower priority requests
- // before all higher priority ones are serviced.
- Resource userLimit =
- computeUserLimitAndSetHeadroom(application, clusterResource,
- required, node.getPartition(), schedulingMode);
-
- // Check queue max-capacity limit
- if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
- currentResourceLimits, required,
- application.getCurrentReservation(), schedulingMode)) {
- return NULL_ASSIGNMENT;
- }
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("post-assignContainers for application "
+ + application.getApplicationId());
+ application.showRequests();
+ }
- // Check user limit
- if (!canAssignToUser(clusterResource, application.getUser(), userLimit,
- application, node.getPartition(), currentResourceLimits)) {
- break;
- }
+ // Did we schedule or reserve a container?
+ Resource assigned = assignment.getResource();
+
+ handleExcessReservedContainer(clusterResource, assignment);
- // Inform the application it is about to get a scheduling opportunity
- application.addSchedulingOpportunity(priority);
-
- // Increase missed-non-partitioned-resource-request-opportunity.
- // This is to make sure non-partitioned-resource-request will prefer
- // to be allocated to non-partitioned nodes
- int missedNonPartitionedRequestSchedulingOpportunity = 0;
- if (anyRequest.getNodeLabelExpression().equals(
- RMNodeLabelsManager.NO_LABEL)) {
- missedNonPartitionedRequestSchedulingOpportunity =
- application
- .addMissedNonPartitionedRequestSchedulingOpportunity(priority);
- }
-
- if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
- // Before doing allocation, we need to check scheduling opportunity to
- // make sure : non-partitioned resource request should be scheduled to
- // non-partitioned partition first.
- if (missedNonPartitionedRequestSchedulingOpportunity < scheduler
- .getNumClusterNodes()) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Skip app_attempt="
- + application.getApplicationAttemptId()
- + " priority="
- + priority
- + " because missed-non-partitioned-resource-request"
- + " opportunity under requred:"
- + " Now=" + missedNonPartitionedRequestSchedulingOpportunity
- + " required="
- + scheduler.getNumClusterNodes());
- }
-
- break;
- }
- }
-
- // Try to schedule
- CSAssignment assignment =
- assignContainersOnNode(clusterResource, node, application, priority,
- null, schedulingMode, currentResourceLimits);
-
- // Did the application skip this node?
- if (assignment.getSkipped()) {
- // Don't count 'skipped nodes' as a scheduling opportunity!
- application.subtractSchedulingOpportunity(priority);
- continue;
- }
-
- // Did we schedule or reserve a container?
- Resource assigned = assignment.getResource();
- if (Resources.greaterThan(
- resourceCalculator, clusterResource, assigned, Resources.none())) {
- // Get reserved or allocated container from application
- RMContainer reservedOrAllocatedRMContainer =
- application.getRMContainer(assignment
- .getAssignmentInformation()
- .getFirstAllocatedOrReservedContainerId());
-
- // Book-keeping
- // Note: Update headroom to account for current allocation too...
- allocateResource(clusterResource, application, assigned,
- node.getPartition(), reservedOrAllocatedRMContainer);
-
- // Don't reset scheduling opportunities for offswitch assignments
- // otherwise the app will be delayed for each non-local assignment.
- // This helps apps with many off-cluster requests schedule faster.
- if (assignment.getType() != NodeType.OFF_SWITCH) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Resetting scheduling opportunities");
- }
- application.resetSchedulingOpportunities(priority);
- }
- // Non-exclusive scheduling opportunity is different: we need reset
- // it every time to make sure non-labeled resource request will be
- // most likely allocated on non-labeled nodes first.
- application.resetMissedNonPartitionedRequestSchedulingOpportunity(priority);
-
- // Done
- return assignment;
- } else {
- // Do not assign out of order w.r.t priorities
- break;
- }
- }
- }
+ if (Resources.greaterThan(resourceCalculator, clusterResource, assigned,
+ Resources.none())) {
+ // Get reserved or allocated container from application
+ RMContainer reservedOrAllocatedRMContainer =
+ application.getRMContainer(assignment.getAssignmentInformation()
+ .getFirstAllocatedOrReservedContainerId());
- if(LOG.isDebugEnabled()) {
- LOG.debug("post-assignContainers for application "
- + application.getApplicationId());
+ // Book-keeping
+ // Note: Update headroom to account for current allocation too...
+ allocateResource(clusterResource, application, assigned,
+ node.getPartition(), reservedOrAllocatedRMContainer);
+
+ // Done
+ return assignment;
+ } else if (!assignment.getSkipped()) {
+ // If we don't allocate anything, and it is not skipped by application,
+ // we will return to respect FIFO of applications
+ return NULL_ASSIGNMENT;
}
- application.showRequests();
}
-
- return NULL_ASSIGNMENT;
+ return NULL_ASSIGNMENT;
}
- private synchronized CSAssignment assignReservedContainer(
- FiCaSchedulerApp application, FiCaSchedulerNode node,
- RMContainer rmContainer, Resource clusterResource,
- SchedulingMode schedulingMode) {
- // Do we still need this reservation?
- Priority priority = rmContainer.getReservedPriority();
- if (application.getTotalRequiredResources(priority) == 0) {
- // Release
- return new CSAssignment(application, rmContainer);
- }
-
- // Try to assign if we have sufficient resources
- CSAssignment tmp =
- assignContainersOnNode(clusterResource, node, application, priority,
- rmContainer, schedulingMode, new ResourceLimits(Resources.none()));
-
- // Doesn't matter... since it's already charged for at time of reservation
- // "re-reservation" is *free*
- CSAssignment ret = new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
- if (tmp.getAssignmentInformation().getNumAllocations() > 0) {
- ret.setFulfilledReservation(true);
- }
- return ret;
- }
-
protected Resource getHeadroom(User user, Resource queueCurrentLimit,
- Resource clusterResource, FiCaSchedulerApp application, Resource required) {
+ Resource clusterResource, FiCaSchedulerApp application) {
return getHeadroom(user, queueCurrentLimit, clusterResource,
- computeUserLimit(application, clusterResource, required, user,
- RMNodeLabelsManager.NO_LABEL, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
+ computeUserLimit(application, clusterResource, user,
+ RMNodeLabelsManager.NO_LABEL,
+ SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY));
}
private Resource getHeadroom(User user, Resource currentResourceLimit,
@@ -1055,7 +902,7 @@ public class LeafQueue extends AbstractCSQueue {
@Lock({LeafQueue.class, FiCaSchedulerApp.class})
Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application,
- Resource clusterResource, Resource required, String nodePartition,
+ Resource clusterResource, String nodePartition,
SchedulingMode schedulingMode) {
String user = application.getUser();
User queueUser = getUser(user);
@@ -1063,8 +910,8 @@ public class LeafQueue extends AbstractCSQueue {
// Compute user limit respect requested labels,
// TODO, need consider headroom respect labels also
Resource userLimit =
- computeUserLimit(application, clusterResource, required,
- queueUser, nodePartition, schedulingMode);
+ computeUserLimit(application, clusterResource, queueUser,
+ nodePartition, schedulingMode);
setQueueResourceLimitsInfo(clusterResource);
@@ -1081,7 +928,7 @@ public class LeafQueue extends AbstractCSQueue {
}
CapacityHeadroomProvider headroomProvider = new CapacityHeadroomProvider(
- queueUser, this, application, required, queueResourceLimitsInfo);
+ queueUser, this, application, queueResourceLimitsInfo);
application.setHeadroomProvider(headroomProvider);
@@ -1091,8 +938,13 @@ public class LeafQueue extends AbstractCSQueue {
}
@Lock(NoLock.class)
+ public int getNodeLocalityDelay() {
+ return nodeLocalityDelay;
+ }
+
+ @Lock(NoLock.class)
private Resource computeUserLimit(FiCaSchedulerApp application,
- Resource clusterResource, Resource required, User user,
+ Resource clusterResource, User user,
String nodePartition, SchedulingMode schedulingMode) {
// What is our current capacity?
// * It is equal to the max(required, queue-capacity) if
@@ -1106,6 +958,11 @@ public class LeafQueue extends AbstractCSQueue {
queueCapacities.getAbsoluteCapacity(nodePartition),
minimumAllocation);
+ // Assume we have required resource equals to minimumAllocation, this can
+ // make sure user limit can continuously increase till queueMaxResource
+ // reached.
+ Resource required = minimumAllocation;
+
// Allow progress for queues with miniscule capacity
queueCapacity =
Resources.max(
@@ -1206,8 +1063,8 @@ public class LeafQueue extends AbstractCSQueue {
if (Resources.lessThanOrEqual(
resourceCalculator,
clusterResource,
- Resources.subtract(user.getUsed(),application.getCurrentReservation()),
- limit)) {
+ Resources.subtract(user.getUsed(),
+ application.getCurrentReservation()), limit)) {
if (LOG.isDebugEnabled()) {
LOG.debug("User " + userName + " in queue " + getQueueName()
@@ -1215,13 +1072,11 @@ public class LeafQueue extends AbstractCSQueue {
+ user.getUsed() + " reserved: "
+ application.getCurrentReservation() + " limit: " + limit);
}
- Resource amountNeededToUnreserve = Resources.subtract(user.getUsed(nodePartition), limit);
- // we can only acquire a new container if we unreserve first since we ignored the
- // user limit. Choose the max of user limit or what was previously set by max
- // capacity.
- currentResoureLimits.setAmountNeededUnreserve(
- Resources.max(resourceCalculator, clusterResource,
- currentResoureLimits.getAmountNeededUnreserve(), amountNeededToUnreserve));
+ Resource amountNeededToUnreserve =
+ Resources.subtract(user.getUsed(nodePartition), limit);
+ // we can only acquire a new container if we unreserve first to
+ // respect user-limit
+ currentResoureLimits.setAmountNeededUnreserve(amountNeededToUnreserve);
return true;
}
}
@@ -1235,476 +1090,6 @@ public class LeafQueue extends AbstractCSQueue {
return true;
}
- boolean shouldAllocOrReserveNewContainer(FiCaSchedulerApp application,
- Priority priority, Resource required) {
- int requiredContainers = application.getTotalRequiredResources(priority);
- int reservedContainers = application.getNumReservedContainers(priority);
- int starvation = 0;
- if (reservedContainers > 0) {
- float nodeFactor =
- Resources.ratio(
- resourceCalculator, required, getMaximumAllocation()
- );
-
- // Use percentage of node required to bias against large containers...
- // Protect against corner case where you need the whole node with
- // Math.min(nodeFactor, minimumAllocationFactor)
- starvation =
- (int)((application.getReReservations(priority) / (float)reservedContainers) *
- (1.0f - (Math.min(nodeFactor, getMinimumAllocationFactor())))
- );
-
- if (LOG.isDebugEnabled()) {
- LOG.debug("needsContainers:" +
- " app.#re-reserve=" + application.getReReservations(priority) +
- " reserved=" + reservedContainers +
- " nodeFactor=" + nodeFactor +
- " minAllocFactor=" + getMinimumAllocationFactor() +
- " starvation=" + starvation);
- }
- }
- return (((starvation + requiredContainers) - reservedContainers) > 0);
- }
-
- private CSAssignment assignContainersOnNode(Resource clusterResource,
- FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
- RMContainer reservedContainer, SchedulingMode schedulingMode,
- ResourceLimits currentResoureLimits) {
-
- CSAssignment assigned;
-
- NodeType requestType = null;
- MutableObject allocatedContainer = new MutableObject();
- // Data-local
- ResourceRequest nodeLocalResourceRequest =
- application.getResourceRequest(priority, node.getNodeName());
- if (nodeLocalResourceRequest != null) {
- requestType = NodeType.NODE_LOCAL;
- assigned =
- assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest,
- node, application, priority, reservedContainer,
- allocatedContainer, schedulingMode, currentResoureLimits);
- if (Resources.greaterThan(resourceCalculator, clusterResource,
- assigned.getResource(), Resources.none())) {
-
- //update locality statistics
- if (allocatedContainer.getValue() != null) {
- application.incNumAllocatedContainers(NodeType.NODE_LOCAL,
- requestType);
- }
- assigned.setType(NodeType.NODE_LOCAL);
- return assigned;
- }
- }
-
- // Rack-local
- ResourceRequest rackLocalResourceRequest =
- application.getResourceRequest(priority, node.getRackName());
- if (rackLocalResourceRequest != null) {
- if (!rackLocalResourceRequest.getRelaxLocality()) {
- return SKIP_ASSIGNMENT;
- }
-
- if (requestType != NodeType.NODE_LOCAL) {
- requestType = NodeType.RACK_LOCAL;
- }
-
- assigned =
- assignRackLocalContainers(clusterResource, rackLocalResourceRequest,
- node, application, priority, reservedContainer,
- allocatedContainer, schedulingMode, currentResoureLimits);
- if (Resources.greaterThan(resourceCalculator, clusterResource,
- assigned.getResource(), Resources.none())) {
-
- //update locality statistics
- if (allocatedContainer.getValue() != null) {
- application.incNumAllocatedContainers(NodeType.RACK_LOCAL,
- requestType);
- }
- assigned.setType(NodeType.RACK_LOCAL);
- return assigned;
- }
- }
-
- // Off-switch
- ResourceRequest offSwitchResourceRequest =
- application.getResourceRequest(priority, ResourceRequest.ANY);
- if (offSwitchResourceRequest != null) {
- if (!offSwitchResourceRequest.getRelaxLocality()) {
- return SKIP_ASSIGNMENT;
- }
- if (requestType != NodeType.NODE_LOCAL
- && requestType != NodeType.RACK_LOCAL) {
- requestType = NodeType.OFF_SWITCH;
- }
-
- assigned =
- assignOffSwitchContainers(clusterResource, offSwitchResourceRequest,
- node, application, priority, reservedContainer,
- allocatedContainer, schedulingMode, currentResoureLimits);
-
- // update locality statistics
- if (allocatedContainer.getValue() != null) {
- application.incNumAllocatedContainers(NodeType.OFF_SWITCH, requestType);
- }
- assigned.setType(NodeType.OFF_SWITCH);
- return assigned;
- }
-
- return SKIP_ASSIGNMENT;
- }
-
- @Private
- protected boolean findNodeToUnreserve(Resource clusterResource,
- FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority,
- Resource minimumUnreservedResource) {
- // need to unreserve some other container first
- NodeId idToUnreserve =
- application.getNodeIdToUnreserve(priority, minimumUnreservedResource,
- resourceCalculator, clusterResource);
- if (idToUnreserve == null) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("checked to see if could unreserve for app but nothing "
- + "reserved that matches for this app");
- }
- return false;
- }
- FiCaSchedulerNode nodeToUnreserve = scheduler.getNode(idToUnreserve);
- if (nodeToUnreserve == null) {
- LOG.error("node to unreserve doesn't exist, nodeid: " + idToUnreserve);
- return false;
- }
- if (LOG.isDebugEnabled()) {
- LOG.debug("unreserving for app: " + application.getApplicationId()
- + " on nodeId: " + idToUnreserve
- + " in order to replace reserved application and place it on node: "
- + node.getNodeID() + " needing: " + minimumUnreservedResource);
- }
-
- // headroom
- Resources.addTo(application.getHeadroom(), nodeToUnreserve
- .getReservedContainer().getReservedResource());
-
- // Make sure to not have completedContainers sort the queues here since
- // we are already inside an iterator loop for the queues and this would
- // cause an concurrent modification exception.
- completedContainer(clusterResource, application, nodeToUnreserve,
- nodeToUnreserve.getReservedContainer(),
- SchedulerUtils.createAbnormalContainerStatus(nodeToUnreserve
- .getReservedContainer().getContainerId(),
- SchedulerUtils.UNRESERVED_CONTAINER),
- RMContainerEventType.RELEASED, null, false);
- return true;
- }
-
- private CSAssignment assignNodeLocalContainers(Resource clusterResource,
- ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node,
- FiCaSchedulerApp application, Priority priority,
- RMContainer reservedContainer, MutableObject allocatedContainer,
- SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
- if (canAssign(application, priority, node, NodeType.NODE_LOCAL,
- reservedContainer)) {
- return assignContainer(clusterResource, node, application, priority,
- nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer,
- allocatedContainer, schedulingMode, currentResoureLimits);
- }
-
- return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL);
- }
-
- private CSAssignment assignRackLocalContainers(Resource clusterResource,
- ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node,
- FiCaSchedulerApp application, Priority priority,
- RMContainer reservedContainer, MutableObject allocatedContainer,
- SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
- if (canAssign(application, priority, node, NodeType.RACK_LOCAL,
- reservedContainer)) {
- return assignContainer(clusterResource, node, application, priority,
- rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer,
- allocatedContainer, schedulingMode, currentResoureLimits);
- }
-
- return new CSAssignment(Resources.none(), NodeType.RACK_LOCAL);
- }
-
- private CSAssignment assignOffSwitchContainers(Resource clusterResource,
- ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node,
- FiCaSchedulerApp application, Priority priority,
- RMContainer reservedContainer, MutableObject allocatedContainer,
- SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) {
- if (canAssign(application, priority, node, NodeType.OFF_SWITCH,
- reservedContainer)) {
- return assignContainer(clusterResource, node, application, priority,
- offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer,
- allocatedContainer, schedulingMode, currentResoureLimits);
- }
-
- return new CSAssignment(Resources.none(), NodeType.OFF_SWITCH);
- }
-
- private int getActualNodeLocalityDelay() {
- return Math.min(scheduler.getNumClusterNodes(), getNodeLocalityDelay());
- }
-
- boolean canAssign(FiCaSchedulerApp application, Priority priority,
- FiCaSchedulerNode node, NodeType type, RMContainer reservedContainer) {
-
- // Clearly we need containers for this application...
- if (type == NodeType.OFF_SWITCH) {
- if (reservedContainer != null) {
- return true;
- }
-
- // 'Delay' off-switch
- ResourceRequest offSwitchRequest =
- application.getResourceRequest(priority, ResourceRequest.ANY);
- long missedOpportunities = application.getSchedulingOpportunities(priority);
- long requiredContainers = offSwitchRequest.getNumContainers();
-
- float localityWaitFactor =
- application.getLocalityWaitFactor(priority,
- scheduler.getNumClusterNodes());
-
- return ((requiredContainers * localityWaitFactor) < missedOpportunities);
- }
-
- // Check if we need containers on this rack
- ResourceRequest rackLocalRequest =
- application.getResourceRequest(priority, node.getRackName());
- if (rackLocalRequest == null || rackLocalRequest.getNumContainers() <= 0) {
- return false;
- }
-
- // If we are here, we do need containers on this rack for RACK_LOCAL req
- if (type == NodeType.RACK_LOCAL) {
- // 'Delay' rack-local just a little bit...
- long missedOpportunities = application.getSchedulingOpportunities(priority);
- return getActualNodeLocalityDelay() < missedOpportunities;
- }
-
- // Check if we need containers on this host
- if (type == NodeType.NODE_LOCAL) {
- // Now check if we need containers on this host...
- ResourceRequest nodeLocalRequest =
- application.getResourceRequest(priority, node.getNodeName());
- if (nodeLocalRequest != null) {
- return nodeLocalRequest.getNumContainers() > 0;
- }
- }
-
- return false;
- }
-
- private Container getContainer(RMContainer rmContainer,
- FiCaSchedulerApp application, FiCaSchedulerNode node,
- Resource capability, Priority priority) {
- return (rmContainer != null) ? rmContainer.getContainer() :
- createContainer(application, node, capability, priority);
- }
-
- Container createContainer(FiCaSchedulerApp application, FiCaSchedulerNode node,
- Resource capability, Priority priority) {
-
- NodeId nodeId = node.getRMNode().getNodeID();
- ContainerId containerId = BuilderUtils.newContainerId(application
- .getApplicationAttemptId(), application.getNewContainerId());
-
- // Create the container
- return BuilderUtils.newContainer(containerId, nodeId, node.getRMNode()
- .getHttpAddress(), capability, priority, null);
-
- }
-
-
- private CSAssignment assignContainer(Resource clusterResource, FiCaSchedulerNode node,
- FiCaSchedulerApp application, Priority priority,
- ResourceRequest request, NodeType type, RMContainer rmContainer,
- MutableObject createdContainer, SchedulingMode schedulingMode,
- ResourceLimits currentResoureLimits) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("assignContainers: node=" + node.getNodeName()
- + " application=" + application.getApplicationId()
- + " priority=" + priority.getPriority()
- + " request=" + request + " type=" + type);
- }
-
- // check if the resource request can access the label
- if (!SchedulerUtils.checkResourceRequestMatchingNodePartition(request,
- node.getPartition(), schedulingMode)) {
- // this is a reserved container, but we cannot allocate it now according
- // to label not match. This can be caused by node label changed
- // We should un-reserve this container.
- if (rmContainer != null) {
- unreserve(application, priority, node, rmContainer);
- }
- return new CSAssignment(Resources.none(), type);
- }
-
- Resource capability = request.getCapability();
- Resource available = node.getAvailableResource();
- Resource totalResource = node.getTotalResource();
-
- if (!Resources.lessThanOrEqual(resourceCalculator, clusterResource,
- capability, totalResource)) {
- LOG.warn("Node : " + node.getNodeID()
- + " does not have sufficient resource for request : " + request
- + " node total capability : " + node.getTotalResource());
- return new CSAssignment(Resources.none(), type);
- }
-
- assert Resources.greaterThan(
- resourceCalculator, clusterResource, available, Resources.none());
-
- // Create the container if necessary
- Container container =
- getContainer(rmContainer, application, node, capability, priority);
-
- // something went wrong getting/creating the container
- if (container == null) {
- LOG.warn("Couldn't get container for allocation!");
- return new CSAssignment(Resources.none(), type);
- }
-
- boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer(
- application, priority, capability);
-
- // Can we allocate a container on this node?
- int availableContainers =
- resourceCalculator.computeAvailableContainers(available, capability);
-
- boolean needToUnreserve = Resources.greaterThan(resourceCalculator,clusterResource,
- currentResoureLimits.getAmountNeededUnreserve(), Resources.none());
-
- if (availableContainers > 0) {
- // Allocate...
-
- // Did we previously reserve containers at this 'priority'?
- if (rmContainer != null) {
- unreserve(application, priority, node, rmContainer);
- } else if (this.reservationsContinueLooking && node.getLabels().isEmpty()) {
- // when reservationsContinueLooking is set, we may need to unreserve
- // some containers to meet this queue, its parents', or the users' resource limits.
- // TODO, need change here when we want to support continuous reservation
- // looking for labeled partitions.
- if (!shouldAllocOrReserveNewContainer || needToUnreserve) {
- // If we shouldn't allocate/reserve new container then we should unreserve one the same
- // size we are asking for since the currentResoureLimits.getAmountNeededUnreserve
- // could be zero. If the limit was hit then use the amount we need to unreserve to be
- // under the limit.
- Resource amountToUnreserve = capability;
- if (needToUnreserve) {
- amountToUnreserve = currentResoureLimits.getAmountNeededUnreserve();
- }
- boolean containerUnreserved =
- findNodeToUnreserve(clusterResource, node, application, priority,
- amountToUnreserve);
- // When (minimum-unreserved-resource > 0 OR we cannot allocate new/reserved
- // container (That means we *have to* unreserve some resource to
- // continue)). If we failed to unreserve some resource, we can't continue.
- if (!containerUnreserved) {
- return new CSAssignment(Resources.none(), type);
- }
- }
- }
-
- // Inform the application
- RMContainer allocatedContainer =
- application.allocate(type, node, priority, request, container);
-
- // Does the application need this resource?
- if (allocatedContainer == null) {
- return new CSAssignment(Resources.none(), type);
- }
-
- // Inform the node
- node.allocateContainer(allocatedContainer);
-
- // Inform the ordering policy
- orderingPolicy.containerAllocated(application, allocatedContainer);
-
- LOG.info("assignedContainer" +
- " application attempt=" + application.getApplicationAttemptId() +
- " container=" + container +
- " queue=" + this +
- " clusterResource=" + clusterResource);
- createdContainer.setValue(allocatedContainer);
- CSAssignment assignment = new CSAssignment(container.getResource(), type);
- assignment.getAssignmentInformation().addAllocationDetails(
- container.getId(), getQueuePath());
- assignment.getAssignmentInformation().incrAllocations();
- Resources.addTo(assignment.getAssignmentInformation().getAllocated(),
- container.getResource());
- return assignment;
- } else {
- // if we are allowed to allocate but this node doesn't have space, reserve it or
- // if this was an already a reserved container, reserve it again
- if (shouldAllocOrReserveNewContainer || rmContainer != null) {
-
- if (reservationsContinueLooking && rmContainer == null) {
- // we could possibly ignoring queue capacity or user limits when
- // reservationsContinueLooking is set. Make sure we didn't need to unreserve
- // one.
- if (needToUnreserve) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("we needed to unreserve to be able to allocate");
- }
- return new CSAssignment(Resources.none(), type);
- }
- }
-
- // Reserve by 'charging' in advance...
- reserve(application, priority, node, rmContainer, container);
-
- LOG.info("Reserved container " +
- " application=" + application.getApplicationId() +
- " resource=" + request.getCapability() +
- " queue=" + this.toString() +
- " usedCapacity=" + getUsedCapacity() +
- " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() +
- " used=" + queueUsage.getUsed() +
- " cluster=" + clusterResource);
- CSAssignment assignment =
- new CSAssignment(request.getCapability(), type);
- assignment.getAssignmentInformation().addReservationDetails(
- container.getId(), getQueuePath());
- assignment.getAssignmentInformation().incrReservations();
- Resources.addTo(assignment.getAssignmentInformation().getReserved(),
- request.getCapability());
- return assignment;
- }
- return new CSAssignment(Resources.none(), type);
- }
- }
-
- private void reserve(FiCaSchedulerApp application, Priority priority,
- FiCaSchedulerNode node, RMContainer rmContainer, Container container) {
- // Update reserved metrics if this is the first reservation
- if (rmContainer == null) {
- getMetrics().reserveResource(
- application.getUser(), container.getResource());
- }
-
- // Inform the application
- rmContainer = application.reserve(node, priority, rmContainer, container);
-
- // Update the node
- node.reserveResource(application, priority, rmContainer);
- }
-
- private boolean unreserve(FiCaSchedulerApp application, Priority priority,
- FiCaSchedulerNode node, RMContainer rmContainer) {
- // Done with the reservation?
- if (application.unreserve(node, priority)) {
- node.unreserveResource(application);
-
- // Update reserved metrics
- getMetrics().unreserveResource(application.getUser(),
- rmContainer.getContainer().getResource());
- return true;
- }
- return false;
- }
-
@Override
public void completedContainer(Resource clusterResource,
FiCaSchedulerApp application, FiCaSchedulerNode node, RMContainer rmContainer,
@@ -1724,7 +1109,7 @@ public class LeafQueue extends AbstractCSQueue {
// happen under scheduler's lock...
// So, this is, in effect, a transaction across application & node
if (rmContainer.getState() == RMContainerState.RESERVED) {
- removed = unreserve(application, rmContainer.getReservedPriority(),
+ removed = application.unreserve(rmContainer.getReservedPriority(),
node, rmContainer);
} else {
removed =
@@ -1838,15 +1223,17 @@ public class LeafQueue extends AbstractCSQueue {
// Even if ParentQueue will set limits respect child's max queue capacity,
// but when allocating reserved container, CapacityScheduler doesn't do
// this. So need cap limits by queue's max capacity here.
- this.cachedResourceLimitsForHeadroom = new ResourceLimits(currentResourceLimits.getLimit());
+ this.cachedResourceLimitsForHeadroom =
+ new ResourceLimits(currentResourceLimits.getLimit());
Resource queueMaxResource =
Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager
.getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource),
queueCapacities
.getAbsoluteMaximumCapacity(RMNodeLabelsManager.NO_LABEL),
minimumAllocation);
- this.cachedResourceLimitsForHeadroom.setLimit(Resources.min(resourceCalculator,
- clusterResource, queueMaxResource, currentResourceLimits.getLimit()));
+ this.cachedResourceLimitsForHeadroom.setLimit(Resources.min(
+ resourceCalculator, clusterResource, queueMaxResource,
+ currentResourceLimits.getLimit()));
}
@Override
@@ -1874,7 +1261,7 @@ public class LeafQueue extends AbstractCSQueue {
orderingPolicy.getSchedulableEntities()) {
synchronized (application) {
computeUserLimitAndSetHeadroom(application, clusterResource,
- Resources.none(), RMNodeLabelsManager.NO_LABEL,
+ RMNodeLabelsManager.NO_LABEL,
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY);
}
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/83fe34ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
index 5807dd1..e54b9e2 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java
@@ -73,6 +73,7 @@ public class ParentQueue extends AbstractCSQueue {
final PartitionedQueueComparator partitionQueueComparator;
volatile int numApplications;
private final CapacitySchedulerContext scheduler;
+ private boolean needToResortQueuesAtNextAllocation = false;
private final RecordFactory recordFactory =
RecordFactoryProvider.getRecordFactory(null);
@@ -411,7 +412,7 @@ public class ParentQueue extends AbstractCSQueue {
// This will also consider parent's limits and also continuous reservation
// looking
if (!super.canAssignToThisQueue(clusterResource, node.getPartition(),
- resourceLimits, minimumAllocation, Resources.createResource(
+ resourceLimits, Resources.createResource(
getMetrics().getReservedMB(), getMetrics()
.getReservedVirtualCores()), schedulingMode)) {
break;
@@ -527,6 +528,14 @@ public class ParentQueue extends AbstractCSQueue {
private Iterator<CSQueue> sortAndGetChildrenAllocationIterator(FiCaSchedulerNode node) {
if (node.getPartition().equals(RMNodeLabelsManager.NO_LABEL)) {
+ if (needToResortQueuesAtNextAllocation) {
+ // If we skipped resort queues last time, we need to re-sort queue
+ // before allocation
+ List<CSQueue> childrenList = new ArrayList<>(childQueues);
+ childQueues.clear();
+ childQueues.addAll(childrenList);
+ needToResortQueuesAtNextAllocation = false;
+ }
return childQueues.iterator();
}
@@ -644,6 +653,11 @@ public class ParentQueue extends AbstractCSQueue {
}
}
}
+
+ // If we skipped sort queue this time, we need to resort queues to make
+ // sure we allocate from least usage (or order defined by queue policy)
+ // queues.
+ needToResortQueuesAtNextAllocation = !sortQueues;
}
// Inform the parent