You are viewing a plain text version of this content. The canonical link for it was a hyperlink on the original archive page and is not preserved in this plain-text copy.
Posted to common-commits@hadoop.apache.org by wa...@apache.org on 2015/09/18 20:13:52 UTC
[32/34] hadoop git commit: YARN-1651. CapacityScheduler side changes
to support container resize. Contributed by Wangda Tan
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fcfb1ef4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
index 27d70cc..6a4efa1 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java
@@ -19,7 +19,16 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.io.IOException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.Timer;
+import java.util.TimerTask;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
@@ -37,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
import org.apache.hadoop.yarn.api.records.ContainerState;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
@@ -51,6 +61,7 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
+import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
@@ -58,13 +69,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMoveEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.annotations.VisibleForTesting;
@@ -87,7 +100,7 @@ public abstract class AbstractYarnScheduler
protected Resource clusterResource = Resource.newInstance(0, 0);
protected Resource minimumAllocation;
- private Resource maximumAllocation;
+ protected Resource maximumAllocation;
private Resource configuredMaximumAllocation;
private int maxNodeMemory = -1;
private int maxNodeVCores = -1;
@@ -231,6 +244,55 @@ public abstract class AbstractYarnScheduler
application.containerLaunchedOnNode(containerId, node.getNodeID());
}
+
+ protected synchronized void containerIncreasedOnNode(ContainerId containerId,
+ SchedulerNode node, Container increasedContainerReportedByNM) {
+ // Get the application for the finished container
+ SchedulerApplicationAttempt application =
+ getCurrentAttemptForContainer(containerId);
+ if (application == null) {
+ LOG.info("Unknown application "
+ + containerId.getApplicationAttemptId().getApplicationId()
+ + " increased container " + containerId + " on node: " + node);
+ this.rmContext.getDispatcher().getEventHandler()
+ .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
+ return;
+ }
+
+ RMContainer rmContainer = getRMContainer(containerId);
+ Resource rmContainerResource = rmContainer.getAllocatedResource();
+ Resource nmContainerResource = increasedContainerReportedByNM.getResource();
+
+
+ if (Resources.equals(nmContainerResource, rmContainerResource)){
+ // NM reported expected container size, tell RMContainer. Which will stop
+ // container expire monitor
+ rmContainer.handle(new RMContainerEvent(containerId,
+ RMContainerEventType.NM_DONE_CHANGE_RESOURCE));
+ } else if (Resources.fitsIn(getResourceCalculator(), clusterResource,
+ nmContainerResource, rmContainerResource)) {
+ // when rmContainerResource >= nmContainerResource, we won't do anything,
+ // it is possible a container increased is issued by RM, but AM hasn't
+ // told NM.
+ } else if (Resources.fitsIn(getResourceCalculator(), clusterResource,
+ rmContainerResource, nmContainerResource)) {
+ // When rmContainerResource <= nmContainerResource, it could happen when a
+ // container decreased by RM before it is increased in NM.
+
+ // Tell NM to decrease the container
+ this.rmContext.getDispatcher().getEventHandler()
+ .handle(new RMNodeDecreaseContainerEvent(node.getNodeID(),
+ Arrays.asList(rmContainer.getContainer())));
+ } else {
+ // Something wrong happened, kill the container
+ LOG.warn("Something wrong happened, container size reported by NM"
+ + " is not expected, ContainerID=" + containerId
+ + " rm-size-resource:" + rmContainerResource + " nm-size-reosurce:"
+ + nmContainerResource);
+ this.rmContext.getDispatcher().getEventHandler()
+ .handle(new RMNodeCleanContainerEvent(node.getNodeID(), containerId));
+ }
+ }
public T getApplicationAttempt(ApplicationAttemptId applicationAttemptId) {
SchedulerApplication<T> app =
@@ -511,6 +573,36 @@ public abstract class AbstractYarnScheduler
SchedulerUtils.RELEASED_CONTAINER), RMContainerEventType.RELEASED);
}
}
+
+ protected void decreaseContainers(
+ List<SchedContainerChangeRequest> decreaseRequests,
+ SchedulerApplicationAttempt attempt) {
+ for (SchedContainerChangeRequest request : decreaseRequests) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Processing decrease request:" + request);
+ }
+
+ boolean hasIncreaseRequest =
+ attempt.removeIncreaseRequest(request.getNodeId(),
+ request.getPriority(), request.getContainerId());
+
+ if (hasIncreaseRequest) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("While processing decrease request, found a increase request "
+ + "for the same container "
+ + request.getContainerId()
+ + ", removed the increase request");
+ }
+ }
+
+ // handle decrease request
+ decreaseContainer(request, attempt);
+ }
+ }
+
+ protected abstract void decreaseContainer(
+ SchedContainerChangeRequest decreaseRequest,
+ SchedulerApplicationAttempt attempt);
public SchedulerNode getSchedulerNode(NodeId nodeId) {
return nodes.get(nodeId);
@@ -735,4 +827,56 @@ public abstract class AbstractYarnScheduler
LOG.info("Updated the cluste max priority to maxClusterLevelAppPriority = "
+ maxClusterLevelAppPriority);
}
+
+ /**
+ * Normalize container increase/decrease request, and return
+ * SchedulerContainerResourceChangeRequest according to given
+ * ContainerResourceChangeRequest.
+ *
+ * <pre>
+ * - Returns non-null value means validation succeeded
+ * - Throw exception when any other error happens
+ * </pre>
+ */
+ private SchedContainerChangeRequest
+ checkAndNormalizeContainerChangeRequest(
+ ContainerResourceChangeRequest request, boolean increase)
+ throws YarnException {
+ // We have done a check in ApplicationMasterService, but RMContainer status
+ // / Node resource could change since AMS won't acquire lock of scheduler.
+ RMServerUtils.checkAndNormalizeContainerChangeRequest(rmContext, request,
+ increase);
+ ContainerId containerId = request.getContainerId();
+ RMContainer rmContainer = getRMContainer(containerId);
+ SchedulerNode schedulerNode =
+ getSchedulerNode(rmContainer.getAllocatedNode());
+
+ return new SchedContainerChangeRequest(schedulerNode, rmContainer,
+ request.getCapability());
+ }
+
+ protected List<SchedContainerChangeRequest>
+ checkAndNormalizeContainerChangeRequests(
+ List<ContainerResourceChangeRequest> changeRequests,
+ boolean increase) {
+ if (null == changeRequests || changeRequests.isEmpty()) {
+ return Collections.EMPTY_LIST;
+ }
+
+ List<SchedContainerChangeRequest> schedulerChangeRequests =
+ new ArrayList<SchedContainerChangeRequest>();
+ for (ContainerResourceChangeRequest r : changeRequests) {
+ SchedContainerChangeRequest sr = null;
+ try {
+ sr = checkAndNormalizeContainerChangeRequest(r, increase);
+ } catch (YarnException e) {
+ LOG.warn("Error happens when checking increase request, Ignoring.."
+ + " exception=", e);
+ continue;
+ }
+ schedulerChangeRequests.add(sr);
+ }
+
+ return schedulerChangeRequests;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fcfb1ef4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
index 3f2d8af..af6caad 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/Allocation.java
@@ -34,6 +34,9 @@ public class Allocation {
final Set<ContainerId> fungibleContainers;
final List<ResourceRequest> fungibleResources;
final List<NMToken> nmTokens;
+ final List<Container> increasedContainers;
+ final List<Container> decreasedContainers;
+
public Allocation(List<Container> containers, Resource resourceLimit,
Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
@@ -45,12 +48,22 @@ public class Allocation {
public Allocation(List<Container> containers, Resource resourceLimit,
Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
List<ResourceRequest> fungibleResources, List<NMToken> nmTokens) {
+ this(containers, resourceLimit,strictContainers, fungibleContainers,
+ fungibleResources, nmTokens, null, null);
+ }
+
+ public Allocation(List<Container> containers, Resource resourceLimit,
+ Set<ContainerId> strictContainers, Set<ContainerId> fungibleContainers,
+ List<ResourceRequest> fungibleResources, List<NMToken> nmTokens,
+ List<Container> increasedContainers, List<Container> decreasedContainer) {
this.containers = containers;
this.resourceLimit = resourceLimit;
this.strictContainers = strictContainers;
this.fungibleContainers = fungibleContainers;
this.fungibleResources = fungibleResources;
this.nmTokens = nmTokens;
+ this.increasedContainers = increasedContainers;
+ this.decreasedContainers = decreasedContainer;
}
public List<Container> getContainers() {
@@ -76,5 +89,12 @@ public class Allocation {
public List<NMToken> getNMTokens() {
return nmTokens;
}
-
+
+ public List<Container> getIncreasedContainers() {
+ return increasedContainers;
+ }
+
+ public List<Container> getDecreasedContainers() {
+ return decreasedContainers;
+ }
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fcfb1ef4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
index e318d47..7623da0 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java
@@ -20,10 +20,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.TreeMap;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
@@ -35,6 +37,8 @@ import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
@@ -63,8 +67,11 @@ public class AppSchedulingInfo {
final Set<Priority> priorities = new TreeSet<Priority>(
new org.apache.hadoop.yarn.server.resourcemanager.resource.Priority.Comparator());
- final Map<Priority, Map<String, ResourceRequest>> requests =
- new ConcurrentHashMap<Priority, Map<String, ResourceRequest>>();
+ final Map<Priority, Map<String, ResourceRequest>> resourceRequestMap =
+ new ConcurrentHashMap<Priority, Map<String, ResourceRequest>>();
+ final Map<NodeId, Map<Priority, Map<ContainerId,
+ SchedContainerChangeRequest>>> increaseRequestMap =
+ new ConcurrentHashMap<>();
private Set<String> userBlacklist = new HashSet<>();
private Set<String> amBlacklist = new HashSet<>();
@@ -114,13 +121,177 @@ public class AppSchedulingInfo {
*/
private synchronized void clearRequests() {
priorities.clear();
- requests.clear();
+ resourceRequestMap.clear();
LOG.info("Application " + applicationId + " requests cleared");
}
public long getNewContainerId() {
return this.containerIdCounter.incrementAndGet();
}
+
+ public boolean hasIncreaseRequest(NodeId nodeId) {
+ Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
+ increaseRequestMap.get(nodeId);
+ if (null == requestsOnNode) {
+ return false;
+ }
+ return requestsOnNode.size() > 0;
+ }
+
+ public Map<ContainerId, SchedContainerChangeRequest>
+ getIncreaseRequests(NodeId nodeId, Priority priority) {
+ Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
+ increaseRequestMap.get(nodeId);
+ if (null == requestsOnNode) {
+ return null;
+ }
+
+ return requestsOnNode.get(priority);
+ }
+
+ public synchronized boolean updateIncreaseRequests(
+ List<SchedContainerChangeRequest> increaseRequests) {
+ boolean resourceUpdated = false;
+
+ for (SchedContainerChangeRequest r : increaseRequests) {
+ NodeId nodeId = r.getRMContainer().getAllocatedNode();
+
+ Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
+ increaseRequestMap.get(nodeId);
+ if (null == requestsOnNode) {
+ requestsOnNode = new TreeMap<>();
+ increaseRequestMap.put(nodeId, requestsOnNode);
+ }
+
+ SchedContainerChangeRequest prevChangeRequest =
+ getIncreaseRequest(nodeId, r.getPriority(), r.getContainerId());
+ if (null != prevChangeRequest) {
+ if (Resources.equals(prevChangeRequest.getTargetCapacity(),
+ r.getTargetCapacity())) {
+ // New target capacity is as same as what we have, just ignore the new
+ // one
+ continue;
+ }
+
+ // remove the old one
+ removeIncreaseRequest(nodeId, prevChangeRequest.getPriority(),
+ prevChangeRequest.getContainerId());
+ }
+
+ if (Resources.equals(r.getTargetCapacity(), r.getRMContainer().getAllocatedResource())) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Trying to increase/decrease container, "
+ + "target capacity = previous capacity = " + prevChangeRequest
+ + " for container=" + r.getContainerId()
+ + ". Will ignore this increase request");
+ }
+ continue;
+ }
+
+ // add the new one
+ resourceUpdated = true;
+ insertIncreaseRequest(r);
+ }
+ return resourceUpdated;
+ }
+
+ // insert increase request and add missing hierarchy if missing
+ private void insertIncreaseRequest(SchedContainerChangeRequest request) {
+ NodeId nodeId = request.getNodeId();
+ Priority priority = request.getPriority();
+ ContainerId containerId = request.getContainerId();
+
+ Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
+ increaseRequestMap.get(nodeId);
+ if (null == requestsOnNode) {
+ requestsOnNode =
+ new HashMap<Priority, Map<ContainerId, SchedContainerChangeRequest>>();
+ increaseRequestMap.put(nodeId, requestsOnNode);
+ }
+
+ Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
+ requestsOnNode.get(priority);
+ if (null == requestsOnNodeWithPriority) {
+ requestsOnNodeWithPriority =
+ new TreeMap<ContainerId, SchedContainerChangeRequest>();
+ requestsOnNode.put(priority, requestsOnNodeWithPriority);
+ }
+
+ requestsOnNodeWithPriority.put(containerId, request);
+
+ // update resources
+ String partition = request.getRMContainer().getNodeLabelExpression();
+ Resource delta = request.getDeltaCapacity();
+ appResourceUsage.incPending(partition, delta);
+ queue.incPendingResource(partition, delta);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Added increase request:" + request.getContainerId()
+ + " delta=" + request.getDeltaCapacity());
+ }
+
+ // update priorities
+ priorities.add(priority);
+ }
+
+ public synchronized boolean removeIncreaseRequest(NodeId nodeId, Priority priority,
+ ContainerId containerId) {
+ Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
+ increaseRequestMap.get(nodeId);
+ if (null == requestsOnNode) {
+ return false;
+ }
+
+ Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
+ requestsOnNode.get(priority);
+ if (null == requestsOnNodeWithPriority) {
+ return false;
+ }
+
+ SchedContainerChangeRequest request =
+ requestsOnNodeWithPriority.remove(containerId);
+
+ // remove hierarchies if it becomes empty
+ if (requestsOnNodeWithPriority.isEmpty()) {
+ requestsOnNode.remove(priority);
+ }
+ if (requestsOnNode.isEmpty()) {
+ increaseRequestMap.remove(nodeId);
+ }
+
+ if (request == null) {
+ return false;
+ }
+
+ // update queue's pending resource if request exists
+ String partition = request.getRMContainer().getNodeLabelExpression();
+ Resource delta = request.getDeltaCapacity();
+ appResourceUsage.decPending(partition, delta);
+ queue.decPendingResource(partition, delta);
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("remove increase request:" + request);
+ }
+
+ return true;
+ }
+
+ public SchedContainerChangeRequest getIncreaseRequest(NodeId nodeId,
+ Priority priority, ContainerId containerId) {
+ Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
+ increaseRequestMap.get(nodeId);
+ if (null == requestsOnNode) {
+ return null;
+ }
+
+ Map<ContainerId, SchedContainerChangeRequest> requestsOnNodeWithPriority =
+ requestsOnNode.get(priority);
+ if (null == requestsOnNodeWithPriority) {
+ return null;
+ }
+
+ return requestsOnNodeWithPriority.get(containerId);
+ }
/**
* The ApplicationMaster is updating resource requirements for the
@@ -163,11 +334,11 @@ public class AppSchedulingInfo {
}
}
- Map<String, ResourceRequest> asks = this.requests.get(priority);
+ Map<String, ResourceRequest> asks = this.resourceRequestMap.get(priority);
if (asks == null) {
asks = new ConcurrentHashMap<String, ResourceRequest>();
- this.requests.put(priority, asks);
+ this.resourceRequestMap.put(priority, asks);
this.priorities.add(priority);
}
lastRequest = asks.get(resourceName);
@@ -260,12 +431,12 @@ public class AppSchedulingInfo {
synchronized public Map<String, ResourceRequest> getResourceRequests(
Priority priority) {
- return requests.get(priority);
+ return resourceRequestMap.get(priority);
}
public List<ResourceRequest> getAllResourceRequests() {
List<ResourceRequest> ret = new ArrayList<ResourceRequest>();
- for (Map<String, ResourceRequest> r : requests.values()) {
+ for (Map<String, ResourceRequest> r : resourceRequestMap.values()) {
ret.addAll(r.values());
}
return ret;
@@ -273,7 +444,7 @@ public class AppSchedulingInfo {
synchronized public ResourceRequest getResourceRequest(Priority priority,
String resourceName) {
- Map<String, ResourceRequest> nodeRequests = requests.get(priority);
+ Map<String, ResourceRequest> nodeRequests = resourceRequestMap.get(priority);
return (nodeRequests == null) ? null : nodeRequests.get(resourceName);
}
@@ -301,6 +472,50 @@ public class AppSchedulingInfo {
}
}
+ public synchronized void increaseContainer(
+ SchedContainerChangeRequest increaseRequest) {
+ NodeId nodeId = increaseRequest.getNodeId();
+ Priority priority = increaseRequest.getPriority();
+ ContainerId containerId = increaseRequest.getContainerId();
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("allocated increase request : applicationId=" + applicationId
+ + " container=" + containerId + " host="
+ + increaseRequest.getNodeId() + " user=" + user + " resource="
+ + increaseRequest.getDeltaCapacity());
+ }
+
+ // Set queue metrics
+ queue.getMetrics().allocateResources(user, 0,
+ increaseRequest.getDeltaCapacity(), true);
+
+ // remove the increase request from pending increase request map
+ removeIncreaseRequest(nodeId, priority, containerId);
+
+ // update usage
+ appResourceUsage.incUsed(increaseRequest.getNodePartition(),
+ increaseRequest.getDeltaCapacity());
+ }
+
+ public synchronized void decreaseContainer(
+ SchedContainerChangeRequest decreaseRequest) {
+ // Delta is negative when it's a decrease request
+ Resource absDelta = Resources.negate(decreaseRequest.getDeltaCapacity());
+
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Decrease container : applicationId=" + applicationId
+ + " container=" + decreaseRequest.getContainerId() + " host="
+ + decreaseRequest.getNodeId() + " user=" + user + " resource="
+ + absDelta);
+ }
+
+ // Set queue metrics
+ queue.getMetrics().releaseResources(user, 0, absDelta);
+
+ // update usage
+ appResourceUsage.decUsed(decreaseRequest.getNodePartition(), absDelta);
+ }
+
/**
* Resources have been allocated to this application by the resource
* scheduler. Track them.
@@ -359,11 +574,11 @@ public class AppSchedulingInfo {
// Update future requirements
decResourceRequest(node.getNodeName(), priority, nodeLocalRequest);
- ResourceRequest rackLocalRequest = requests.get(priority).get(
+ ResourceRequest rackLocalRequest = resourceRequestMap.get(priority).get(
node.getRackName());
decResourceRequest(node.getRackName(), priority, rackLocalRequest);
- ResourceRequest offRackRequest = requests.get(priority).get(
+ ResourceRequest offRackRequest = resourceRequestMap.get(priority).get(
ResourceRequest.ANY);
decrementOutstanding(offRackRequest);
@@ -377,7 +592,7 @@ public class AppSchedulingInfo {
ResourceRequest request) {
request.setNumContainers(request.getNumContainers() - 1);
if (request.getNumContainers() == 0) {
- requests.get(priority).remove(resourceName);
+ resourceRequestMap.get(priority).remove(resourceName);
}
}
@@ -394,7 +609,7 @@ public class AppSchedulingInfo {
// Update future requirements
decResourceRequest(node.getRackName(), priority, rackLocalRequest);
- ResourceRequest offRackRequest = requests.get(priority).get(
+ ResourceRequest offRackRequest = resourceRequestMap.get(priority).get(
ResourceRequest.ANY);
decrementOutstanding(offRackRequest);
@@ -449,6 +664,12 @@ public class AppSchedulingInfo {
}
}
}
+
+ // also we need to check increase request
+ if (!deactivate) {
+ deactivate = increaseRequestMap.isEmpty();
+ }
+
if (deactivate) {
activeUsersManager.deactivateApplication(user, applicationId);
}
@@ -457,7 +678,7 @@ public class AppSchedulingInfo {
synchronized public void move(Queue newQueue) {
QueueMetrics oldMetrics = queue.getMetrics();
QueueMetrics newMetrics = newQueue.getMetrics();
- for (Map<String, ResourceRequest> asks : requests.values()) {
+ for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) {
ResourceRequest request = asks.get(ResourceRequest.ANY);
if (request != null) {
oldMetrics.decrPendingResources(user, request.getNumContainers(),
@@ -484,7 +705,7 @@ public class AppSchedulingInfo {
synchronized public void stop(RMAppAttemptState rmAppAttemptFinalState) {
// clear pending resources metrics for the application
QueueMetrics metrics = queue.getMetrics();
- for (Map<String, ResourceRequest> asks : requests.values()) {
+ for (Map<String, ResourceRequest> asks : resourceRequestMap.values()) {
ResourceRequest request = asks.get(ResourceRequest.ANY);
if (request != null) {
metrics.decrPendingResources(user, request.getNumContainers(),
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fcfb1ef4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
index 09fd73e..d94b621 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java
@@ -373,17 +373,20 @@ public class QueueMetrics implements MetricsSource {
}
private void _decrPendingResources(int containers, Resource res) {
+ // if #container = 0, means change container resource
pendingContainers.decr(containers);
- pendingMB.decr(res.getMemory() * containers);
- pendingVCores.decr(res.getVirtualCores() * containers);
+ pendingMB.decr(res.getMemory() * Math.max(containers, 1));
+ pendingVCores.decr(res.getVirtualCores() * Math.max(containers, 1));
}
public void allocateResources(String user, int containers, Resource res,
boolean decrPending) {
+ // if #containers = 0, means change container resource
allocatedContainers.incr(containers);
aggregateContainersAllocated.incr(containers);
- allocatedMB.incr(res.getMemory() * containers);
- allocatedVCores.incr(res.getVirtualCores() * containers);
+
+ allocatedMB.incr(res.getMemory() * Math.max(containers, 1));
+ allocatedVCores.incr(res.getVirtualCores() * Math.max(containers, 1));
if (decrPending) {
_decrPendingResources(containers, res);
}
@@ -397,10 +400,11 @@ public class QueueMetrics implements MetricsSource {
}
public void releaseResources(String user, int containers, Resource res) {
+ // if #container = 0, means change container resource.
allocatedContainers.decr(containers);
aggregateContainersReleased.incr(containers);
- allocatedMB.decr(res.getMemory() * containers);
- allocatedVCores.decr(res.getVirtualCores() * containers);
+ allocatedMB.decr(res.getMemory() * Math.max(containers, 1));
+ allocatedVCores.decr(res.getVirtualCores() * Math.max(containers, 1));
QueueMetrics userMetrics = getUserMetrics(user);
if (userMetrics != null) {
userMetrics.releaseResources(user, containers, res);
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fcfb1ef4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java
new file mode 100644
index 0000000..ea109fd
--- /dev/null
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java
@@ -0,0 +1,143 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
+
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.util.resource.Resources;
+
+/**
+ * Scheduler-side view of a ContainerResourceChangeRequest. Besides the target
+ * capacity it keeps pointers to runtime objects (the RMContainer being
+ * changed and the SchedulerNode it is allocated on), which makes it easier
+ * for the scheduler to make decisions.
+ *
+ * <p>Ordering and equality are defined by (priority, container id) only. The
+ * target capacity does not take part, so two requests for the same container
+ * at the same priority compare as equal even if their targets differ.
+ */
+public class SchedContainerChangeRequest implements
+    Comparable<SchedContainerChangeRequest> {
+  RMContainer rmContainer;
+  Resource targetCapacity;
+  SchedulerNode schedulerNode;
+  Resource deltaCapacity;
+
+  /**
+   * @param schedulerNode scheduler node the container is expected to be on
+   * @param rmContainer container whose resource is to be changed
+   * @param targetCapacity requested resource of the container after the change
+   */
+  public SchedContainerChangeRequest(SchedulerNode schedulerNode,
+      RMContainer rmContainer, Resource targetCapacity) {
+    this.rmContainer = rmContainer;
+    this.targetCapacity = targetCapacity;
+    this.schedulerNode = schedulerNode;
+    // Computed once, against the resource allocated at construction time.
+    deltaCapacity = Resources.subtract(targetCapacity,
+        rmContainer.getAllocatedResource());
+  }
+
+  /** @return the node recorded as the RMContainer's allocated node */
+  public NodeId getNodeId() {
+    return this.rmContainer.getAllocatedNode();
+  }
+
+  public RMContainer getRMContainer() {
+    return this.rmContainer;
+  }
+
+  public Resource getTargetCapacity() {
+    return this.targetCapacity;
+  }
+
+  /**
+   * Delta capacity = target - before, where "before" is the container's
+   * allocated resource when this request was created. For a decrease request
+   * the delta is therefore negative.
+   */
+  public Resource getDeltaCapacity() {
+    return deltaCapacity;
+  }
+
+  public Priority getPriority() {
+    return rmContainer.getContainer().getPriority();
+  }
+
+  public ContainerId getContainerId() {
+    return rmContainer.getContainerId();
+  }
+
+  public String getNodePartition() {
+    return schedulerNode.getPartition();
+  }
+
+  public SchedulerNode getSchedulerNode() {
+    return schedulerNode;
+  }
+
+  /**
+   * Consistent with {@link #equals(Object)}: equal requests share a container
+   * id (assuming ContainerId's compareTo agrees with its equals), so they
+   * also share a hash code. The target capacity is deliberately not hashed
+   * because it does not take part in equality.
+   */
+  @Override
+  public int hashCode() {
+    return getContainerId().hashCode();
+  }
+
+  @Override
+  public boolean equals(Object other) {
+    if (!(other instanceof SchedContainerChangeRequest)) {
+      return false;
+    }
+    return compareTo((SchedContainerChangeRequest)other) == 0;
+  }
+
+  /**
+   * Orders by priority first, then by container id.
+   * NOTE(review): a null argument yields -1 instead of the
+   * NullPointerException recommended by the Comparable contract; kept as is
+   * for compatibility with existing callers.
+   */
+  @Override
+  public int compareTo(SchedContainerChangeRequest other) {
+    if (other == null) {
+      return -1;
+    }
+
+    int rc = getPriority().compareTo(other.getPriority());
+    if (0 != rc) {
+      return rc;
+    }
+
+    return getContainerId().compareTo(other.getContainerId());
+  }
+
+  @Override
+  public String toString() {
+    return "<container=" + getContainerId() + ", targetCapacity="
+        + targetCapacity + ", delta=" + deltaCapacity + ", node="
+        + getNodeId().toString() + ">";
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fcfb1ef4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
index 519de98..96288f8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplication.java
@@ -28,7 +28,7 @@ public class SchedulerApplication<T extends SchedulerApplicationAttempt> {
private Queue queue;
private final String user;
- private T currentAttempt;
+ private volatile T currentAttempt;
private volatile Priority priority;
public SchedulerApplication(Queue queue, String user) {
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fcfb1ef4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
index b361d15..f064e97 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java
@@ -19,11 +19,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Set;
import java.util.concurrent.atomic.AtomicLong;
@@ -51,16 +53,19 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.AggregateAppR
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerChangeResourceEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerReservedEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
+import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerUpdatesAcquiredEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
+import org.apache.hadoop.yarn.state.InvalidStateTransitionException;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.SchedulableEntity;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
@@ -104,8 +109,10 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
private AtomicLong firstAllocationRequestSentTime = new AtomicLong(0);
private AtomicLong firstContainerAllocatedTime = new AtomicLong(0);
- protected List<RMContainer> newlyAllocatedContainers =
- new ArrayList<RMContainer>();
+ protected List<RMContainer> newlyAllocatedContainers = new ArrayList<>();
+ protected Map<ContainerId, RMContainer> newlyDecreasedContainers = new HashMap<>();
+ protected Map<ContainerId, RMContainer> newlyIncreasedContainers = new HashMap<>();
+ protected Set<NMToken> updatedNMTokens = new HashSet<>();
// This pendingRelease is used in work-preserving recovery scenario to keep
// track of the AM's outstanding release requests. RM on recovery could
@@ -219,7 +226,8 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
return appSchedulingInfo.getPriorities();
}
- public synchronized ResourceRequest getResourceRequest(Priority priority, String resourceName) {
+ public synchronized ResourceRequest getResourceRequest(Priority priority,
+ String resourceName) {
return this.appSchedulingInfo.getResourceRequest(priority, resourceName);
}
@@ -324,24 +332,28 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
return reservedContainers;
}
- public synchronized RMContainer reserve(SchedulerNode node, Priority priority,
- RMContainer rmContainer, Container container) {
- // Create RMContainer if necessary
- if (rmContainer == null) {
- rmContainer =
- new RMContainerImpl(container, getApplicationAttemptId(),
- node.getNodeID(), appSchedulingInfo.getUser(), rmContext);
+ public synchronized boolean reserveIncreasedContainer(SchedulerNode node,
+ Priority priority, RMContainer rmContainer, Resource reservedResource) {
+ if (commonReserve(node, priority, rmContainer, reservedResource)) {
attemptResourceUsage.incReserved(node.getPartition(),
- container.getResource());
-
- // Reset the re-reservation count
- resetReReservations(priority);
- } else {
- // Note down the re-reservation
- addReReservation(priority);
+ reservedResource);
+ // succeeded
+ return true;
+ }
+
+ return false;
+ }
+
+ private synchronized boolean commonReserve(SchedulerNode node,
+ Priority priority, RMContainer rmContainer, Resource reservedResource) {
+ try {
+ rmContainer.handle(new RMContainerReservedEvent(rmContainer
+ .getContainerId(), reservedResource, node.getNodeID(), priority));
+ } catch (InvalidStateTransitionException e) {
+ // We reach here could be caused by container already finished, return
+ // false indicate it fails
+ return false;
}
- rmContainer.handle(new RMContainerReservedEvent(container.getId(),
- container.getResource(), node.getNodeID(), priority));
Map<NodeId, RMContainer> reservedContainers =
this.reservedContainers.get(priority);
@@ -356,8 +368,30 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
+ " reserved container " + rmContainer + " on node " + node
+ ". This attempt currently has " + reservedContainers.size()
+ " reserved containers at priority " + priority
- + "; currentReservation " + container.getResource());
+ + "; currentReservation " + reservedResource);
}
+
+ return true;
+ }
+
+ public synchronized RMContainer reserve(SchedulerNode node,
+ Priority priority, RMContainer rmContainer, Container container) {
+ // Create RMContainer if necessary
+ if (rmContainer == null) {
+ rmContainer =
+ new RMContainerImpl(container, getApplicationAttemptId(),
+ node.getNodeID(), appSchedulingInfo.getUser(), rmContext);
+ attemptResourceUsage.incReserved(node.getPartition(),
+ container.getResource());
+
+ // Reset the re-reservation count
+ resetReReservations(priority);
+ } else {
+ // Note down the re-reservation
+ addReReservation(priority);
+ }
+
+ commonReserve(node, priority, rmContainer, container.getResource());
return rmContainer;
}
@@ -437,69 +471,100 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
public Resource getCurrentConsumption() {
return attemptResourceUsage.getUsed();
}
-
- public static class ContainersAndNMTokensAllocation {
- List<Container> containerList;
- List<NMToken> nmTokenList;
-
- public ContainersAndNMTokensAllocation(List<Container> containerList,
- List<NMToken> nmTokenList) {
- this.containerList = containerList;
- this.nmTokenList = nmTokenList;
+
+ private Container updateContainerAndNMToken(RMContainer rmContainer,
+ boolean newContainer, boolean increasedContainer) {
+ Container container = rmContainer.getContainer();
+ ContainerType containerType = ContainerType.TASK;
+ // While isWaitingForAMContainer(...) holds, the container is typed
+ // APPLICATION_MASTER; otherwise it is a TASK container. No RMApp lookup is
+ // needed here: its result would be unused, and
+ // rmContext.getRMApps().get(...) can return null if the application is no
+ // longer in the RM context, which would throw a NullPointerException.
+ if (isWaitingForAMContainer(getApplicationId())) {
+ containerType = ContainerType.APPLICATION_MASTER;
 }
-
- public List<Container> getContainerList() {
- return containerList;
+ try {
+ // create container token and NMToken altogether.
+ container.setContainerToken(rmContext.getContainerTokenSecretManager()
+ .createContainerToken(container.getId(), container.getNodeId(),
+ getUser(), container.getResource(), container.getPriority(),
+ rmContainer.getCreationTime(), this.logAggregationContext,
+ rmContainer.getNodeLabelExpression(), containerType));
+ NMToken nmToken =
+ rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(),
+ getApplicationAttemptId(), container);
+ if (nmToken != null) {
+ updatedNMTokens.add(nmToken);
+ }
+ } catch (IllegalArgumentException e) {
+ // DNS might be down, skip returning this container.
+ LOG.error("Error trying to assign container token and NM token to"
+ + " an updated container " + container.getId(), e);
+ return null;
 }
-
- public List<NMToken> getNMTokenList() {
- return nmTokenList;
+
+ if (newContainer) {
+ rmContainer.handle(new RMContainerEvent(
+ rmContainer.getContainerId(), RMContainerEventType.ACQUIRED));
+ } else {
+ rmContainer.handle(new RMContainerUpdatesAcquiredEvent(
+ rmContainer.getContainerId(), increasedContainer));
 }
+ return container;
 }
- // Create container token and NMToken altogether, if either of them fails for
+ // Create container token and update NMToken altogether, if either of them fails for
// some reason like DNS unavailable, do not return this container and keep it
// in the newlyAllocatedContainers waiting to be refetched.
- public synchronized ContainersAndNMTokensAllocation
- pullNewlyAllocatedContainersAndNMTokens() {
+ public synchronized List<Container> pullNewlyAllocatedContainers() {
List<Container> returnContainerList =
new ArrayList<Container>(newlyAllocatedContainers.size());
- List<NMToken> nmTokens = new ArrayList<NMToken>();
for (Iterator<RMContainer> i = newlyAllocatedContainers.iterator(); i
- .hasNext();) {
+ .hasNext();) {
RMContainer rmContainer = i.next();
- Container container = rmContainer.getContainer();
- ContainerType containerType = ContainerType.TASK;
- boolean isWaitingForAMContainer = isWaitingForAMContainer(
- container.getId().getApplicationAttemptId().getApplicationId());
- if (isWaitingForAMContainer) {
- containerType = ContainerType.APPLICATION_MASTER;
+ Container updatedContainer =
+ updateContainerAndNMToken(rmContainer, true, false);
+ // Only add container to return list when it's not null. updatedContainer
+ // could be null when generate token failed, it can be caused by DNS
+ // resolving failed.
+ if (updatedContainer != null) {
+ returnContainerList.add(updatedContainer);
+ i.remove();
}
- try {
- // create container token and NMToken altogether.
- container.setContainerToken(rmContext.getContainerTokenSecretManager()
- .createContainerToken(container.getId(), container.getNodeId(),
- getUser(), container.getResource(), container.getPriority(),
- rmContainer.getCreationTime(), this.logAggregationContext,
- rmContainer.getNodeLabelExpression(), containerType));
- NMToken nmToken =
- rmContext.getNMTokenSecretManager().createAndGetNMToken(getUser(),
- getApplicationAttemptId(), container);
- if (nmToken != null) {
- nmTokens.add(nmToken);
- }
- } catch (IllegalArgumentException e) {
- // DNS might be down, skip returning this container.
- LOG.error("Error trying to assign container token and NM token to" +
- " an allocated container " + container.getId(), e);
- continue;
+ }
+ return returnContainerList;
+ }
+
+ private synchronized List<Container> pullNewlyUpdatedContainers(
+ Map<ContainerId, RMContainer> updatedContainerMap, boolean increase) {
+ List<Container> returnContainerList =
+ new ArrayList<Container>(updatedContainerMap.size());
+ for (Iterator<Entry<ContainerId, RMContainer>> i =
+ updatedContainerMap.entrySet().iterator(); i.hasNext();) {
+ RMContainer rmContainer = i.next().getValue();
+ Container updatedContainer =
+ updateContainerAndNMToken(rmContainer, false, increase);
+ if (updatedContainer != null) {
+ returnContainerList.add(updatedContainer);
+ i.remove();
}
- returnContainerList.add(container);
- i.remove();
- rmContainer.handle(new RMContainerEvent(rmContainer.getContainerId(),
- RMContainerEventType.ACQUIRED));
}
- return new ContainersAndNMTokensAllocation(returnContainerList, nmTokens);
+ return returnContainerList;
+ }
+
+ public synchronized List<Container> pullNewlyIncreasedContainers() {
+ return pullNewlyUpdatedContainers(newlyIncreasedContainers, true);
+ }
+
+ public synchronized List<Container> pullNewlyDecreasedContainers() {
+ return pullNewlyUpdatedContainers(newlyDecreasedContainers, false);
+ }
+
+ public synchronized List<NMToken> pullUpdatedNMTokens() {
+ List<NMToken> returnList = new ArrayList<NMToken>(updatedNMTokens);
+ updatedNMTokens.clear();
+ return returnList;
}
public boolean isWaitingForAMContainer(ApplicationId applicationId) {
@@ -770,4 +835,73 @@ public class SchedulerApplicationAttempt implements SchedulableEntity {
 return attemptResourceUsage;
 }
-}
+  /**
+   * Removes the pending increase request of {@code containerId} at
+   * {@code priority} on {@code nodeId}; delegates to AppSchedulingInfo.
+   *
+   * @return the result of AppSchedulingInfo#removeIncreaseRequest
+   *         (presumably whether a request was found and removed)
+   */
+  public synchronized boolean removeIncreaseRequest(NodeId nodeId,
+      Priority priority, ContainerId containerId) {
+    return appSchedulingInfo.removeIncreaseRequest(nodeId, priority,
+        containerId);
+  }
+
+  /**
+   * Records new or updated container increase requests; delegates to
+   * AppSchedulingInfo.
+   *
+   * @return the result of AppSchedulingInfo#updateIncreaseRequests
+   */
+  public synchronized boolean updateIncreaseRequests(
+      List<SchedContainerChangeRequest> increaseRequests) {
+    return appSchedulingInfo.updateIncreaseRequests(increaseRequests);
+  }
+
+  /**
+   * Applies a resource change to a container: updates AppSchedulingInfo,
+   * sends an RMContainerChangeResourceEvent to the RMContainer, and queues
+   * the container so the AM can fetch it via pullNewlyIncreasedContainers()
+   * or pullNewlyDecreasedContainers(). Only the latest change per container
+   * is kept: an increase drops a not-yet-pulled decrease of the same
+   * container, and vice versa.
+   */
+  private synchronized void changeContainerResource(
+      SchedContainerChangeRequest changeRequest, boolean increase) {
+    if (increase) {
+      appSchedulingInfo.increaseContainer(changeRequest);
+    } else {
+      appSchedulingInfo.decreaseContainer(changeRequest);
+    }
+
+    RMContainer changedRMContainer = changeRequest.getRMContainer();
+    changedRMContainer.handle(
+        new RMContainerChangeResourceEvent(changeRequest.getContainerId(),
+            changeRequest.getTargetCapacity(), increase));
+
+    // Drop any not-yet-pulled change of the opposite kind for this container
+    // and record the new one.
+    if (increase) {
+      newlyDecreasedContainers.remove(changeRequest.getContainerId());
+      newlyIncreasedContainers.put(changeRequest.getContainerId(),
+          changedRMContainer);
+    } else {
+      newlyIncreasedContainers.remove(changeRequest.getContainerId());
+      newlyDecreasedContainers.put(changeRequest.getContainerId(),
+          changedRMContainer);
+    }
+  }
+
+  /** Applies a decrease of a container's resource (see above). */
+  public synchronized void decreaseContainer(
+      SchedContainerChangeRequest decreaseRequest) {
+    changeContainerResource(decreaseRequest, false);
+  }
+
+  /** Applies an increase of a container's resource (see above). */
+  public synchronized void increaseContainer(
+      SchedContainerChangeRequest increaseRequest) {
+    changeContainerResource(increaseRequest, true);
+  }
+}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fcfb1ef4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
index f03663a..f3d3906 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java
@@ -157,6 +157,48 @@ public abstract class SchedulerNode {
 + getUsedResource() + " used and " + getAvailableResource()
 + " available after allocation");
 }
+
+  /**
+   * Updates this node's available resource for a container resource change:
+   * an increase passes {@code deltaResource} to deductAvailableResource, a
+   * decrease passes it to addAvailableResource. The container count is not
+   * changed.
+   * NOTE(review): callers presumably pass a non-negative delta in both
+   * directions (direction is given by {@code increase}) - confirm, since
+   * SchedContainerChangeRequest#getDeltaCapacity() is negative for decreases.
+   */
+  private synchronized void changeContainerResource(ContainerId containerId,
+      Resource deltaResource, boolean increase) {
+    if (increase) {
+      deductAvailableResource(deltaResource);
+    } else {
+      addAvailableResource(deltaResource);
+    }
+
+    LOG.info((increase ? "Increased" : "Decreased") + " container "
+        + containerId + " by " + deltaResource + " on host "
+        + rmNode.getNodeAddress() + ", which has " + numContainers
+        + " containers, " + getUsedResource() + " used and "
+        + getAvailableResource() + " available after the change");
+  }
+
+  /**
+   * Records that the scheduler increased {@code containerId} on this node
+   * by {@code deltaResource} (see changeContainerResource).
+   */
+  public synchronized void increaseContainer(ContainerId containerId,
+      Resource deltaResource) {
+    changeContainerResource(containerId, deltaResource, true);
+  }
+
+  /**
+   * Records that the scheduler decreased {@code containerId} on this node
+   * by {@code deltaResource} (see changeContainerResource).
+   */
+  public synchronized void decreaseContainer(ContainerId containerId,
+      Resource deltaResource) {
+    changeContainerResource(containerId, deltaResource, false);
+  }
 /**
 * Get available resources on the node.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fcfb1ef4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
index 8047d0b..abefee8 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerUtils.java
@@ -361,7 +361,7 @@ public class SchedulerUtils {
}
public static boolean checkResourceRequestMatchingNodePartition(
- ResourceRequest offswitchResourceRequest, String nodePartition,
+ String requestedPartition, String nodePartition,
SchedulingMode schedulingMode) {
// We will only look at node label = nodeLabelToLookAt according to
// schedulingMode and partition of node.
@@ -371,12 +371,11 @@ public class SchedulerUtils {
} else {
nodePartitionToLookAt = RMNodeLabelsManager.NO_LABEL;
}
-
- String askedNodePartition = offswitchResourceRequest.getNodeLabelExpression();
- if (null == askedNodePartition) {
- askedNodePartition = RMNodeLabelsManager.NO_LABEL;
+
+ if (null == requestedPartition) {
+ requestedPartition = RMNodeLabelsManager.NO_LABEL;
}
- return askedNodePartition.equals(nodePartitionToLookAt);
+ return requestedPartition.equals(nodePartitionToLookAt);
}
private static boolean hasPendingResourceRequest(ResourceCalculator rc,
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fcfb1ef4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
index 699d476..0c2ae36 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/YarnScheduler.java
@@ -35,6 +35,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationResourceUsageReport;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.QueueACL;
@@ -133,16 +134,17 @@ public interface YarnScheduler extends EventHandler<SchedulerEvent> {
* @param release
* @param blacklistAdditions
* @param blacklistRemovals
+ * @param increaseRequests
+ * @param decreaseRequests
* @return the {@link Allocation} for the application
*/
@Public
@Stable
- Allocation
- allocate(ApplicationAttemptId appAttemptId,
- List<ResourceRequest> ask,
- List<ContainerId> release,
- List<String> blacklistAdditions,
- List<String> blacklistRemovals);
+ Allocation allocate(ApplicationAttemptId appAttemptId,
+ List<ResourceRequest> ask, List<ContainerId> release,
+ List<String> blacklistAdditions, List<String> blacklistRemovals,
+ List<ContainerResourceChangeRequest> increaseRequests,
+ List<ContainerResourceChangeRequest> decreaseRequests);
/**
* Get node resource usage report.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fcfb1ef4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
index 0ae4d1a..9f61b11 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java
@@ -43,10 +43,10 @@ import org.apache.hadoop.yarn.security.PrivilegedEntity;
import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
-import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
@@ -76,7 +76,7 @@ public abstract class AbstractCSQueue implements CSQueue {
private boolean preemptionDisabled;
// Track resource usage-by-label like used-resource/pending-resource, etc.
- ResourceUsage queueUsage;
+ volatile ResourceUsage queueUsage;
// Track capacities like used-capcity/abs-used-capacity/capacity/abs-capacity,
// etc.
@@ -340,22 +340,27 @@ public abstract class AbstractCSQueue implements CSQueue {
return minimumAllocation;
}
- synchronized void allocateResource(Resource clusterResource,
- Resource resource, String nodePartition) {
+ synchronized void allocateResource(Resource clusterResource,
+ Resource resource, String nodePartition, boolean changeContainerResource) {
queueUsage.incUsed(nodePartition, resource);
- ++numContainers;
+ if (!changeContainerResource) {
+ ++numContainers;
+ }
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
minimumAllocation, this, labelManager, nodePartition);
}
protected synchronized void releaseResource(Resource clusterResource,
- Resource resource, String nodePartition) {
+ Resource resource, String nodePartition, boolean changeContainerResource) {
queueUsage.decUsed(nodePartition, resource);
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
minimumAllocation, this, labelManager, nodePartition);
- --numContainers;
+
+ if (!changeContainerResource) {
+ --numContainers;
+ }
}
@Private
@@ -446,8 +451,8 @@ public abstract class AbstractCSQueue implements CSQueue {
}
synchronized boolean canAssignToThisQueue(Resource clusterResource,
- String nodePartition, ResourceLimits currentResourceLimits, Resource resourceCouldBeUnreserved,
- SchedulingMode schedulingMode) {
+ String nodePartition, ResourceLimits currentResourceLimits,
+ Resource resourceCouldBeUnreserved, SchedulingMode schedulingMode) {
// Get current limited resource:
// - When doing RESPECT_PARTITION_EXCLUSIVITY allocation, we will respect
// queues' max capacity.
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fcfb1ef4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
index 928437f..68f6f12 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSAssignment.java
@@ -41,6 +41,7 @@ public class CSAssignment {
private final boolean skipped;
private boolean fulfilledReservation;
private final AssignmentInformation assignmentInformation;
+ private boolean increaseAllocation;
public CSAssignment(Resource resource, NodeType type) {
this(resource, type, null, null, false, false);
@@ -138,4 +139,12 @@ public class CSAssignment {
public AssignmentInformation getAssignmentInformation() {
return this.assignmentInformation;
}
+
+ /**
+ * Returns the increase-allocation flag of this assignment.
+ * NOTE(review): by its name this presumably marks an assignment that grows
+ * an existing container rather than allocating a new one; confirm against
+ * callers.
+ *
+ * @return the value last passed to {@link #setIncreasedAllocation(boolean)},
+ * or {@code false} if it was never set
+ */
+ public boolean isIncreasedAllocation() {
+ return increaseAllocation;
+ }
+
+ /**
+ * Sets the increase-allocation flag reported by
+ * {@link #isIncreasedAllocation()}.
+ *
+ * @param flag {@code true} to mark this assignment as an increase allocation,
+ * {@code false} otherwise
+ */
+ public void setIncreasedAllocation(boolean flag) {
+ increaseAllocation = flag;
+ }
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fcfb1ef4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
index 9855dd4..e90deeb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java
@@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ActiveUsersManage
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
@@ -219,6 +220,14 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
boolean sortQueues);
/**
+ * Unreserves a container increase that is currently reserved in this queue.
+ * Only the reserved increase request is cancelled; the container itself is
+ * not stopped, so implementations must not use completedContainer for this
+ * purpose.
+ *
+ * @param clusterResource the cluster resource
+ * @param app the application attempt that owns the container
+ * @param node the node associated with the reservation (presumably the node
+ * the container runs on; confirm against implementations)
+ * @param rmContainer the container whose reserved increase is cancelled
+ */
+ public void unreserveIncreasedContainer(Resource clusterResource,
+ FiCaSchedulerApp app, FiCaSchedulerNode node, RMContainer rmContainer);
+
+ /**
* Get the number of applications in the queue.
* @return number of applications
*/
@@ -313,4 +322,11 @@ extends org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue {
* new resource asked
*/
public void decPendingResource(String nodeLabel, Resource resourceToDec);
+
+ /**
+ * Decreases the resource of a container that belongs to this queue to the
+ * target capacity carried by the decrease request.
+ * <p>
+ * CapacityScheduler invokes this only after checking that the container is
+ * RUNNING and that the request's delta capacity is non-zero.
+ *
+ * @param clusterResource the cluster resource
+ * @param decreaseRequest the decrease request; identifies the container and
+ * carries its target capacity
+ * @param app the application attempt that owns the container
+ */
+ public void decreaseContainer(Resource clusterResource,
+ SchedContainerChangeRequest decreaseRequest,
+ FiCaSchedulerApp app);
}
http://git-wip-us.apache.org/repos/asf/hadoop/blob/fcfb1ef4/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
index ad5c76c..465e233 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java
@@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.io.IOException;
import java.io.InputStream;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.Comparator;
import java.util.EnumSet;
@@ -52,6 +53,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerExitStatus;
import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerResourceChangeRequest;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Priority;
@@ -87,6 +89,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
+import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.UpdatedContainerInfo;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Allocation;
@@ -98,6 +101,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueNotFoundException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplication;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
+import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedContainerChangeRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerHealth;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
@@ -840,10 +845,14 @@ public class CapacityScheduler extends
}
@Override
+ // Note: when AM asks to decrease container or release container, we will
+ // acquire scheduler lock
@Lock(Lock.NoLock.class)
public Allocation allocate(ApplicationAttemptId applicationAttemptId,
- List<ResourceRequest> ask, List<ContainerId> release,
- List<String> blacklistAdditions, List<String> blacklistRemovals) {
+ List<ResourceRequest> ask, List<ContainerId> release,
+ List<String> blacklistAdditions, List<String> blacklistRemovals,
+ List<ContainerResourceChangeRequest> increaseRequests,
+ List<ContainerResourceChangeRequest> decreaseRequests) {
FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
if (application == null) {
@@ -854,6 +863,14 @@ public class CapacityScheduler extends
SchedulerUtils.normalizeRequests(
ask, getResourceCalculator(), getClusterResource(),
getMinimumResourceCapability(), getMaximumResourceCapability());
+
+ // Pre-process increase requests
+ List<SchedContainerChangeRequest> normalizedIncreaseRequests =
+ checkAndNormalizeContainerChangeRequests(increaseRequests, true);
+
+ // Pre-process decrease requests
+ List<SchedContainerChangeRequest> normalizedDecreaseRequests =
+ checkAndNormalizeContainerChangeRequests(decreaseRequests, false);
// Release containers
releaseContainers(release, application);
@@ -870,8 +887,8 @@ public class CapacityScheduler extends
return EMPTY_ALLOCATION;
}
+ // Process resource requests
if (!ask.isEmpty()) {
-
if(LOG.isDebugEnabled()) {
LOG.debug("allocate: pre-update " + applicationAttemptId +
" ask size =" + ask.size());
@@ -888,6 +905,12 @@ public class CapacityScheduler extends
application.showRequests();
}
}
+
+ // Process increase resource requests
+ if (application.updateIncreaseRequests(normalizedIncreaseRequests)
+ && (updateDemandForQueue == null)) {
+ updateDemandForQueue = (LeafQueue) application.getQueue();
+ }
if (application.isWaitingForAMContainer(application.getApplicationId())) {
// Allocate is for AM and update AM blacklist for this
@@ -896,6 +919,9 @@ public class CapacityScheduler extends
} else {
application.updateBlacklist(blacklistAdditions, blacklistRemovals);
}
+
+ // Decrease containers
+ decreaseContainers(normalizedDecreaseRequests, application);
allocation = application.getAllocation(getResourceCalculator(),
clusterResource, getMinimumResourceCapability());
@@ -957,6 +983,13 @@ public class CapacityScheduler extends
for (ContainerStatus launchedContainer : newlyLaunchedContainers) {
containerLaunchedOnNode(launchedContainer.getContainerId(), node);
}
+
+ // Processing the newly increased containers
+ List<Container> newlyIncreasedContainers =
+ nm.pullNewlyIncreasedContainers();
+ for (Container container : newlyIncreasedContainers) {
+ containerIncreasedOnNode(container.getId(), node, container);
+ }
// Process completed containers
int releasedContainers = 0;
@@ -1442,6 +1475,50 @@ public class CapacityScheduler extends
container.getId(), queue.getQueuePath());
}
}
+
+ /**
+ * Decreases the resource of one running container on behalf of its
+ * application attempt.
+ * <p>
+ * The request is skipped when the container is not RUNNING (logged at INFO)
+ * or when the request's delta capacity is zero (logged at DEBUG). Otherwise
+ * the decrease is delegated to the attempt's LeafQueue. An
+ * RMNodeDecreaseContainerEvent is then dispatched for the request's node,
+ * and the old and new sizes are logged.
+ * <p>
+ * The method is {@code synchronized}, so it runs under the scheduler lock.
+ *
+ * @param decreaseRequest the decrease request for a single container
+ * @param attempt the owning application attempt; it is cast to
+ * FiCaSchedulerApp and its queue to LeafQueue, so other types would fail
+ * with a ClassCastException
+ */
+ @Lock(CapacityScheduler.class)
+ @Override
+ protected synchronized void decreaseContainer(
+ SchedContainerChangeRequest decreaseRequest,
+ SchedulerApplicationAttempt attempt) {
+ RMContainer rmContainer = decreaseRequest.getRMContainer();
+
+ // Only RUNNING containers are decreased; anything else is logged and skipped.
+ if (rmContainer.getState() != RMContainerState.RUNNING) {
+ LOG.info("Trying to decrease a container not in RUNNING state, container="
+ + rmContainer + " state=" + rmContainer.getState().name());
+ return;
+ }
+
+ // A zero delta means the target equals the current resource. Such a request
+ // may only be meant to cancel a pending increase, so nothing is decreased.
+ if (Resources.equals(decreaseRequest.getDeltaCapacity(), Resources.none())) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Decrease target resource equals to existing resource for container:"
+ + decreaseRequest.getContainerId()
+ + " ignore this decrease request.");
+ }
+ return;
+ }
+
+ // Clone (rather than reference) the current resource so the final log line
+ // reports the pre-decrease size. Presumably the queue decrease below updates
+ // the container's resource; confirm in LeafQueue.
+ Resource resourceBeforeDecrease =
+ Resources.clone(rmContainer.getContainer().getResource());
+
+ FiCaSchedulerApp app = (FiCaSchedulerApp)attempt;
+ LeafQueue queue = (LeafQueue) attempt.getQueue();
+ queue.decreaseContainer(clusterResource, decreaseRequest, app);
+
+ // Notify the RMNode identified by the request's node id. This is sent only
+ // after the queue-side decrease above has been applied.
+ this.rmContext.getDispatcher().getEventHandler()
+ .handle(new RMNodeDecreaseContainerEvent(decreaseRequest.getNodeId(),
+ Arrays.asList(rmContainer.getContainer())));
+
+ LOG.info("Application attempt " + app.getApplicationAttemptId()
+ + " decreased container:" + decreaseRequest.getContainerId() + " from "
+ + resourceBeforeDecrease + " to "
+ + decreaseRequest.getTargetCapacity());
+ }
@Lock(Lock.NoLock.class)
@VisibleForTesting